diff --git a/autobatch/autobatch.go b/autobatch/autobatch.go index 4eb089bf5f4a806b0c613c65b3586861d3c18c45..0f86764525bd824f4f433663070bbec35044e850 100644 --- a/autobatch/autobatch.go +++ b/autobatch/autobatch.go @@ -64,6 +64,35 @@ func (d *Datastore) Put(k ds.Key, val []byte) error { return nil } +// Sync flushes all operations on keys at or under the prefix +// from the current batch to the underlying datastore +func (d *Datastore) Sync(prefix ds.Key) error { + b, err := d.child.Batch() + if err != nil { + return err + } + + for k, o := range d.buffer { + if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) { + continue + } + + var err error + if o.delete { + err = b.Delete(k) + } else { + err = b.Put(k, o.value) + } + if err != nil { + return err + } + + delete(d.buffer, k) + } + + return b.Commit() +} + // Flush flushes the current batch to the underlying datastore. func (d *Datastore) Flush() error { b, err := d.child.Batch() diff --git a/autobatch/autobatch_test.go b/autobatch/autobatch_test.go index 0dcf2a0c5d6bf77daadd4546159b637a4f108571..bd6fb30075cd662111d1481ad72f772e42a02f27 100644 --- a/autobatch/autobatch_test.go +++ b/autobatch/autobatch_test.go @@ -103,3 +103,149 @@ func TestFlushing(t *testing.T) { t.Fatal("wrong value") } } + +func TestSync(t *testing.T) { + child := ds.NewMapDatastore() + d := NewAutoBatching(child, 100) + + put := func(key ds.Key) { + if err := d.Put(key, []byte(key.String())); err != nil { + t.Fatal(err) + } + } + del := func(key ds.Key) { + if err := d.Delete(key); err != nil { + t.Fatal(err) + } + } + + get := func(d ds.Datastore, key ds.Key) { + val, err := d.Get(key) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(val, []byte(key.String())) { + t.Fatal("wrong value") + } + } + invalidGet := func(d ds.Datastore, key ds.Key) { + if _, err := d.Get(key); err != ds.ErrNotFound { + t.Fatal("should not have found value") + } + } + + // Test if Syncing Puts works + internalSyncTest(t, d, child, put, del, get, invalidGet) + + // Test if Syncing Deletes works + internalSyncTest(t, d, child, del, put, invalidGet, get) +} + +// This function can be used to test Sync Puts and Deletes +// For clarity comments are written as if op = Put and undoOp = Delete +func internalSyncTest(t *testing.T, d, child ds.Datastore, op, undoOp func(ds.Key), + checkOp, checkUndoOp func(ds.Datastore, ds.Key)) { + var keys []ds.Key + keymap := make(map[ds.Key]int) + for i := 0; i < 4; i++ { + k := ds.NewKey(fmt.Sprintf("%d", i)) + keymap[k] = len(keys) + keys = append(keys, k) + for j := 0; j < 2; j++ { + k := ds.NewKey(fmt.Sprintf("%d/%d", i, j)) + keymap[k] = len(keys) + keys = append(keys, k) + for k := 0; k < 2; k++ { + k := ds.NewKey(fmt.Sprintf("%d/%d/%d", i, j, k)) + keymap[k] = len(keys) + keys = append(keys, k) + } + } + } + + for _, k := range keys { + op(k) + } + + // Get works normally. + for _, k := range keys { + checkOp(d, k) + } + + // Put not flushed + checkUndoOp(child, ds.NewKey("0")) + + // Delete works. + deletedKey := ds.NewKey("2/1/1") + undoOp(deletedKey) + checkUndoOp(d, deletedKey) + + // Put still not flushed + checkUndoOp(child, ds.NewKey("0")) + + // Sync the tree "0/*/*" + if err := d.Sync(ds.NewKey("0")); err != nil { + t.Fatal(err) + } + + // Try to get keys "0/*/*" from the child datastore + checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}}, checkOp) + + // Verify no other keys were synchronized + checkKeyRange(t, keymap, keys, child, [][]string{{"1", "3/1/1"}}, checkUndoOp) + + // Sync the tree "1/1/*" + if err := d.Sync(ds.NewKey("1/1")); err != nil { + t.Fatal(err) + } + + // Try to get keys "0/*/*" and "1/1/*" from the child datastore + checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}}, checkOp) + + // Verify no other keys were synchronized + checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/1"}}, checkUndoOp) + + // Sync the tree "3/1/1" + if err := d.Sync(ds.NewKey("3/1/1")); err != nil { + t.Fatal(err) + } + + // Try to get keys "0/*/*", "1/1/*", "3/1/1" from the child datastore + checkKeyRange(t, keymap, keys, d, [][]string{{"0", "0/1/1"}, {"1/1", "1/1/1"}, {"3/1/1", "3/1/1"}}, checkOp) + + // Verify no other keys were synchronized + checkKeyRange(t, keymap, keys, child, [][]string{{"1", "1/0/1"}, {"2", "3/1/0"}}, checkUndoOp) + + if err := d.Sync(ds.Key{}); err != nil { + t.Fatal(err) + } + + // Never flushed the deleted key. + checkUndoOp(child, deletedKey) + + // Try to get all keys except the deleted key from the child datastore + checkKeyRange(t, keymap, keys, d, [][]string{{"0", "2/1/0"}, {"3", "3/1/1"}}, checkOp) + + // Add the deleted key into the datastore + op(deletedKey) + + // Sync it + if err := d.Sync(deletedKey); err != nil { + t.Fatal(err) + } + + // Check it + checkOp(d, deletedKey) +} + +func checkKeyRange(t *testing.T, keymap map[ds.Key]int, keys []ds.Key, + d ds.Datastore, validKeyRanges [][]string, checkFn func(ds.Datastore, ds.Key)) { + t.Helper() + for _, validKeyBoundaries := range validKeyRanges { + start, end := keymap[ds.NewKey(validKeyBoundaries[0])], keymap[ds.NewKey(validKeyBoundaries[1])] + for _, k := range keys[start:end] { + checkFn(d, k) + } + } +} diff --git a/basic_ds.go b/basic_ds.go index 98476767f1ddace487f097811f0b02162a0c1c3b..46567cc0246af229a4da35ec7a1d5c283c93ba87 100644 --- a/basic_ds.go +++ b/basic_ds.go @@ -28,6 +28,11 @@ func (d *MapDatastore) Put(key Key, value []byte) (err error) { return nil } +// Sync implements Datastore.Sync +func (d *MapDatastore) Sync(prefix Key) error { + return nil +} + // Get implements Datastore.Get func (d *MapDatastore) Get(key Key) (value []byte, err error) { val, found := d.values[key] @@ -95,6 +100,11 @@ func (d *NullDatastore) Put(key Key, value []byte) (err error) { return nil } +// Sync implements Datastore.Sync +func (d *NullDatastore) Sync(prefix Key) error { + return nil +} + // Get implements Datastore.Get func (d *NullDatastore) Get(key Key) (value []byte, err error) { return nil, ErrNotFound diff --git a/datastore.go b/datastore.go index 7715c47ebd32fff18705f7a75686638793e67745..04ca726c4ab346bf3af0210a5e0cbc8326c17839 100644 --- a/datastore.go +++ b/datastore.go @@ -34,6 +34,13 @@ should be checked by callers. type Datastore interface { Read Write + // Sync guarantees that any Put or Delete calls under prefix that returned + // before Sync(prefix) was called will be observed after Sync(prefix) + // returns, even if the program crashes. If Put/Delete operations already + // satisfy these requirements then Sync may be a no-op. + // + // If the prefix fails to Sync this method returns an error. + Sync(prefix Key) error io.Closer } diff --git a/delayed/delayed.go b/delayed/delayed.go index d34909fe8d694638eccb998754dfa9a843315d7c..f634c947c7be7e8ebcc33f619f41ebc6f861babd 100644 --- a/delayed/delayed.go +++ b/delayed/delayed.go @@ -30,6 +30,12 @@ func (dds *Delayed) Put(key ds.Key, value []byte) (err error) { return dds.ds.Put(key, value) } +// Sync implements Datastore.Sync +func (dds *Delayed) Sync(prefix ds.Key) error { + dds.delay.Wait() + return dds.ds.Sync(prefix) +} + // Get implements the ds.Datastore interface. func (dds *Delayed) Get(key ds.Key) (value []byte, err error) { dds.delay.Wait() diff --git a/examples/fs.go b/examples/fs.go index 87369e811384a3d70dc3b38290d2cf18853b8fd6..a5fda62be3cfb098756099b96ab3d025a80c3008 100644 --- a/examples/fs.go +++ b/examples/fs.go @@ -63,6 +63,12 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) { return ioutil.WriteFile(fn, value, 0666) } +// Sync would ensure that any previous Puts under the prefix are written to disk. +// However, they already are. +func (d *Datastore) Sync(prefix ds.Key) error { + return nil +} + // Get returns the value for given key func (d *Datastore) Get(key ds.Key) (value []byte, err error) { fn := d.KeyFilename(key) @@ -103,8 +109,9 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { walkFn := func(path string, info os.FileInfo, err error) error { // remove ds path prefix - if strings.HasPrefix(path, d.path) { - path = path[len(d.path):] + relPath, err := filepath.Rel(d.path, path) + if err == nil { + path = filepath.ToSlash(relPath) } if !info.IsDir() { @@ -167,7 +174,7 @@ func (d *Datastore) DiskUsage() (uint64, error) { log.Println(err) return err } - if f != nil { + if f != nil && f.Mode().IsRegular() { du += uint64(f.Size()) } return nil diff --git a/examples/fs_test.go b/examples/fs_test.go index 19be7a05ee455b83c98d709d24b8ee9f9e54a4a1..5ee54d39645623417bb455ce3422c13ac564289c 100644 --- a/examples/fs_test.go +++ b/examples/fs_test.go @@ -96,13 +96,16 @@ func (ks *DSSuite) TestDiskUsage(c *C) { "foo/bar/baz/barb", }) + totalBytes := 0 for _, k := range keys { - err := ks.ds.Put(k, []byte(k.String())) + value := []byte(k.String()) + totalBytes += len(value) + err := ks.ds.Put(k, value) c.Check(err, Equals, nil) } if ps, ok := ks.ds.(ds.PersistentDatastore); ok { - if s, err := ps.DiskUsage(); s <= 100 || err != nil { + if s, err := ps.DiskUsage(); s != uint64(totalBytes) || err != nil { c.Error("unexpected size is: ", s) } } else { diff --git a/failstore/failstore.go b/failstore/failstore.go index 1408a3ef9a0e961da0f70359d2555b0136be44ba..f790f96e74af99c07e5ea5f4e71cda101f7802db 100644 --- a/failstore/failstore.go +++ b/failstore/failstore.go @@ -36,6 +36,16 @@ func (d *Failstore) Put(k ds.Key, val []byte) error { return d.child.Put(k, val) } +// Sync implements Datastore.Sync +func (d *Failstore) Sync(prefix ds.Key) error { + err := d.errfunc("sync") + if err != nil { + return err + } + + return d.child.Sync(prefix) +} + // Get retrieves a value from the datastore. func (d *Failstore) Get(k ds.Key) ([]byte, error) { err := d.errfunc("get") diff --git a/keytransform/keytransform.go b/keytransform/keytransform.go index ae0297307f014dba0d759fb0879a2a9fd08d98c5..cd03487ca06aafeb95e590fe953eeff8e6dccfdf 100644 --- a/keytransform/keytransform.go +++ b/keytransform/keytransform.go @@ -37,6 +37,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) (err error) { return d.child.Put(d.ConvertKey(key), value) } +// Sync implements Datastore.Sync +func (d *Datastore) Sync(prefix ds.Key) error { + return d.child.Sync(d.ConvertKey(prefix)) +} + // Get returns the value for given key, transforming the key first. func (d *Datastore) Get(key ds.Key) (value []byte, err error) { return d.child.Get(d.ConvertKey(key)) diff --git a/mount/mount.go b/mount/mount.go index c092c262a13a16b31886b39b361402b744635f97..d2556b2e0786f405f3d87e1baecf798436dbd7c6 100644 --- a/mount/mount.go +++ b/mount/mount.go @@ -189,6 +189,24 @@ func (d *Datastore) Put(key ds.Key, value []byte) error { return cds.Put(k, value) } +// Sync implements Datastore.Sync +func (d *Datastore) Sync(prefix ds.Key) error { + // Sync all mount points below the prefix + // Sync the mount point right at (or above) the prefix + dstores, mountPts, rest := d.lookupAll(prefix) + for i, suffix := range rest { + if err := dstores[i].Sync(suffix); err != nil { + return err + } + + if mountPts[i].Equal(prefix) || suffix.String() != "/" { + return nil + } + } + + return nil +} + func (d *Datastore) Get(key ds.Key) (value []byte, err error) { cds, _, k := d.lookup(key) if cds == nil { diff --git a/mount/mount_test.go b/mount/mount_test.go index 36135363d3578de25b03d2426ffe62ac49f6e2aa..497ba2db951e8df6c594a4822ee22d4b8517c792 100644 --- a/mount/mount_test.go +++ b/mount/mount_test.go @@ -5,6 +5,7 @@ import ( "testing" datastore "github.com/ipfs/go-datastore" + autobatch "github.com/ipfs/go-datastore/autobatch" mount "github.com/ipfs/go-datastore/mount" query "github.com/ipfs/go-datastore/query" sync "github.com/ipfs/go-datastore/sync" @@ -641,6 +642,70 @@ func TestLookupPrio(t *testing.T) { } } +func TestNestedMountSync(t *testing.T) { + internalDSRoot := datastore.NewMapDatastore() + internalDSFoo := datastore.NewMapDatastore() + internalDSFooBar := datastore.NewMapDatastore() + + m := mount.New([]mount.Mount{ + {Prefix: datastore.NewKey("/foo"), Datastore: autobatch.NewAutoBatching(internalDSFoo, 10)}, + {Prefix: datastore.NewKey("/foo/bar"), Datastore: autobatch.NewAutoBatching(internalDSFooBar, 10)}, + {Prefix: datastore.NewKey("/"), Datastore: autobatch.NewAutoBatching(internalDSRoot, 10)}, + }) + + // Testing scenarios + // 1) Make sure child(ren) sync + // 2) Make sure parent syncs + // 3) Make sure parent only syncs the relevant subtree (instead of fully syncing) + + addToDS := func(str string) { + t.Helper() + if err := m.Put(datastore.NewKey(str), []byte(str)); err != nil { + t.Fatal(err) + } + } + + checkVal := func(d datastore.Datastore, str string, expectFound bool) { + t.Helper() + res, err := d.Has(datastore.NewKey(str)) + if err != nil { + t.Fatal(err) + } + if res != expectFound { + if expectFound { + t.Fatal("datastore is missing key") + } + t.Fatal("datastore has key it should not have") + } + } + + // Add /foo/bar/0, Add /foo/bar/0/1, Add /foo/baz, Add /beep/bop, Sync /foo: all added except last - checks 1 and 2 + addToDS("/foo/bar/0") + addToDS("/foo/bar/1") + addToDS("/foo/baz") + addToDS("/beep/bop") + + if err := m.Sync(datastore.NewKey("/foo")); err != nil { + t.Fatal(err) + } + + checkVal(internalDSFooBar, "/0", true) + checkVal(internalDSFooBar, "/1", true) + checkVal(internalDSFoo, "/baz", true) + checkVal(internalDSRoot, "/beep/bop", false) + + // Add /fwop Add /bloop Sync /fwop, both added - checks 3 + addToDS("/fwop") + addToDS("/bloop") + + if err := m.Sync(datastore.NewKey("/fwop")); err != nil { + t.Fatal(err) + } + + checkVal(internalDSRoot, "/fwop", true) + checkVal(internalDSRoot, "/bloop", false) +} + type errQueryDS struct { datastore.NullDatastore } diff --git a/retrystore/retrystore.go b/retrystore/retrystore.go index 02e44ab3d5a0cd4c468a707638445c67e3ac0aba..d4527581617ea37e354c5606f3bc083fedcd8ca3 100644 --- a/retrystore/retrystore.go +++ b/retrystore/retrystore.go @@ -73,6 +73,13 @@ func (d *Datastore) Put(k ds.Key, val []byte) error { }) } +// Sync implements Datastore.Sync +func (d *Datastore) Sync(prefix ds.Key) error { + return d.runOp(func() error { + return d.Batching.Sync(prefix) + }) +} + // Has checks if a key is stored. func (d *Datastore) Has(k ds.Key) (bool, error) { var has bool diff --git a/sync/sync.go b/sync/sync.go index 5a4bb0e774a78014c6119aa4901610734a8b1ed9..3eb7290ee6b753f7427eb929a6bb5cc24949e2d5 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -33,6 +33,13 @@ func (d *MutexDatastore) Put(key ds.Key, value []byte) (err error) { return d.child.Put(key, value) } +// Sync implements Datastore.Sync +func (d *MutexDatastore) Sync(prefix ds.Key) error { + d.Lock() + defer d.Unlock() + return d.child.Sync(prefix) +} + // Get implements Datastore.Get func (d *MutexDatastore) Get(key ds.Key) (value []byte, err error) { d.RLock() diff --git a/test/basic_tests.go b/test/basic_tests.go index b1c73de55f99a4828c93e01bdd7ab5873b62cea0..0506ee531c3d6d4931ce5c66aac3857d9aad3e66 100644 --- a/test/basic_tests.go +++ b/test/basic_tests.go @@ -176,6 +176,36 @@ func SubtestManyKeysAndQuery(t *testing.T, ds dstore.Datastore) { subtestQuery(t, ds, dsq.Query{KeysOnly: true}, 100) } +func SubtestBasicSync(t *testing.T, ds dstore.Datastore) { + if err := ds.Sync(dstore.NewKey("foo")); err != nil { + t.Fatal(err) + } + + if err := ds.Put(dstore.NewKey("/foo"), []byte("foo")); err != nil { + t.Fatal(err) + } + + if err := ds.Sync(dstore.NewKey("/foo")); err != nil { + t.Fatal(err) + } + + if err := ds.Put(dstore.NewKey("/foo/bar"), []byte("bar")); err != nil { + t.Fatal(err) + } + + if err := ds.Sync(dstore.NewKey("/foo")); err != nil { + t.Fatal(err) + } + + if err := ds.Sync(dstore.NewKey("/foo/bar")); err != nil { + t.Fatal(err) + } + + if err := ds.Sync(dstore.NewKey("")); err != nil { + t.Fatal(err) + } +} + // need a custom test filter to test the "fallback" filter case for unknown // filters. type testFilter struct{} diff --git a/test/suite.go b/test/suite.go index 3d1740115f474814da8aea023fb2775a9282edbb..afebd1ca813e5cb46c5fd2439e08d0d2f8eef932 100644 --- a/test/suite.go +++ b/test/suite.go @@ -19,6 +19,7 @@ var BasicSubtests = []func(t *testing.T, ds dstore.Datastore){ SubtestFilter, SubtestManyKeysAndQuery, SubtestReturnSizes, + SubtestBasicSync, } // BatchSubtests is a list of all basic batching datastore tests.