Commit 579b3c8a authored by Steven Allen's avatar Steven Allen

fix: remove temporary files when multiple write operations conflict

When multiple write operations conflict, we:

1. Try them in-order till one succeeds.
2. After the fact, re-order them such that the pending operations "happen" after
   the one that succeeds.
3. Return "success" for all the pending write operations for that key. This is
   acceptable because we're claiming that the operation that _actually_ succeeded
   happened "last" so it would have clobbered the other operations.

However, in the case of put, we still need to remove the temporary file that we
didn't end up moving into place.
parent 53d4c9b1
...@@ -394,7 +394,7 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error { ...@@ -394,7 +394,7 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error {
var err error var err error
for i := 1; i <= putMaxRetries; i++ { for i := 1; i <= putMaxRetries; i++ {
err = fs.doWriteOp(&op{ _, err = fs.doWriteOp(&op{
typ: opPut, typ: opPut,
key: key, key: key,
v: value, v: value,
...@@ -451,16 +451,18 @@ func isTooManyFDError(err error) bool { ...@@ -451,16 +451,18 @@ func isTooManyFDError(err error) bool {
// we assume that the first succeeding operation // we assume that the first succeeding operation
// on that key was the last one to happen after // on that key was the last one to happen after
// all successful others. // all successful others.
func (fs *Datastore) doWriteOp(oper *op) error { //
// done is true if we actually performed the operation, false if we skipped or
// failed.
func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) {
keyStr := oper.key.String() keyStr := oper.key.String()
opRes := fs.opMap.Begin(keyStr) opRes := fs.opMap.Begin(keyStr)
if opRes == nil { // nothing to do, a concurrent op succeeded if opRes == nil { // nothing to do, a concurrent op succeeded
return nil return false, nil
} }
// Do the operation // Do the operation
var err error
for i := 0; i < 6; i++ { for i := 0; i < 6; i++ {
err = fs.doOp(oper) err = fs.doOp(oper)
...@@ -474,7 +476,7 @@ func (fs *Datastore) doWriteOp(oper *op) error { ...@@ -474,7 +476,7 @@ func (fs *Datastore) doWriteOp(oper *op) error {
// waiting on this result to succeed. Otherwise, they will // waiting on this result to succeed. Otherwise, they will
// retry. // retry.
opRes.Finish(err == nil) opRes.Finish(err == nil)
return err return err == nil, err
} }
func (fs *Datastore) doPut(key datastore.Key, val []byte) error { func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
...@@ -623,12 +625,13 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -623,12 +625,13 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
// move files to their proper places // move files to their proper places
for fi, op := range files { for fi, op := range files {
err := fs.doWriteOp(op) done, err := fs.doWriteOp(op)
if err != nil { if err != nil {
return err return err
} else if done {
// signify removed
ops[fi] = 2
} }
// signify removed
ops[fi] = 2
} }
// now sync the dirs for those files // now sync the dirs for those files
...@@ -733,11 +736,12 @@ func (fs *Datastore) Delete(key datastore.Key) error { ...@@ -733,11 +736,12 @@ func (fs *Datastore) Delete(key datastore.Key) error {
return ErrClosed return ErrClosed
} }
return fs.doWriteOp(&op{ _, err := fs.doWriteOp(&op{
typ: opDelete, typ: opDelete,
key: key, key: key,
v: nil, v: nil,
}) })
return err
} }
// This function always runs within an opLock for the given // This function always runs within an opLock for the given
......
...@@ -23,6 +23,26 @@ import ( ...@@ -23,6 +23,26 @@ import (
"github.com/ipfs/go-ds-flatfs" "github.com/ipfs/go-ds-flatfs"
) )
func checkTemp(t *testing.T, dir string) {
tempDir, err := os.Open(filepath.Join(dir, ".temp"))
if err != nil {
t.Errorf("failed to open temp dir: %s", err)
return
}
names, err := tempDir.Readdirnames(-1)
tempDir.Close()
if err != nil {
t.Errorf("failed to read temp dir: %s", err)
return
}
for _, name := range names {
t.Errorf("found leftover temporary file: %s", name)
}
}
func tempdir(t testing.TB) (path string, cleanup func()) { func tempdir(t testing.TB) (path string, cleanup func()) {
path, err := ioutil.TempDir("", "test-datastore-flatfs-") path, err := ioutil.TempDir("", "test-datastore-flatfs-")
if err != nil { if err != nil {
...@@ -45,9 +65,73 @@ func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) { ...@@ -45,9 +65,73 @@ func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) {
type mkShardFunc func(int) *flatfs.ShardIdV1 type mkShardFunc func(int) *flatfs.ShardIdV1
func testBatch(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
defer fs.Close()
batches := make([]datastore.Batch, 9)
for i := range batches {
batch, err := fs.Batch()
if err != nil {
t.Fatal(err)
}
batches[i] = batch
err = batch.Put(datastore.NewKey("QUUX"), []byte("foo"))
if err != nil {
t.Fatal(err)
}
err = batch.Put(datastore.NewKey(fmt.Sprintf("Q%dX", i)), []byte(fmt.Sprintf("bar%d", i)))
if err != nil {
t.Fatal(err)
}
}
var wg sync.WaitGroup
wg.Add(len(batches))
for _, batch := range batches {
batch := batch
go func() {
defer wg.Done()
err := batch.Commit()
if err != nil {
t.Error(err)
}
}()
}
check := func(key, expected string) {
actual, err := fs.Get(datastore.NewKey(key))
if err != nil {
t.Fatalf("get for key %s, error: %s", key, err)
}
if string(actual) != expected {
t.Fatalf("for key %s, expected %s, got %s", key, expected, string(actual))
}
}
wg.Wait()
check("QUUX", "foo")
for i := range batches {
check(fmt.Sprintf("Q%dX", i), fmt.Sprintf("bar%d", i))
}
}
func TestBatch(t *testing.T) { tryAllShardFuncs(t, testBatch) }
func testPut(dirFunc mkShardFunc, t *testing.T) { func testPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -71,6 +155,7 @@ func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) } ...@@ -71,6 +155,7 @@ func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) }
func testGet(dirFunc mkShardFunc, t *testing.T) { func testGet(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -103,6 +188,7 @@ func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) } ...@@ -103,6 +188,7 @@ func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) }
func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) { func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -138,6 +224,7 @@ func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) } ...@@ -138,6 +224,7 @@ func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) }
func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) { func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -162,6 +249,7 @@ type params struct { ...@@ -162,6 +249,7 @@ type params struct {
func testStorage(p *params, t *testing.T) { func testStorage(p *params, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
target := p.dir + string(os.PathSeparator) + p.key + ".data" target := p.dir + string(os.PathSeparator) + p.key + ".data"
fs, err := flatfs.CreateOrOpen(temp, p.shard, false) fs, err := flatfs.CreateOrOpen(temp, p.shard, false)
...@@ -256,6 +344,7 @@ func TestStorage(t *testing.T) { ...@@ -256,6 +344,7 @@ func TestStorage(t *testing.T) {
func testHasNotFound(dirFunc mkShardFunc, t *testing.T) { func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -277,6 +366,7 @@ func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) } ...@@ -277,6 +366,7 @@ func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) }
func testHasFound(dirFunc mkShardFunc, t *testing.T) { func testHasFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -303,6 +393,7 @@ func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) } ...@@ -303,6 +393,7 @@ func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) }
func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) { func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -321,6 +412,7 @@ func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) } ...@@ -321,6 +412,7 @@ func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) }
func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) { func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -347,6 +439,7 @@ func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound ...@@ -347,6 +439,7 @@ func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound
func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) { func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -365,6 +458,7 @@ func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound) ...@@ -365,6 +458,7 @@ func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound)
func testDeleteFound(dirFunc mkShardFunc, t *testing.T) { func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -394,6 +488,7 @@ func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) } ...@@ -394,6 +488,7 @@ func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) }
func testQuerySimple(dirFunc mkShardFunc, t *testing.T) { func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -434,6 +529,7 @@ func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) } ...@@ -434,6 +529,7 @@ func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) }
func testDiskUsage(dirFunc mkShardFunc, t *testing.T) { func testDiskUsage(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -555,6 +651,7 @@ func TestDiskUsageDoubleCount(t *testing.T) { ...@@ -555,6 +651,7 @@ func TestDiskUsageDoubleCount(t *testing.T) {
func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) { func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -628,6 +725,7 @@ func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) { ...@@ -628,6 +725,7 @@ func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) { func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -728,6 +826,7 @@ func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch) ...@@ -728,6 +826,7 @@ func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch)
func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) { func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -812,6 +911,7 @@ func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEs ...@@ -812,6 +911,7 @@ func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEs
func testBatchPut(dirFunc mkShardFunc, t *testing.T) { func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -827,6 +927,7 @@ func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) } ...@@ -827,6 +927,7 @@ func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) }
func testBatchDelete(dirFunc mkShardFunc, t *testing.T) { func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -842,6 +943,7 @@ func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) } ...@@ -842,6 +943,7 @@ func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) }
func testClose(dirFunc mkShardFunc, t *testing.T) { func testClose(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -921,6 +1023,7 @@ func TestNonDatastoreDir(t *testing.T) { ...@@ -921,6 +1023,7 @@ func TestNonDatastoreDir(t *testing.T) {
func TestNoCluster(t *testing.T) { func TestNoCluster(t *testing.T) {
tempdir, cleanup := tempdir(t) tempdir, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, tempdir)
fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false) fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false)
if err != nil { if err != nil {
...@@ -1079,6 +1182,7 @@ func TestQueryLeak(t *testing.T) { ...@@ -1079,6 +1182,7 @@ func TestQueryLeak(t *testing.T) {
func TestSuite(t *testing.T) { func TestSuite(t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false) fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
if err != nil { if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment