Unverified Commit 319998e6 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #76 from ipfs/fix/remove-temporary-files-put-many

fix: remove temporary files when multiple write operations conflict
parents 53d4c9b1 579b3c8a
...@@ -394,7 +394,7 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error { ...@@ -394,7 +394,7 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error {
var err error var err error
for i := 1; i <= putMaxRetries; i++ { for i := 1; i <= putMaxRetries; i++ {
err = fs.doWriteOp(&op{ _, err = fs.doWriteOp(&op{
typ: opPut, typ: opPut,
key: key, key: key,
v: value, v: value,
...@@ -451,16 +451,18 @@ func isTooManyFDError(err error) bool { ...@@ -451,16 +451,18 @@ func isTooManyFDError(err error) bool {
// we assume that the first succeeding operation // we assume that the first succeeding operation
// on that key was the last one to happen after // on that key was the last one to happen after
// all successful others. // all successful others.
func (fs *Datastore) doWriteOp(oper *op) error { //
// done is true if we actually performed the operation, false if we skipped or
// failed.
func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) {
keyStr := oper.key.String() keyStr := oper.key.String()
opRes := fs.opMap.Begin(keyStr) opRes := fs.opMap.Begin(keyStr)
if opRes == nil { // nothing to do, a concurrent op succeeded if opRes == nil { // nothing to do, a concurrent op succeeded
return nil return false, nil
} }
// Do the operation // Do the operation
var err error
for i := 0; i < 6; i++ { for i := 0; i < 6; i++ {
err = fs.doOp(oper) err = fs.doOp(oper)
...@@ -474,7 +476,7 @@ func (fs *Datastore) doWriteOp(oper *op) error { ...@@ -474,7 +476,7 @@ func (fs *Datastore) doWriteOp(oper *op) error {
// waiting on this result to succeed. Otherwise, they will // waiting on this result to succeed. Otherwise, they will
// retry. // retry.
opRes.Finish(err == nil) opRes.Finish(err == nil)
return err return err == nil, err
} }
func (fs *Datastore) doPut(key datastore.Key, val []byte) error { func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
...@@ -623,12 +625,13 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -623,12 +625,13 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
// move files to their proper places // move files to their proper places
for fi, op := range files { for fi, op := range files {
err := fs.doWriteOp(op) done, err := fs.doWriteOp(op)
if err != nil { if err != nil {
return err return err
} else if done {
// signify removed
ops[fi] = 2
} }
// signify removed
ops[fi] = 2
} }
// now sync the dirs for those files // now sync the dirs for those files
...@@ -733,11 +736,12 @@ func (fs *Datastore) Delete(key datastore.Key) error { ...@@ -733,11 +736,12 @@ func (fs *Datastore) Delete(key datastore.Key) error {
return ErrClosed return ErrClosed
} }
return fs.doWriteOp(&op{ _, err := fs.doWriteOp(&op{
typ: opDelete, typ: opDelete,
key: key, key: key,
v: nil, v: nil,
}) })
return err
} }
// This function always runs within an opLock for the given // This function always runs within an opLock for the given
......
...@@ -23,6 +23,26 @@ import ( ...@@ -23,6 +23,26 @@ import (
"github.com/ipfs/go-ds-flatfs" "github.com/ipfs/go-ds-flatfs"
) )
func checkTemp(t *testing.T, dir string) {
tempDir, err := os.Open(filepath.Join(dir, ".temp"))
if err != nil {
t.Errorf("failed to open temp dir: %s", err)
return
}
names, err := tempDir.Readdirnames(-1)
tempDir.Close()
if err != nil {
t.Errorf("failed to read temp dir: %s", err)
return
}
for _, name := range names {
t.Errorf("found leftover temporary file: %s", name)
}
}
func tempdir(t testing.TB) (path string, cleanup func()) { func tempdir(t testing.TB) (path string, cleanup func()) {
path, err := ioutil.TempDir("", "test-datastore-flatfs-") path, err := ioutil.TempDir("", "test-datastore-flatfs-")
if err != nil { if err != nil {
...@@ -45,9 +65,73 @@ func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) { ...@@ -45,9 +65,73 @@ func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) {
type mkShardFunc func(int) *flatfs.ShardIdV1 type mkShardFunc func(int) *flatfs.ShardIdV1
func testBatch(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t)
defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil {
t.Fatalf("New fail: %v\n", err)
}
defer fs.Close()
batches := make([]datastore.Batch, 9)
for i := range batches {
batch, err := fs.Batch()
if err != nil {
t.Fatal(err)
}
batches[i] = batch
err = batch.Put(datastore.NewKey("QUUX"), []byte("foo"))
if err != nil {
t.Fatal(err)
}
err = batch.Put(datastore.NewKey(fmt.Sprintf("Q%dX", i)), []byte(fmt.Sprintf("bar%d", i)))
if err != nil {
t.Fatal(err)
}
}
var wg sync.WaitGroup
wg.Add(len(batches))
for _, batch := range batches {
batch := batch
go func() {
defer wg.Done()
err := batch.Commit()
if err != nil {
t.Error(err)
}
}()
}
check := func(key, expected string) {
actual, err := fs.Get(datastore.NewKey(key))
if err != nil {
t.Fatalf("get for key %s, error: %s", key, err)
}
if string(actual) != expected {
t.Fatalf("for key %s, expected %s, got %s", key, expected, string(actual))
}
}
wg.Wait()
check("QUUX", "foo")
for i := range batches {
check(fmt.Sprintf("Q%dX", i), fmt.Sprintf("bar%d", i))
}
}
func TestBatch(t *testing.T) { tryAllShardFuncs(t, testBatch) }
func testPut(dirFunc mkShardFunc, t *testing.T) { func testPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -71,6 +155,7 @@ func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) } ...@@ -71,6 +155,7 @@ func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) }
func testGet(dirFunc mkShardFunc, t *testing.T) { func testGet(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -103,6 +188,7 @@ func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) } ...@@ -103,6 +188,7 @@ func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) }
func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) { func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -138,6 +224,7 @@ func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) } ...@@ -138,6 +224,7 @@ func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) }
func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) { func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -162,6 +249,7 @@ type params struct { ...@@ -162,6 +249,7 @@ type params struct {
func testStorage(p *params, t *testing.T) { func testStorage(p *params, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
target := p.dir + string(os.PathSeparator) + p.key + ".data" target := p.dir + string(os.PathSeparator) + p.key + ".data"
fs, err := flatfs.CreateOrOpen(temp, p.shard, false) fs, err := flatfs.CreateOrOpen(temp, p.shard, false)
...@@ -256,6 +344,7 @@ func TestStorage(t *testing.T) { ...@@ -256,6 +344,7 @@ func TestStorage(t *testing.T) {
func testHasNotFound(dirFunc mkShardFunc, t *testing.T) { func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -277,6 +366,7 @@ func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) } ...@@ -277,6 +366,7 @@ func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) }
func testHasFound(dirFunc mkShardFunc, t *testing.T) { func testHasFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -303,6 +393,7 @@ func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) } ...@@ -303,6 +393,7 @@ func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) }
func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) { func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -321,6 +412,7 @@ func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) } ...@@ -321,6 +412,7 @@ func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) }
func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) { func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -347,6 +439,7 @@ func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound ...@@ -347,6 +439,7 @@ func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound
func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) { func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -365,6 +458,7 @@ func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound) ...@@ -365,6 +458,7 @@ func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound)
func testDeleteFound(dirFunc mkShardFunc, t *testing.T) { func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -394,6 +488,7 @@ func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) } ...@@ -394,6 +488,7 @@ func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) }
func testQuerySimple(dirFunc mkShardFunc, t *testing.T) { func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -434,6 +529,7 @@ func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) } ...@@ -434,6 +529,7 @@ func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) }
func testDiskUsage(dirFunc mkShardFunc, t *testing.T) { func testDiskUsage(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -555,6 +651,7 @@ func TestDiskUsageDoubleCount(t *testing.T) { ...@@ -555,6 +651,7 @@ func TestDiskUsageDoubleCount(t *testing.T) {
func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) { func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -628,6 +725,7 @@ func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) { ...@@ -628,6 +725,7 @@ func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) { func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -728,6 +826,7 @@ func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch) ...@@ -728,6 +826,7 @@ func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch)
func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) { func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -812,6 +911,7 @@ func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEs ...@@ -812,6 +911,7 @@ func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEs
func testBatchPut(dirFunc mkShardFunc, t *testing.T) { func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -827,6 +927,7 @@ func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) } ...@@ -827,6 +927,7 @@ func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) }
func testBatchDelete(dirFunc mkShardFunc, t *testing.T) { func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -842,6 +943,7 @@ func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) } ...@@ -842,6 +943,7 @@ func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) }
func testClose(dirFunc mkShardFunc, t *testing.T) { func testClose(dirFunc mkShardFunc, t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
if err != nil { if err != nil {
...@@ -921,6 +1023,7 @@ func TestNonDatastoreDir(t *testing.T) { ...@@ -921,6 +1023,7 @@ func TestNonDatastoreDir(t *testing.T) {
func TestNoCluster(t *testing.T) { func TestNoCluster(t *testing.T) {
tempdir, cleanup := tempdir(t) tempdir, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, tempdir)
fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false) fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false)
if err != nil { if err != nil {
...@@ -1079,6 +1182,7 @@ func TestQueryLeak(t *testing.T) { ...@@ -1079,6 +1182,7 @@ func TestQueryLeak(t *testing.T) {
func TestSuite(t *testing.T) { func TestSuite(t *testing.T) {
temp, cleanup := tempdir(t) temp, cleanup := tempdir(t)
defer cleanup() defer cleanup()
defer checkTemp(t, temp)
fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false) fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
if err != nil { if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment