Commit df0989a3 authored by Jeromy

add in support for batched writes

implement batch ops for different datastore types

rename Transaction to Batch

Revert "add in support for batched writes"

add in benchmarks for put and batchput

move batching into separate interface

address concerns from PR

regrab old code
parent e4b84bef
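For context on the API this diff introduces, here is a minimal usage sketch (not part of the commit). It assumes the names shown in the diff below (Batch, BatchingDatastore, NewBasicBatch) plus the existing NewMapDatastore and NewKey helpers; keys and values are illustrative.

package main

import (
	"log"

	ds "github.com/jbenet/go-datastore"
)

func main() {
	// MapDatastore gains a Batch() (Batch, error) method in this commit.
	d := ds.NewMapDatastore()

	b, err := d.Batch()
	if err != nil {
		log.Fatal(err) // other stores may return ErrBatchUnsupported
	}

	// Put and Delete are buffered in the Batch; nothing reaches the
	// underlying datastore until Commit.
	if err := b.Put(ds.NewKey("/example/1"), []byte("hello")); err != nil {
		log.Fatal(err)
	}
	if err := b.Put(ds.NewKey("/example/2"), []byte("world")); err != nil {
		log.Fatal(err)
	}
	if err := b.Commit(); err != nil {
		log.Fatal(err)
	}

	// After Commit the values are visible through the normal interface.
	if _, err := d.Get(ds.NewKey("/example/1")); err != nil {
		log.Fatal(err)
	}
}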
@@ -63,8 +63,8 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	return r, nil
 }
-func (d *MapDatastore) StartBatchOp() Transaction {
-	return newBasicTransaction(d)
+func (d *MapDatastore) Batch() (Batch, error) {
+	return NewBasicBatch(d), nil
 }
 // NullDatastore stores nothing, but conforms to the API.
@@ -102,8 +102,8 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	return dsq.ResultsWithEntries(q, nil), nil
 }
-func (d *NullDatastore) StartBatchOp() Transaction {
-	return newBasicTransaction(d)
+func (d *NullDatastore) Batch() (Batch, error) {
+	return NewBasicBatch(d), nil
 }
 // LogDatastore logs all accesses through the datastore.
@@ -120,7 +120,7 @@ type Shim interface {
 }
 // NewLogDatastore constructs a log datastore.
-func NewLogDatastore(ds Datastore, name string) Shim {
+func NewLogDatastore(ds Datastore, name string) *LogDatastore {
 	if len(name) < 1 {
 		name = "LogDatastore"
 	}
@@ -163,7 +163,11 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	return d.child.Query(q)
 }
-func (d *LogDatastore) StartBatchOp() Transaction {
-	log.Printf("%s: StartBatchOp\n", d.Name)
-	return d.child.StartBatchOp()
+func (d *LogDatastore) Batch() (Batch, error) {
+	log.Printf("%s: Batch\n", d.Name)
+	bds, ok := d.child.(BatchingDatastore)
+	if !ok {
+		return nil, ErrBatchUnsupported
+	}
+	return bds.Batch()
 }
 package datastore
 type GetCallback func(interface{}, error)
-// basicTransaction implements the transaction interface for datastores who do
+// basicBatch implements the transaction interface for datastores who do
 // not have any sort of underlying transactional support
-type basicTransaction struct {
+type basicBatch struct {
 	puts    map[Key]interface{}
 	gets    map[Key]GetCallback
 	deletes map[Key]struct{}
 	target  Datastore
 }
-func newBasicTransaction(ds Datastore) Transaction {
-	return &basicTransaction{
+func NewBasicBatch(ds Datastore) Batch {
+	return &basicBatch{
 		puts:    make(map[Key]interface{}),
 		gets:    make(map[Key]GetCallback),
 		deletes: make(map[Key]struct{}),
 		target:  ds,
 	}
 }
-func (bt *basicTransaction) Put(key Key, val interface{}) error {
+func (bt *basicBatch) Put(key Key, val interface{}) error {
 	bt.puts[key] = val
 	return nil
 }
-func (bt *basicTransaction) Delete(key Key) error {
+func (bt *basicBatch) Delete(key Key) error {
 	bt.deletes[key] = struct{}{}
 	return nil
 }
-func (bt *basicTransaction) Commit() error {
-	for k, cb := range bt.gets {
-		cb(bt.target.Get(k))
-	}
+func (bt *basicBatch) Commit() error {
 	for k, val := range bt.puts {
 		if err := bt.target.Put(k, val); err != nil {
 			return err
......
@@ -10,7 +10,7 @@ type Datastore struct {
 	F func()
 }
-func Wrap(ds ds.Datastore, f func()) ds.Datastore {
+func Wrap(ds ds.Datastore, f func()) *Datastore {
 	return &Datastore{ds, f}
 }
......
@@ -67,11 +67,16 @@ type Datastore interface {
 	// result.AllEntries()
 	//
 	Query(q query.Query) (query.Results, error)
+}
+type BatchingDatastore interface {
+	Datastore
-	// StartBatchOp begins a datastore transaction
-	StartBatchOp() Transaction
+	Batch() (Batch, error)
 }
+var ErrBatchUnsupported = errors.New("this datastore does not support batching")
 // ThreadSafeDatastore is an interface that all threadsafe datastore should
 // implement to leverage type safety checks.
 type ThreadSafeDatastore interface {
@@ -108,7 +113,7 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) {
 	}
 }
-type Transaction interface {
+type Batch interface {
 	Put(key Key, val interface{}) error
 	Delete(key Key) error
......
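Because batching now lives in the separate BatchingDatastore interface rather than on Datastore itself, callers holding a plain Datastore use the type-assertion pattern that appears throughout this diff. A small hedged sketch of that pattern as a standalone helper; the package and function name are made up, and only Datastore, BatchingDatastore, Batch and NewBasicBatch come from the diff.

package dsutil // hypothetical helper package, not part of this commit

import ds "github.com/jbenet/go-datastore"

// BatchFor returns a Batch for any Datastore: the store's own batch
// implementation when it supports BatchingDatastore, otherwise the
// generic map-backed batch that replays its Put/Delete calls on Commit.
func BatchFor(d ds.Datastore) (ds.Batch, error) {
	if bds, ok := d.(ds.BatchingDatastore); ok {
		return bds.Batch()
	}
	return ds.NewBasicBatch(d), nil
}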
@@ -323,38 +323,32 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
 	return res, nil
 }
-type flatfsTransaction struct {
+type flatfsBatch struct {
 	puts    map[datastore.Key]interface{}
 	gets    map[datastore.Key]datastore.GetCallback
 	deletes map[datastore.Key]struct{}
 	ds      *Datastore
 }
-func (fs *Datastore) StartBatchOp() datastore.Transaction {
-	return &flatfsTransaction{
+func (fs *Datastore) Batch() datastore.Batch {
+	return &flatfsBatch{
 		puts:    make(map[datastore.Key]interface{}),
 		gets:    make(map[datastore.Key]datastore.GetCallback),
 		deletes: make(map[datastore.Key]struct{}),
 		ds:      fs,
 	}
 }
-func (bt *flatfsTransaction) Put(key datastore.Key, val interface{}) error {
+func (bt *flatfsBatch) Put(key datastore.Key, val interface{}) error {
 	bt.puts[key] = val
 	return nil
 }
-func (bt *flatfsTransaction) Delete(key datastore.Key) error {
+func (bt *flatfsBatch) Delete(key datastore.Key) error {
 	bt.deletes[key] = struct{}{}
 	return nil
 }
-func (bt *flatfsTransaction) Commit() error {
-	for k, cb := range bt.gets {
-		cb(bt.ds.Get(k))
-	}
+func (bt *flatfsBatch) Commit() error {
 	if err := bt.ds.putMany(bt.puts); err != nil {
 		return err
 	}
......
 package flatfs_test
 import (
+	"encoding/base32"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -10,6 +11,8 @@ import (
 	"github.com/jbenet/go-datastore"
 	"github.com/jbenet/go-datastore/flatfs"
 	"github.com/jbenet/go-datastore/query"
+	rand "github.com/dustin/randbo"
 )
 func tempdir(t testing.TB) (path string, cleanup func()) {
@@ -316,3 +319,71 @@ func TestQuerySimple(t *testing.T) {
 		t.Errorf("did not see wanted key %q in %+v", myKey, entries)
 	}
 }
+func BenchmarkConsecutivePut(b *testing.B) {
+	r := rand.New()
+	var blocks [][]byte
+	var keys []datastore.Key
+	for i := 0; i < b.N; i++ {
+		blk := make([]byte, 256*1024)
+		r.Read(blk)
+		blocks = append(blocks, blk)
+		key := base32.StdEncoding.EncodeToString(blk[:8])
+		keys = append(keys, datastore.NewKey(key))
+	}
+	temp, cleanup := tempdir(b)
+	defer cleanup()
+	fs, err := flatfs.New(temp, 2)
+	if err != nil {
+		b.Fatalf("New fail: %v\n", err)
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := fs.Put(keys[i], blocks[i])
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+func BenchmarkBatchedPut(b *testing.B) {
+	r := rand.New()
+	var blocks [][]byte
+	var keys []datastore.Key
+	for i := 0; i < b.N; i++ {
+		blk := make([]byte, 256*1024)
+		r.Read(blk)
+		blocks = append(blocks, blk)
+		key := base32.StdEncoding.EncodeToString(blk[:8])
+		keys = append(keys, datastore.NewKey(key))
+	}
+	temp, cleanup := tempdir(b)
+	defer cleanup()
+	fs, err := flatfs.New(temp, 2)
+	if err != nil {
+		b.Fatalf("New fail: %v\n", err)
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; {
+		batch := fs.Batch()
+		for n := i; i-n < 512 && i < b.N; i++ {
+			err := batch.Put(keys[i], blocks[i])
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+		err = batch.Commit()
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
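BenchmarkBatchedPut above flushes a Commit every 512 puts instead of accumulating all b.N writes in one batch, which keeps the in-memory batch bounded while still amortizing the per-write overhead. A hedged sketch of the same chunking pattern against the generic interface; the helper name and package are made up, and since flatfs's own Batch() in this diff returns no error, this generic version does not apply to it verbatim.

package dsutil // hypothetical helper package, not part of this commit

import ds "github.com/jbenet/go-datastore"

// PutAllBatched mirrors the chunking in BenchmarkBatchedPut: it opens a
// fresh batch, buffers up to chunkSize puts, commits, and repeats until
// all keys are written. chunkSize must be > 0.
func PutAllBatched(d ds.BatchingDatastore, keys []ds.Key, vals [][]byte, chunkSize int) error {
	for i := 0; i < len(keys); {
		b, err := d.Batch()
		if err != nil {
			return err
		}
		for n := i; i-n < chunkSize && i < len(keys); i++ {
			if err := b.Put(keys[i], vals[i]); err != nil {
				return err
			}
		}
		if err := b.Commit(); err != nil {
			return err
		}
	}
	return nil
}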
@@ -16,6 +16,8 @@ type KeyTransform interface {
 type Datastore interface {
 	ds.Shim
 	KeyTransform
+	Batch() (ds.Batch, error)
 }
 // Wrap wraps a given datastore with a KeyTransform function.
......
@@ -73,3 +73,37 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
 	return dsq.DerivedResults(qr, ch), nil
 }
+func (d *ktds) Batch() (ds.Batch, error) {
+	bds, ok := d.child.(ds.BatchingDatastore)
+	if !ok {
+		return nil, ds.ErrBatchUnsupported
+	}
+	childbatch, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+	return &transformBatch{
+		dst: childbatch,
+		f:   d.ConvertKey,
+	}, nil
+}
+type transformBatch struct {
+	dst ds.Batch
+	f   KeyMapping
+}
+func (t *transformBatch) Put(key ds.Key, val interface{}) error {
+	return t.dst.Put(t.f(key), val)
+}
+func (t *transformBatch) Delete(key ds.Key) error {
+	return t.dst.Delete(t.f(key))
+}
+func (t *transformBatch) Commit() error {
+	return t.dst.Commit()
+}
@@ -100,40 +100,6 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
 	return qr, nil
 }
-type ldbBatch struct {
-	b *leveldb.Batch
-	d *datastore
-}
-func (d *datastore) StartBatchOp() ds.Transaction {
-	return &ldbBatch{
-		b: new(leveldb.Batch),
-		d: d,
-	}
-}
-func (b *ldbBatch) Put(key ds.Key, val interface{}) error {
-	v, ok := val.([]byte)
-	if !ok {
-		return ds.ErrInvalidType
-	}
-	b.b.Put(key.Bytes(), v) // #dealwithit
-	return nil
-}
-func (b *ldbBatch) Delete(key ds.Key) error {
-	b.b.Delete(key.Bytes())
-	return nil
-}
-func (b *ldbBatch) Commit() error {
-	opts := &opt.WriteOptions{Sync: true}
-	if err := b.d.DB.Write(b.b, opts); err != nil {
-		return err
-	}
-	return nil
-}
 func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
 	var rnge *util.Range
......
@@ -148,6 +148,85 @@ func (m *measure) Query(q query.Query) (query.Results, error) {
 	return res, err
 }
+type measuredBatch struct {
+	puts    int
+	deletes int
+	putts   datastore.Batch
+	delts   datastore.Batch
+	m       *measure
+}
+func (m *measure) Batch() (datastore.Batch, error) {
+	bds, ok := m.backend.(datastore.BatchingDatastore)
+	if !ok {
+		return nil, datastore.ErrBatchUnsupported
+	}
+	pb, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+	db, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+	return &measuredBatch{
+		putts: pb,
+		delts: db,
+		m:     m,
+	}, nil
+}
+func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
+	mt.puts++
+	valb, ok := val.([]byte)
+	if !ok {
+		return datastore.ErrInvalidType
+	}
+	_ = mt.m.putSize.RecordValue(int64(len(valb)))
+	return mt.putts.Put(key, val)
+}
+func (mt *measuredBatch) Delete(key datastore.Key) error {
+	mt.deletes++
+	return mt.delts.Delete(key)
+}
+func (mt *measuredBatch) Commit() error {
+	err := logBatchCommit(mt.delts, mt.deletes, mt.m.deleteNum, mt.m.deleteErr, mt.m.deleteLatency)
+	if err != nil {
+		return err
+	}
+	err = logBatchCommit(mt.putts, mt.puts, mt.m.putNum, mt.m.putErr, mt.m.putLatency)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat *metrics.Histogram) error {
+	if n > 0 {
+		before := time.Now()
+		err := b.Commit()
+		took := int(time.Now().Sub(before)/time.Microsecond) / n
+		num.AddN(uint64(n))
+		for i := 0; i < n; i++ {
+			_ = lat.RecordValue(int64(took))
+		}
+		if err != nil {
+			errs.Add()
+			return err
+		}
+	}
+	return nil
+}
 func (m *measure) Close() error {
 	m.putNum.Remove()
 	m.putErr.Remove()
......
@@ -114,3 +114,62 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
 	r = query.ResultsReplaceQuery(r, q)
 	return r, nil
 }
+type mountBatch struct {
+	mounts map[string]datastore.Batch
+	d      *Datastore
+}
+func (d *Datastore) Batch() (datastore.Batch, error) {
+	return &mountBatch{
+		mounts: make(map[string]datastore.Batch),
+		d:      d,
+	}, nil
+}
+func (mt *mountBatch) lookupBatch(key datastore.Key) (datastore.Batch, datastore.Key, error) {
+	child, loc, rest := mt.d.lookup(key)
+	t, ok := mt.mounts[loc.String()]
+	if !ok {
+		bds, ok := child.(datastore.BatchingDatastore)
+		if !ok {
+			return nil, datastore.NewKey(""), datastore.ErrBatchUnsupported
+		}
+		var err error
+		t, err = bds.Batch()
+		if err != nil {
+			return nil, datastore.NewKey(""), err
+		}
+		mt.mounts[loc.String()] = t
+	}
+	return t, rest, nil
+}
+func (mt *mountBatch) Put(key datastore.Key, val interface{}) error {
+	t, rest, err := mt.lookupBatch(key)
+	if err != nil {
+		return err
+	}
+	return t.Put(rest, val)
+}
+func (mt *mountBatch) Delete(key datastore.Key) error {
+	t, rest, err := mt.lookupBatch(key)
+	if err != nil {
+		return err
+	}
+	return t.Delete(rest)
+}
+func (mt *mountBatch) Commit() error {
+	for _, t := range mt.mounts {
+		err := t.Commit()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
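mountBatch lazily opens one child batch per mounted prefix and commits them one at a time, so a Commit that spans several mounts is not atomic across them. A hedged usage sketch follows; it assumes the mount package's New constructor taking Mount{Prefix, Datastore} entries (as in go-datastore), and the keys are illustrative.

package main

import (
	"log"

	ds "github.com/jbenet/go-datastore"
	mount "github.com/jbenet/go-datastore/mount"
)

func main() {
	// Assumed constructor shape: mount.New([]mount.Mount{...}).
	d := mount.New([]mount.Mount{
		{Prefix: ds.NewKey("/blocks"), Datastore: ds.NewMapDatastore()},
		{Prefix: ds.NewKey("/meta"), Datastore: ds.NewMapDatastore()},
	})

	b, err := d.Batch()
	if err != nil {
		log.Fatal(err)
	}
	// Each Put is routed to the batch of whichever mount owns the key;
	// Commit then commits those child batches one by one.
	if err := b.Put(ds.NewKey("/blocks/abc"), []byte("block data")); err != nil {
		log.Fatal(err)
	}
	if err := b.Put(ds.NewKey("/meta/abc"), []byte("metadata")); err != nil {
		log.Fatal(err)
	}
	if err := b.Commit(); err != nil {
		log.Fatal(err)
	}
}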
@@ -66,3 +66,34 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
 	}
 	return r, nil
 }
+type panicBatch struct {
+	t ds.Batch
+}
+func (p *panicBatch) Put(key ds.Key, val interface{}) error {
+	err := p.t.Put(key, val)
+	if err != nil {
+		fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
+		panic("panic datastore: transaction put failed")
+	}
+	return nil
+}
+func (p *panicBatch) Delete(key ds.Key) error {
+	err := p.t.Delete(key)
+	if err != nil {
+		fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
+		panic("panic datastore: transaction delete failed")
+	}
+	return nil
+}
+func (p *panicBatch) Commit() error {
+	err := p.t.Commit()
+	if err != nil {
+		fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
+		panic("panic datastore: transaction commit failed")
+	}
+	return nil
+}
@@ -63,3 +63,43 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	defer d.RUnlock()
 	return d.child.Query(q)
 }
+func (d *MutexDatastore) Batch() (ds.Batch, error) {
+	d.RLock()
+	defer d.RUnlock()
+	bds, ok := d.child.(ds.BatchingDatastore)
+	if !ok {
+		return nil, ds.ErrBatchUnsupported
+	}
+	b, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+	return &syncBatch{
+		batch: b,
+	}, nil
+}
+type syncBatch struct {
+	lk    sync.Mutex
+	batch ds.Batch
+}
+func (b *syncBatch) Put(key ds.Key, val interface{}) error {
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	return b.batch.Put(key, val)
+}
+func (b *syncBatch) Delete(key ds.Key) error {
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	return b.batch.Delete(key)
+}
+func (b *syncBatch) Commit() error {
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	return b.batch.Commit()
+}
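The MutexDatastore hands out batches wrapped in syncBatch, so a single batch can be shared safely between goroutines even though the underlying child batch does no locking of its own. A hedged sketch; it assumes the sync package's MutexWrap constructor (as in go-datastore), with illustrative keys.

package main

import (
	"log"

	ds "github.com/jbenet/go-datastore"
	dssync "github.com/jbenet/go-datastore/sync"
)

func main() {
	// Assumed constructor: dssync.MutexWrap wraps a child datastore in
	// a *MutexDatastore, whose Batch() returns the syncBatch above.
	d := dssync.MutexWrap(ds.NewMapDatastore())

	b, err := d.Batch()
	if err != nil {
		log.Fatal(err) // ds.ErrBatchUnsupported if the child cannot batch
	}
	if err := b.Put(ds.NewKey("/k"), []byte("v")); err != nil {
		log.Fatal(err)
	}
	if err := b.Commit(); err != nil {
		log.Fatal(err)
	}
}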