Commit 9b83d068 authored by Steven Allen's avatar Steven Allen

feat: move retries lower

Instead or retrying the whole operation, retry the specific parts that can fail.

I believe this also fixes cases where we might end up with a retry loop within a
retry loop? I'm not sure.
parent bcc4dcb5
...@@ -52,6 +52,10 @@ var ( ...@@ -52,6 +52,10 @@ var (
// RetryDelay is a timeout for a backoff on retrying operations // RetryDelay is a timeout for a backoff on retrying operations
// that fail due to transient errors like too many file descriptors open. // that fail due to transient errors like too many file descriptors open.
RetryDelay = time.Millisecond * 200 RetryDelay = time.Millisecond * 200
// RetryAttempts is the maximum number of retries that will be attempted
// before giving up.
RetryAttempts = 6
) )
const ( const (
...@@ -369,8 +373,6 @@ func (fs *Datastore) renameAndUpdateDiskUsage(tmpPath, path string) error { ...@@ -369,8 +373,6 @@ func (fs *Datastore) renameAndUpdateDiskUsage(tmpPath, path string) error {
return err return err
} }
var putMaxRetries = 6
// Put stores a key/value in the datastore. // Put stores a key/value in the datastore.
// //
// Note, that we do not guarantee order of write operations (Put or Delete) // Note, that we do not guarantee order of write operations (Put or Delete)
...@@ -392,24 +394,11 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error { ...@@ -392,24 +394,11 @@ func (fs *Datastore) Put(key datastore.Key, value []byte) error {
return ErrClosed return ErrClosed
} }
var err error _, err := fs.doWriteOp(&op{
for i := 1; i <= putMaxRetries; i++ { typ: opPut,
_, err = fs.doWriteOp(&op{ key: key,
typ: opPut, v: value,
key: key, })
v: value,
})
if err == nil {
break
}
if !strings.Contains(err.Error(), "too many open files") {
break
}
log.Errorf("too many open files, retrying in %dms", 100*i)
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
return err return err
} }
...@@ -462,15 +451,7 @@ func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) { ...@@ -462,15 +451,7 @@ func (fs *Datastore) doWriteOp(oper *op) (done bool, err error) {
return false, nil return false, nil
} }
// Do the operation err = fs.doOp(oper)
for i := 0; i < 6; i++ {
err = fs.doOp(oper)
if err == nil || !isTooManyFDError(err) {
break
}
time.Sleep(time.Duration(i+1) * RetryDelay)
}
// Finish it. If no error, it will signal other operations // Finish it. If no error, it will signal other operations
// waiting on this result to succeed. Otherwise, they will // waiting on this result to succeed. Otherwise, they will
...@@ -585,22 +566,17 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -585,22 +566,17 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
} }
dirsToSync[dir] = struct{}{} dirsToSync[dir] = struct{}{}
tmp, err := fs.tempFile() tmp, err := fs.tempFileOnce()
// Fallback retry for temporary error. // If we have too many files open, try closing some, then try
if err != nil && isTooManyFDError(err) { // again repeatedly.
if isTooManyFDError(err) {
if err = closer(); err != nil { if err = closer(); err != nil {
return err return err
} }
for i := 0; i < 6; i++ { tmp, err = fs.tempFile()
time.Sleep(time.Duration(i+1) * RetryDelay)
tmp, err = fs.tempFile()
if err == nil || !isTooManyFDError(err) {
break
}
}
} }
if err != nil { if err != nil {
return err return err
} }
...@@ -1122,6 +1098,11 @@ func (fs *Datastore) tempFile() (*os.File, error) { ...@@ -1122,6 +1098,11 @@ func (fs *Datastore) tempFile() (*os.File, error) {
return file, err return file, err
} }
func (fs *Datastore) tempFileOnce() (*os.File, error) {
file, err := tempFileOnce(fs.tempPath, "temp-")
return file, err
}
// only call this on directories. // only call this on directories.
func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error { func (fs *Datastore) walk(path string, qrb *query.ResultBuilder) error {
dir, err := os.Open(path) dir, err := os.Open(path)
......
...@@ -23,7 +23,7 @@ func DirIsEmpty(name string) (bool, error) { ...@@ -23,7 +23,7 @@ func DirIsEmpty(name string) (bool, error) {
func readFile(filename string) (data []byte, err error) { func readFile(filename string) (data []byte, err error) {
// Fallback retry for temporary error. // Fallback retry for temporary error.
for i := 0; i < 6; i++ { for i := 0; i < RetryAttempts; i++ {
data, err = readFileOnce(filename) data, err = readFileOnce(filename)
if err == nil || !isTooManyFDError(err) { if err == nil || !isTooManyFDError(err) {
break break
...@@ -32,3 +32,14 @@ func readFile(filename string) (data []byte, err error) { ...@@ -32,3 +32,14 @@ func readFile(filename string) (data []byte, err error) {
} }
return data, err return data, err
} }
func tempFile(dir, pattern string) (fi *os.File, err error) {
for i := 0; i < RetryAttempts; i++ {
fi, err = tempFileOnce(dir, pattern)
if err == nil || !isTooManyFDError(err) {
break
}
time.Sleep(time.Duration(i+1) * RetryDelay)
}
return fi, err
}
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
"os" "os"
) )
func tempFile(dir, pattern string) (*os.File, error) { func tempFileOnce(dir, pattern string) (*os.File, error) {
return ioutil.TempFile(dir, pattern) return ioutil.TempFile(dir, pattern)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment