Commit 301890e7 authored by Steven Allen's avatar Steven Allen
parent 55256606
......@@ -13,38 +13,40 @@ type Datastore struct {
child ds.Batching
// TODO: discuss making ds.Batch implement the full ds.Datastore interface
buffer map[ds.Key][]byte
buffer map[ds.Key]op
maxBufferEntries int
}
type op struct {
delete bool
value []byte
}
// NewAutoBatching returns a new datastore that automatically
// batches writes using the given Batching datastore. The size
// of the memory pool is given by size.
func NewAutoBatching(d ds.Batching, size int) *Datastore {
return &Datastore{
child: d,
buffer: make(map[ds.Key][]byte),
buffer: make(map[ds.Key]op, size),
maxBufferEntries: size,
}
}
// Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error {
_, found := d.buffer[k]
delete(d.buffer, k)
err := d.child.Delete(k)
if found && err == ds.ErrNotFound {
d.buffer[k] = op{delete: true}
return nil
}
return err
}
// Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) ([]byte, error) {
val, ok := d.buffer[k]
o, ok := d.buffer[k]
if ok {
return val, nil
if o.delete {
return nil, ds.ErrNotFound
}
return o.value, nil
}
return d.child.Get(k)
......@@ -52,7 +54,7 @@ func (d *Datastore) Get(k ds.Key) ([]byte, error) {
// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val []byte) error {
d.buffer[k] = val
d.buffer[k] = op{value: val}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
}
......@@ -66,23 +68,32 @@ func (d *Datastore) Flush() error {
return err
}
for k, v := range d.buffer {
err := b.Put(k, v)
for k, o := range d.buffer {
var err error
if o.delete {
err = b.Delete(k)
if err == ds.ErrNotFound {
// Ignore these, let delete be idempotent.
err = nil
}
} else {
err = b.Put(k, o.value)
}
if err != nil {
return err
}
}
// clear out buffer
d.buffer = make(map[ds.Key][]byte)
d.buffer = make(map[ds.Key]op, d.maxBufferEntries)
return b.Commit()
}
// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
_, ok := d.buffer[k]
o, ok := d.buffer[k]
if ok {
return true, nil
return !o.delete, nil
}
return d.child.Has(k)
......@@ -90,9 +101,12 @@ func (d *Datastore) Has(k ds.Key) (bool, error) {
// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) {
v, ok := d.buffer[k]
o, ok := d.buffer[k]
if ok {
return len(v), nil
if o.delete {
return -1, ds.ErrNotFound
}
return len(o.value), nil
}
return d.child.GetSize(k)
......
......@@ -30,18 +30,48 @@ func TestFlushing(t *testing.T) {
}
}
// Get works normally.
for _, k := range keys {
val, err := d.Get(k)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}
}
// Not flushed
_, err := child.Get(keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}
// Delete works.
err = d.Delete(keys[14])
if err != nil {
t.Fatal(err)
}
_, err = d.Get(keys[14])
if err != ds.ErrNotFound {
t.Fatal(err)
}
// Still not flushed
_, err = child.Get(keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}
// Final put flushes.
err = d.Put(ds.NewKey("test16"), v)
if err != nil {
t.Fatal(err)
}
// should be flushed now, try to get keys from child datastore
for _, k := range keys {
for _, k := range keys[:14] {
val, err := child.Get(k)
if err != nil {
t.Fatal(err)
......@@ -51,4 +81,25 @@ func TestFlushing(t *testing.T) {
t.Fatal("wrong value")
}
}
// Never flushed the deleted key.
_, err = child.Get(keys[14])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}
// Delete doesn't flush
err = d.Delete(keys[0])
if err != nil {
t.Fatal(err)
}
val, err := child.Get(keys[0])
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment