diff --git a/autobatch/autobatch.go b/autobatch/autobatch.go index 62ea38d681e9c6fcda03286f2114cdb9f7e1f409..aeca9a455bbfc842b2c067497a1c92d8731ccc1b 100644 --- a/autobatch/autobatch.go +++ b/autobatch/autobatch.go @@ -13,38 +13,43 @@ type Datastore struct { child ds.Batching // TODO: discuss making ds.Batch implement the full ds.Datastore interface - buffer map[ds.Key][]byte + buffer map[ds.Key]op maxBufferEntries int } +type op struct { + delete bool + value []byte +} + // NewAutoBatching returns a new datastore that automatically // batches writes using the given Batching datastore. The size // of the memory pool is given by size. func NewAutoBatching(d ds.Batching, size int) *Datastore { return &Datastore{ child: d, - buffer: make(map[ds.Key][]byte), + buffer: make(map[ds.Key]op, size), maxBufferEntries: size, } } // Delete deletes a key/value func (d *Datastore) Delete(k ds.Key) error { - _, found := d.buffer[k] - delete(d.buffer, k) - - err := d.child.Delete(k) - if found && err == ds.ErrNotFound { - return nil + d.buffer[k] = op{delete: true} + if len(d.buffer) > d.maxBufferEntries { + return d.Flush() } - return err + return nil } // Get retrieves a value given a key. func (d *Datastore) Get(k ds.Key) ([]byte, error) { - val, ok := d.buffer[k] + o, ok := d.buffer[k] if ok { - return val, nil + if o.delete { + return nil, ds.ErrNotFound + } + return o.value, nil } return d.child.Get(k) @@ -52,7 +57,7 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) { // Put stores a key/value. func (d *Datastore) Put(k ds.Key, val []byte) error { - d.buffer[k] = val + d.buffer[k] = op{value: val} if len(d.buffer) > d.maxBufferEntries { return d.Flush() } @@ -66,23 +71,32 @@ func (d *Datastore) Flush() error { return err } - for k, v := range d.buffer { - err := b.Put(k, v) + for k, o := range d.buffer { + var err error + if o.delete { + err = b.Delete(k) + if err == ds.ErrNotFound { + // Ignore these, let delete be idempotent. + err = nil + } + } else { + err = b.Put(k, o.value) + } if err != nil { return err } } // clear out buffer - d.buffer = make(map[ds.Key][]byte) + d.buffer = make(map[ds.Key]op, d.maxBufferEntries) return b.Commit() } // Has checks if a key is stored. func (d *Datastore) Has(k ds.Key) (bool, error) { - _, ok := d.buffer[k] + o, ok := d.buffer[k] if ok { - return true, nil + return !o.delete, nil } return d.child.Has(k) @@ -90,9 +104,12 @@ func (d *Datastore) Has(k ds.Key) (bool, error) { // GetSize implements Datastore.GetSize func (d *Datastore) GetSize(k ds.Key) (int, error) { - v, ok := d.buffer[k] + o, ok := d.buffer[k] if ok { - return len(v), nil + if o.delete { + return -1, ds.ErrNotFound + } + return len(o.value), nil } return d.child.GetSize(k) @@ -114,5 +131,13 @@ func (d *Datastore) DiskUsage() (uint64, error) { } func (d *Datastore) Close() error { - return d.child.Close() + err1 := d.Flush() + err2 := d.child.Close() + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + return nil } diff --git a/autobatch/autobatch_test.go b/autobatch/autobatch_test.go index 4eebb5339479d540834bd344d9bac91cae078932..0dcf2a0c5d6bf77daadd4546159b637a4f108571 100644 --- a/autobatch/autobatch_test.go +++ b/autobatch/autobatch_test.go @@ -30,18 +30,48 @@ func TestFlushing(t *testing.T) { } } + // Get works normally. + for _, k := range keys { + val, err := d.Get(k) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(val, v) { + t.Fatal("wrong value") + } + } + + // Not flushed _, err := child.Get(keys[0]) if err != ds.ErrNotFound { t.Fatal("shouldnt have found value") } + // Delete works. + err = d.Delete(keys[14]) + if err != nil { + t.Fatal(err) + } + _, err = d.Get(keys[14]) + if err != ds.ErrNotFound { + t.Fatal(err) + } + + // Still not flushed + _, err = child.Get(keys[0]) + if err != ds.ErrNotFound { + t.Fatal("shouldnt have found value") + } + + // Final put flushes. err = d.Put(ds.NewKey("test16"), v) if err != nil { t.Fatal(err) } // should be flushed now, try to get keys from child datastore - for _, k := range keys { + for _, k := range keys[:14] { val, err := child.Get(k) if err != nil { t.Fatal(err) @@ -51,4 +81,25 @@ func TestFlushing(t *testing.T) { t.Fatal("wrong value") } } + + // Never flushed the deleted key. + _, err = child.Get(keys[14]) + if err != ds.ErrNotFound { + t.Fatal("shouldnt have found value") + } + + // Delete doesn't flush + err = d.Delete(keys[0]) + if err != nil { + t.Fatal(err) + } + + val, err := child.Get(keys[0]) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(val, v) { + t.Fatal("wrong value") + } }