Unverified Commit 8b79466f authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #140 from ipfs/feat/async-ds

Support Asynchronous Writing Datastores
parents d4417ca0 8ddf6ada
......@@ -64,6 +64,35 @@ func (d *Datastore) Put(k ds.Key, val []byte) error {
return nil
}
// Sync flushes all operations on keys at or under the prefix
// from the current batch to the underlying datastore.
func (d *Datastore) Sync(prefix ds.Key) error {
	b, err := d.child.Batch()
	if err != nil {
		return err
	}

	// Stage every buffered operation at or under the prefix. Matched keys
	// are remembered but NOT removed from the buffer yet: if Commit fails,
	// the pending writes must survive so a later Sync/Flush can retry them.
	synced := make([]ds.Key, 0, len(d.buffer))
	for k, o := range d.buffer {
		if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
			continue
		}

		var err error
		if o.delete {
			err = b.Delete(k)
		} else {
			err = b.Put(k, o.value)
		}
		if err != nil {
			return err
		}

		synced = append(synced, k)
	}

	if err := b.Commit(); err != nil {
		return err
	}

	// Only now that the batch is durably committed is it safe to drop the
	// staged operations from the in-memory buffer.
	for _, k := range synced {
		delete(d.buffer, k)
	}
	return nil
}
// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
b, err := d.child.Batch()
......
......@@ -103,3 +103,149 @@ func TestFlushing(t *testing.T) {
t.Fatal("wrong value")
}
}
// TestSync verifies that Sync flushes buffered operations under a prefix
// through to the child datastore, covering both Puts and Deletes.
func TestSync(t *testing.T) {
	backing := ds.NewMapDatastore()
	batched := NewAutoBatching(backing, 100)

	storeKey := func(key ds.Key) {
		err := batched.Put(key, []byte(key.String()))
		if err != nil {
			t.Fatal(err)
		}
	}
	dropKey := func(key ds.Key) {
		err := batched.Delete(key)
		if err != nil {
			t.Fatal(err)
		}
	}
	expectValue := func(target ds.Datastore, key ds.Key) {
		val, err := target.Get(key)
		if err != nil {
			t.Fatal(err)
		}
		if !bytes.Equal(val, []byte(key.String())) {
			t.Fatal("wrong value")
		}
	}
	expectMissing := func(target ds.Datastore, key ds.Key) {
		_, err := target.Get(key)
		if err != ds.ErrNotFound {
			t.Fatal("should not have found value")
		}
	}

	// Syncing buffered Puts.
	internalSyncTest(t, batched, backing, storeKey, dropKey, expectValue, expectMissing)
	// Syncing buffered Deletes (same scenario with the roles reversed).
	internalSyncTest(t, batched, backing, dropKey, storeKey, expectMissing, expectValue)
}
// This function can be used to test Sync Puts and Deletes.
// For clarity comments are written as if op = Put and undoOp = Delete;
// the caller swaps the operation/check pairs to cover the Delete case.
func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key),
	checkOp, checkUndoOp func(ds.Datastore, ds.Key)) {

	var keys []ds.Key
	// keymap records each key's index within keys, so ranges of keys can be
	// named by their boundary keys (see checkKeyRange).
	keymap := make(map[ds.Key]int)
	// Build a three-level key tree in depth-first order:
	// "i", "i/j", "i/j/k" for i in 0..3 and j, k in 0..1.
	for i := 0; i < 4; i++ {
		k := ds.NewKey(fmt.Sprintf("%d", i))
		keymap[k] = len(keys)
		keys = append(keys, k)

		for j := 0; j < 2; j++ {
			k := ds.NewKey(fmt.Sprintf("%d/%d", i, j))
			keymap[k] = len(keys)
			keys = append(keys, k)

			// Note: this inner k (int) shadows the outer k (ds.Key) loop
			// variables above; each level re-declares k for its own key.
			for k := 0; k < 2; k++ {
				k := ds.NewKey(fmt.Sprintf("%d/%d/%d", i, j, k))
				keymap[k] = len(keys)
				keys = append(keys, k)
			}
		}
	}

	// Apply op to every key (buffered, not yet flushed to child).
	for _, k := range keys {
		op(k)
	}

	// Get works normally.
	for _, k := range keys {
		checkOp(d, k)
	}

	// Put not flushed
	checkUndoOp(child, ds.NewKey("0"))

	// Delete works.
	deletedKey := ds.NewKey("2/1/1")
	undoOp(deletedKey)
	checkUndoOp(d, deletedKey)

	// Put still not flushed
	checkUndoOp(child, ds.NewKey("0"))

	// Sync the tree "0/*/*"
	if err := d.Sync(ds.NewKey("0")); err != nil {
		t.Fatal(err)
	}

	// Try to get keys "0/*/*" from the child datastore
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}}, checkOp)

	// Verify no other keys were synchronized
	checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp)

	// Sync the tree "1/1/*"
	if err := d.Sync(ds.NewKey("1/1")); err != nil {
		t.Fatal(err)
	}

	// Try to get keys "0/*/*" and "1/1/*" from the child datastore
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}}, checkOp)

	// Verify no other keys were synchronized
	checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp)

	// Sync the tree "3/1/1" (a single leaf key)
	if err := d.Sync(ds.NewKey("3/1/1")); err != nil {
		t.Fatal(err)
	}

	// Try to get keys "0/*/*", "1/1/*", "3/1/1" from the child datastore
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}, {"3/1/1", "3/1/1"}}, checkOp)

	// Verify no other keys were synchronized
	checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp)

	// Sync with the zero-value Key — the following checks expect this to
	// flush the entire remaining buffer.
	if err := d.Sync(ds.Key{}); err != nil {
		t.Fatal(err)
	}

	// Never flushed the deleted key.
	checkUndoOp(child, deletedKey)

	// Try to get all keys except the deleted key from the child datastore
	checkKeyRange(t, keymap, keys, d, [][]string{{"0", "2/1/0"}, {"3", "3/1/1"}}, checkOp)

	// Add the deleted key into the datastore
	op(deletedKey)

	// Sync it
	if err := d.Sync(deletedKey); err != nil {
		t.Fatal(err)
	}

	// Check it
	checkOp(d, deletedKey)
}
// checkKeyRange runs checkFn on every key in each range of validKeyRanges.
// Each range is an inclusive {first, last} pair of key names whose positions
// in keys are resolved through keymap.
func checkKeyRange(t *testing.T, keymap map[ds.Key]int, keys []ds.Key,
	d ds.Datastore, validKeyRanges [][]string, checkFn func(ds.Datastore, ds.Key)) {
	t.Helper()
	for _, validKeyBoundaries := range validKeyRanges {
		start, end := keymap[ds.NewKey(validKeyBoundaries[0])], keymap[ds.NewKey(validKeyBoundaries[1])]
		// The boundaries are inclusive: slicing keys[start:end] would skip
		// the last key of every range, and a single-key range such as
		// {"3/1/1", "3/1/1"} would check nothing at all. Slice to end+1.
		for _, k := range keys[start : end+1] {
			checkFn(d, k)
		}
	}
}
......@@ -28,6 +28,11 @@ func (d *MapDatastore) Put(key Key, value []byte) (err error) {
return nil
}
// Sync implements Datastore.Sync. MapDatastore applies every Put and
// Delete to its in-memory map immediately, so there is never anything
// buffered to flush and Sync is a no-op.
func (d *MapDatastore) Sync(prefix Key) error {
	return nil
}
// Get implements Datastore.Get
func (d *MapDatastore) Get(key Key) (value []byte, err error) {
val, found := d.values[key]
......@@ -95,6 +100,11 @@ func (d *NullDatastore) Put(key Key, value []byte) (err error) {
return nil
}
// Sync implements Datastore.Sync. NullDatastore discards all writes,
// so there is nothing to flush and Sync always succeeds.
func (d *NullDatastore) Sync(prefix Key) error {
	return nil
}
// Get implements Datastore.Get
func (d *NullDatastore) Get(key Key) (value []byte, err error) {
return nil, ErrNotFound
......
......@@ -34,6 +34,13 @@ should be checked by callers.
type Datastore interface {
	// Read and Write supply the basic key/value operations.
	Read
	Write

	// Sync guarantees that any Put or Delete calls under prefix that returned
	// before Sync(prefix) was called will be observed after Sync(prefix)
	// returns, even if the program crashes. If Put/Delete operations already
	// satisfy these requirements then Sync may be a no-op.
	//
	// If the prefix fails to Sync this method returns an error.
	Sync(prefix Key) error

	// Closing the datastore releases any resources it holds.
	io.Closer
}
......
......@@ -30,6 +30,12 @@ func (dds *Delayed) Put(key ds.Key, value []byte) (err error) {
return dds.ds.Put(key, value)
}
// Sync implements Datastore.Sync: it first waits on the configured
// artificial delay, then forwards the call to the wrapped datastore.
func (dds *Delayed) Sync(prefix ds.Key) error {
	dds.delay.Wait()
	return dds.ds.Sync(prefix)
}
// Get implements the ds.Datastore interface.
func (dds *Delayed) Get(key ds.Key) (value []byte, err error) {
dds.delay.Wait()
......
......@@ -63,6 +63,12 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return ioutil.WriteFile(fn, value, 0666)
}
// Sync would ensure that any previous Puts under the prefix are written
// to disk. However, they already are: Put writes each value straight to
// its file, so there is nothing buffered left to flush.
func (d *Datastore) Sync(prefix ds.Key) error {
	return nil
}
// Get returns the value for given key
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
fn := d.KeyFilename(key)
......@@ -103,8 +109,9 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
walkFn := func(path string, info os.FileInfo, err error) error {
// remove ds path prefix
if strings.HasPrefix(path, d.path) {
path = path[len(d.path):]
relPath, err := filepath.Rel(d.path, path)
if err == nil {
path = filepath.ToSlash(relPath)
}
if !info.IsDir() {
......@@ -167,7 +174,7 @@ func (d *Datastore) DiskUsage() (uint64, error) {
log.Println(err)
return err
}
if f != nil {
if f != nil && f.Mode().IsRegular() {
du += uint64(f.Size())
}
return nil
......
......@@ -96,13 +96,16 @@ func (ks *DSSuite) TestDiskUsage(c *C) {
"foo/bar/baz/barb",
})
totalBytes := 0
for _, k := range keys {
err := ks.ds.Put(k, []byte(k.String()))
value := []byte(k.String())
totalBytes += len(value)
err := ks.ds.Put(k, value)
c.Check(err, Equals, nil)
}
if ps, ok := ks.ds.(ds.PersistentDatastore); ok {
if s, err := ps.DiskUsage(); s <= 100 || err != nil {
if s, err := ps.DiskUsage(); s != uint64(totalBytes) || err != nil {
c.Error("unexpected size is: ", s)
}
} else {
......
......@@ -36,6 +36,16 @@ func (d *Failstore) Put(k ds.Key, val []byte) error {
return d.child.Put(k, val)
}
// Sync implements Datastore.Sync. Like every other operation on a
// Failstore, it first consults the failure function (with op name "sync")
// and only delegates to the child datastore if no error is injected.
func (d *Failstore) Sync(prefix ds.Key) error {
	if err := d.errfunc("sync"); err != nil {
		return err
	}
	return d.child.Sync(prefix)
}
// Get retrieves a value from the datastore.
func (d *Failstore) Get(k ds.Key) ([]byte, error) {
err := d.errfunc("get")
......
......@@ -37,6 +37,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) {
return d.child.Put(d.ConvertKey(key), value)
}
// Sync implements Datastore.Sync, transforming the prefix with ConvertKey
// before delegating to the child datastore — mirroring how Put/Get
// transform their keys.
func (d *Datastore) Sync(prefix ds.Key) error {
	return d.child.Sync(d.ConvertKey(prefix))
}
// Get returns the value for given key, transforming the key first.
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return d.child.Get(d.ConvertKey(key))
......
......@@ -189,6 +189,24 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
return cds.Put(k, value)
}
// Sync implements Datastore.Sync.
func (d *Datastore) Sync(prefix ds.Key) error {
	// Sync all mount points below the prefix
	// Sync the mount point right at (or above) the prefix
	dstores, mountPts, rest := d.lookupAll(prefix)
	for i, suffix := range rest {
		// suffix is the portion of the prefix that falls inside this mount.
		if err := dstores[i].Sync(suffix); err != nil {
			return err
		}

		// Stop once a single mount covers the whole prefix: either the
		// mount point is exactly the prefix, or (suffix != "/") this mount
		// sits above the prefix and we just synced the sub-path within it.
		// NOTE(review): relies on the ordering lookupAll returns its
		// matches in — confirm against lookupAll's definition.
		if mountPts[i].Equal(prefix) || suffix.String() != "/" {
			return nil
		}
	}
	return nil
}
func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
......
......@@ -5,6 +5,7 @@ import (
"testing"
datastore "github.com/ipfs/go-datastore"
autobatch "github.com/ipfs/go-datastore/autobatch"
mount "github.com/ipfs/go-datastore/mount"
query "github.com/ipfs/go-datastore/query"
sync "github.com/ipfs/go-datastore/sync"
......@@ -641,6 +642,70 @@ func TestLookupPrio(t *testing.T) {
}
}
// TestNestedMountSync checks Sync across nested mounts backed by
// auto-batching datastores.
func TestNestedMountSync(t *testing.T) {
	rootDS := datastore.NewMapDatastore()
	fooDS := datastore.NewMapDatastore()
	fooBarDS := datastore.NewMapDatastore()

	m := mount.New([]mount.Mount{
		{Prefix: datastore.NewKey("/foo"), Datastore: autobatch.NewAutoBatching(fooDS, 10)},
		{Prefix: datastore.NewKey("/foo/bar"), Datastore: autobatch.NewAutoBatching(fooBarDS, 10)},
		{Prefix: datastore.NewKey("/"), Datastore: autobatch.NewAutoBatching(rootDS, 10)},
	})

	// Testing scenarios
	// 1) Make sure child(ren) sync
	// 2) Make sure parent syncs
	// 3) Make sure parent only syncs the relevant subtree (instead of fully syncing)

	put := func(str string) {
		t.Helper()
		err := m.Put(datastore.NewKey(str), []byte(str))
		if err != nil {
			t.Fatal(err)
		}
	}
	expectPresence := func(d datastore.Datastore, str string, expectFound bool) {
		t.Helper()
		found, err := d.Has(datastore.NewKey(str))
		if err != nil {
			t.Fatal(err)
		}
		switch {
		case expectFound && !found:
			t.Fatal("datastore is missing key")
		case !expectFound && found:
			t.Fatal("datastore has key it should not have")
		}
	}

	// Put keys under /foo/bar, /foo and /beep, then Sync /foo: everything
	// under /foo is flushed, /beep/bop is not — covers checks 1 and 2.
	put("/foo/bar/0")
	put("/foo/bar/1")
	put("/foo/baz")
	put("/beep/bop")
	if err := m.Sync(datastore.NewKey("/foo")); err != nil {
		t.Fatal(err)
	}
	expectPresence(fooBarDS, "/0", true)
	expectPresence(fooBarDS, "/1", true)
	expectPresence(fooDS, "/baz", true)
	expectPresence(rootDS, "/beep/bop", false)

	// Put /fwop and /bloop on the root mount, then Sync only /fwop:
	// /bloop must remain unflushed — covers check 3.
	put("/fwop")
	put("/bloop")
	if err := m.Sync(datastore.NewKey("/fwop")); err != nil {
		t.Fatal(err)
	}
	expectPresence(rootDS, "/fwop", true)
	expectPresence(rootDS, "/bloop", false)
}
// errQueryDS embeds NullDatastore — presumably overriding Query elsewhere
// in this file to exercise query error paths; confirm against the
// surrounding tests.
type errQueryDS struct {
	datastore.NullDatastore
}
......
......@@ -73,6 +73,13 @@ func (d *Datastore) Put(k ds.Key, val []byte) error {
})
}
// Sync implements Datastore.Sync, wrapping the child Batching.Sync call
// in runOp so it receives the same error handling as the datastore's
// other operations (runOp's semantics are defined elsewhere in this file).
func (d *Datastore) Sync(prefix ds.Key) error {
	return d.runOp(func() error {
		return d.Batching.Sync(prefix)
	})
}
// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
var has bool
......
......@@ -33,6 +33,13 @@ func (d *MutexDatastore) Put(key ds.Key, value []byte) (err error) {
return d.child.Put(key, value)
}
// Sync implements Datastore.Sync. The exclusive (write) lock is taken,
// matching Put rather than Get, since the child's Sync may mutate its
// internal state while flushing.
func (d *MutexDatastore) Sync(prefix ds.Key) error {
	d.Lock()
	defer d.Unlock()
	return d.child.Sync(prefix)
}
// Get implements Datastore.Get
func (d *MutexDatastore) Get(key ds.Key) (value []byte, err error) {
d.RLock()
......
......@@ -176,6 +176,36 @@ func SubtestManyKeysAndQuery(t *testing.T, ds dstore.Datastore) {
subtestQuery(t, ds, dsq.Query{KeysOnly: true}, 100)
}
// SubtestBasicSync checks that Sync succeeds before any writes, after
// Puts, on parent and child prefixes, and on the empty key.
func SubtestBasicSync(t *testing.T, ds dstore.Datastore) {
	sync := func(key string) {
		if err := ds.Sync(dstore.NewKey(key)); err != nil {
			t.Fatal(err)
		}
	}
	put := func(key, value string) {
		if err := ds.Put(dstore.NewKey(key), []byte(value)); err != nil {
			t.Fatal(err)
		}
	}

	sync("foo") // syncing a never-written prefix must not error
	put("/foo", "foo")
	sync("/foo")
	put("/foo/bar", "bar")
	sync("/foo") // parent prefix covers the child key
	sync("/foo/bar")
	sync("") // empty key: sync everything
}
// testFilter is a custom query filter used to exercise the "fallback"
// filter path taken for filter types the datastore does not recognize.
type testFilter struct{}
......
......@@ -19,6 +19,7 @@ var BasicSubtests = []func(t *testing.T, ds dstore.Datastore){
SubtestFilter,
SubtestManyKeysAndQuery,
SubtestReturnSizes,
SubtestBasicSync,
}
// BatchSubtests is a list of all basic batching datastore tests.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment