Unverified Commit 0f659c79 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #128 from ipfs/feat/batch-delete

autobatch: batch deletes
parents 55256606 534082e9
...@@ -13,38 +13,43 @@ type Datastore struct { ...@@ -13,38 +13,43 @@ type Datastore struct {
child ds.Batching child ds.Batching
// TODO: discuss making ds.Batch implement the full ds.Datastore interface // TODO: discuss making ds.Batch implement the full ds.Datastore interface
buffer map[ds.Key][]byte buffer map[ds.Key]op
maxBufferEntries int maxBufferEntries int
} }
type op struct {
delete bool
value []byte
}
// NewAutoBatching returns a new datastore that automatically // NewAutoBatching returns a new datastore that automatically
// batches writes using the given Batching datastore. The size // batches writes using the given Batching datastore. The size
// of the memory pool is given by size. // of the memory pool is given by size.
func NewAutoBatching(d ds.Batching, size int) *Datastore { func NewAutoBatching(d ds.Batching, size int) *Datastore {
return &Datastore{ return &Datastore{
child: d, child: d,
buffer: make(map[ds.Key][]byte), buffer: make(map[ds.Key]op, size),
maxBufferEntries: size, maxBufferEntries: size,
} }
} }
// Delete deletes a key/value // Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error { func (d *Datastore) Delete(k ds.Key) error {
_, found := d.buffer[k] d.buffer[k] = op{delete: true}
delete(d.buffer, k) if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
err := d.child.Delete(k)
if found && err == ds.ErrNotFound {
return nil
} }
return err return nil
} }
// Get retrieves a value given a key. // Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) ([]byte, error) { func (d *Datastore) Get(k ds.Key) ([]byte, error) {
val, ok := d.buffer[k] o, ok := d.buffer[k]
if ok { if ok {
return val, nil if o.delete {
return nil, ds.ErrNotFound
}
return o.value, nil
} }
return d.child.Get(k) return d.child.Get(k)
...@@ -52,7 +57,7 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) { ...@@ -52,7 +57,7 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {
// Put stores a key/value. // Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val []byte) error { func (d *Datastore) Put(k ds.Key, val []byte) error {
d.buffer[k] = val d.buffer[k] = op{value: val}
if len(d.buffer) > d.maxBufferEntries { if len(d.buffer) > d.maxBufferEntries {
return d.Flush() return d.Flush()
} }
...@@ -66,23 +71,32 @@ func (d *Datastore) Flush() error { ...@@ -66,23 +71,32 @@ func (d *Datastore) Flush() error {
return err return err
} }
for k, v := range d.buffer { for k, o := range d.buffer {
err := b.Put(k, v) var err error
if o.delete {
err = b.Delete(k)
if err == ds.ErrNotFound {
// Ignore these, let delete be idempotent.
err = nil
}
} else {
err = b.Put(k, o.value)
}
if err != nil { if err != nil {
return err return err
} }
} }
// clear out buffer // clear out buffer
d.buffer = make(map[ds.Key][]byte) d.buffer = make(map[ds.Key]op, d.maxBufferEntries)
return b.Commit() return b.Commit()
} }
// Has checks if a key is stored. // Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) { func (d *Datastore) Has(k ds.Key) (bool, error) {
_, ok := d.buffer[k] o, ok := d.buffer[k]
if ok { if ok {
return true, nil return !o.delete, nil
} }
return d.child.Has(k) return d.child.Has(k)
...@@ -90,9 +104,12 @@ func (d *Datastore) Has(k ds.Key) (bool, error) { ...@@ -90,9 +104,12 @@ func (d *Datastore) Has(k ds.Key) (bool, error) {
// GetSize implements Datastore.GetSize // GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) { func (d *Datastore) GetSize(k ds.Key) (int, error) {
v, ok := d.buffer[k] o, ok := d.buffer[k]
if ok { if ok {
return len(v), nil if o.delete {
return -1, ds.ErrNotFound
}
return len(o.value), nil
} }
return d.child.GetSize(k) return d.child.GetSize(k)
...@@ -114,5 +131,13 @@ func (d *Datastore) DiskUsage() (uint64, error) { ...@@ -114,5 +131,13 @@ func (d *Datastore) DiskUsage() (uint64, error) {
} }
func (d *Datastore) Close() error { func (d *Datastore) Close() error {
return d.child.Close() err1 := d.Flush()
err2 := d.child.Close()
if err1 != nil {
return err1
}
if err2 != nil {
return err2
}
return nil
} }
...@@ -30,18 +30,48 @@ func TestFlushing(t *testing.T) { ...@@ -30,18 +30,48 @@ func TestFlushing(t *testing.T) {
} }
} }
// Get works normally.
for _, k := range keys {
val, err := d.Get(k)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}
}
// Not flushed
_, err := child.Get(keys[0]) _, err := child.Get(keys[0])
if err != ds.ErrNotFound { if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value") t.Fatal("shouldnt have found value")
} }
// Delete works.
err = d.Delete(keys[14])
if err != nil {
t.Fatal(err)
}
_, err = d.Get(keys[14])
if err != ds.ErrNotFound {
t.Fatal(err)
}
// Still not flushed
_, err = child.Get(keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}
// Final put flushes.
err = d.Put(ds.NewKey("test16"), v) err = d.Put(ds.NewKey("test16"), v)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// should be flushed now, try to get keys from child datastore // should be flushed now, try to get keys from child datastore
for _, k := range keys { for _, k := range keys[:14] {
val, err := child.Get(k) val, err := child.Get(k)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
...@@ -51,4 +81,25 @@ func TestFlushing(t *testing.T) { ...@@ -51,4 +81,25 @@ func TestFlushing(t *testing.T) {
t.Fatal("wrong value") t.Fatal("wrong value")
} }
} }
// Never flushed the deleted key.
_, err = child.Get(keys[14])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}
// Delete doesn't flush
err = d.Delete(keys[0])
if err != nil {
t.Fatal(err)
}
val, err := child.Get(keys[0])
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment