Unverified Commit 6bc9dabc authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #82 from ipfs/feat/retry

move retries lower and retry rename ops
parents bcc4dcb5 c5ecd444
...@@ -52,6 +52,10 @@ var ( ...@@ -52,6 +52,10 @@ var (
// RetryDelay is a timeout for a backoff on retrying operations // RetryDelay is a timeout for a backoff on retrying operations
// that fail due to transient errors like too many file descriptors open. // that fail due to transient errors like too many file descriptors open.
RetryDelay = time.Millisecond * 200 RetryDelay = time.Millisecond * 200
// RetryAttempts is the maximum number of retries that will be attempted
// before giving up.
RetryAttempts = 6
) )
const ( const (
...@@ -364,13 +368,21 @@ func (fs *Datastore) renameAndUpdateDiskUsage(tmpPath, path string) error { ...@@ -364,13 +368,21 @@ func (fs *Datastore) renameAndUpdateDiskUsage(tmpPath, path string) error {
// Rename and add new file's diskUsage. If the rename fails, // Rename and add new file's diskUsage. If the rename fails,
// it will either a) Re-add the size of an existing file, which // it will either a) Re-add the size of an existing file, which
// was subtracted before b) Add 0 if there is no existing file. // was subtracted before b) Add 0 if there is no existing file.
err = os.Rename(tmpPath, path) for i := 0; i < RetryAttempts; i++ {
err = os.Rename(tmpPath, path)
// if there's no error, or the source file doesn't exist, abort.
if err == nil || os.IsNotExist(err) {
break
}
// Otherwise, this could be a transient error due to some other
// process holding open one of the files. Wait a bit and then
// retry.
time.Sleep(time.Duration(i+1) * RetryDelay)
}
fs.updateDiskUsage(path, true) fs.updateDiskUsage(path, true)
return err return err
} }
var putMaxRetries = 6
// Put stores a key/value in the datastore. // Put stores a key/value in the datastore.
// //
// Note, that we do not guarantee order of write operations (Put or Delete) // Note, that we do not guarantee order of write operations (Put or Delete)
...@@ -392,24 +404,11 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error { ...@@ -392,24 +404,11 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error {
return ErrClosed return ErrClosed
} }
var err error _, err := fs.doWriteOp(&op{
for i := 1; i <= putMaxRetries; i++ { typ: opPut,
_, err = fs.doWriteOp(&op{ key: key,
typ: opPut, v: value,
key: key, })
v: value,
})
if err == nil {
break
}
if !strings.Contains(err.Error(), "too many open files") {
break
}
log.Errorf("too many open files, retrying in %dms", 100*i)
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
return err return err
} }
...@@ -462,15 +461,7 @@ func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) { ...@@ -462,15 +461,7 @@ func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) {
return false, nil return false, nil
} }
// Do the operation err = fs.doOp(oper)
for i := 0; i < 6; i++ {
err = fs.doOp(oper)
if err == nil || !isTooManyFDError(err) {
break
}
time.Sleep(time.Duration(i+1) * RetryDelay)
}
// Finish it. If no error, it will signal other operations // Finish it. If no error, it will signal other operations
// waiting on this result to succeed. Otherwise, they will // waiting on this result to succeed. Otherwise, they will
...@@ -585,22 +576,17 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -585,22 +576,17 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
} }
dirsToSync[dir] = struct{}{} dirsToSync[dir] = struct{}{}
tmp, err := fs.tempFile() tmp, err := fs.tempFileOnce()
// Fallback retry for temporary error. // If we have too many files open, try closing some, then try
if err != nil && isTooManyFDError(err) { // again repeatedly.
if isTooManyFDError(err) {
if err = closer(); err != nil { if err = closer(); err != nil {
return err return err
} }
for i := 0; i < 6; i++ { tmp, err = fs.tempFile()
time.Sleep(time.Duration(i+1) * RetryDelay)
tmp, err = fs.tempFile()
if err == nil || !isTooManyFDError(err) {
break
}
}
} }
if err != nil { if err != nil {
return err return err
} }
...@@ -740,16 +726,22 @@ func (fs *Datastore) doDelete(key datastore.Key) error { ...@@ -740,16 +726,22 @@ func (fs *Datastore) doDelete(key datastore.Key) error {
fSize := fileSize(path) fSize := fileSize(path)
switch err := os.Remove(path); { var err error
case err == nil: for i := 0; i < RetryAttempts; i++ {
err = os.Remove(path)
if err == nil {
break
} else if os.IsNotExist(err) {
return nil
}
}
if err == nil {
atomic.AddInt64(&fs.diskUsage, -fSize) atomic.AddInt64(&fs.diskUsage, -fSize)
fs.checkpointDiskUsage() fs.checkpointDiskUsage()
return nil
case os.IsNotExist(err):
return nil
default:
return err
} }
return err
} }
func (fs *Datastore) Query(q query.Query) (query.Results, error) { func (fs *Datastore) Query(q query.Query) (query.Results, error) {
...@@ -1122,6 +1114,10 @@ func (fs *Datastore) tempFile() (*os.File, error) { ...@@ -1122,6 +1114,10 @@ func (fs *Datastore) tempFile() (*os.File, error) {
return file, err return file, err
} }
// tempFileOnce makes a single call to the package-level tempFileOnce to
// create a temporary file under the datastore's tempPath, using the "temp-"
// name pattern. It performs no retry of its own; the file and error from
// that one call are returned unchanged.
func (fs *Datastore) tempFileOnce() (*os.File, error) {
	const pattern = "temp-"
	return tempFileOnce(fs.tempPath, pattern)
}
// only call this on directories. // only call this on directories.
func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error { func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error {
dir, err := os.Open(path) dir, err := os.Open(path)
......
...@@ -23,7 +23,7 @@ func DirIsEmpty(name string) (bool, error) { ...@@ -23,7 +23,7 @@ func DirIsEmpty(name string) (bool, error) {
func readFile(filename string) (data []byte, err error) { func readFile(filename string) (data []byte, err error) {
// Fallback retry for temporary error. // Fallback retry for temporary error.
for i := 0; i < 6; i++ { for i := 0; i < RetryAttempts; i++ {
data, err = readFileOnce(filename) data, err = readFileOnce(filename)
if err == nil || !isTooManyFDError(err) { if err == nil || !isTooManyFDError(err) {
break break
...@@ -32,3 +32,14 @@ func readFile(filename string) (data []byte, err error) { ...@@ -32,3 +32,14 @@ func readFile(filename string) (data []byte, err error) {
} }
return data, err return data, err
} }
// tempFile creates a temporary file in dir via tempFileOnce, making at most
// RetryAttempts attempts.
//
// An attempt is retried only when it fails with an error that
// isTooManyFDError reports as true; success or any other error is returned
// right away. After each retryable failure (including the last one) the
// function sleeps for attempt*RetryDelay, so the wait grows linearly.
// If every attempt fails, the result of the final attempt is returned.
func tempFile(dir, pattern string) (fi *os.File, err error) {
	for attempt := 1; attempt <= RetryAttempts; attempt++ {
		fi, err = tempFileOnce(dir, pattern)
		if err != nil && isTooManyFDError(err) {
			// Presumably a transient descriptor shortage: back off, then retry.
			time.Sleep(time.Duration(attempt) * RetryDelay)
			continue
		}
		return fi, err
	}
	return fi, err
}
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
"os" "os"
) )
func tempFile(dir, pattern string) (*os.File, error) { func tempFileOnce(dir, pattern string) (*os.File, error) {
return ioutil.TempFile(dir, pattern) return ioutil.TempFile(dir, pattern)
} }
......
...@@ -51,7 +51,7 @@ func prefixAndSuffix(pattern string) (prefix, suffix string) { ...@@ -51,7 +51,7 @@ func prefixAndSuffix(pattern string) (prefix, suffix string) {
return return
} }
func tempFile(dir, pattern string) (f *os.File, err error) { func tempFileOnce(dir, pattern string) (f *os.File, err error) {
if dir == "" { if dir == "" {
dir = os.TempDir() dir = os.TempDir()
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment