Commit bc42aaa6 authored by Steven Allen's avatar Steven Allen

cleanup putMany implementation

* Use a map for dirsToSync to avoid syncing the same dir multiple times.
* Keep track of files in a slice, and use offsets into the slice to keep track
of which ones have been closed/removed.

Also, record the fact that we've created a temporary file _before_ we try to
write to it, in case the write fails. That way, we'll try to remove it when we
abort.
parent c1f339f4
...@@ -537,42 +537,43 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -537,42 +537,43 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
return ErrClosed return ErrClosed
} }
var dirsToSync []string type putManyOp struct {
key datastore.Key
files := make(map[*os.File]*op, len(data)) file *os.File
ops := make(map[*os.File]int, len(data)) dstPath string
srcPath string
}
var (
dirsToSync = make(map[string]struct{}, len(data))
files = make([]putManyOp, 0, len(data))
closed int
removed int
)
defer func() { defer func() {
for fi := range files { for closed < len(files) {
val := ops[fi] files[closed].file.Close()
switch val { closed++
case 0:
_ = fi.Close()
fallthrough
case 1:
_ = os.Remove(fi.Name())
} }
for removed < len(files) {
_ = os.Remove(files[removed].srcPath)
removed++
} }
}() }()
closer := func() error { closer := func() error {
for fi := range files { for closed < len(files) {
if ops[fi] != 0 { fi := files[closed].file
continue
}
if fs.sync { if fs.sync {
if err := syncFile(fi); err != nil { if err := syncFile(fi); err != nil {
return err return err
} }
} }
if err := fi.Close(); err != nil { if err := fi.Close(); err != nil {
return err return err
} }
closed++
// signify closed
ops[fi] = 1
} }
return nil return nil
} }
...@@ -582,7 +583,7 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -582,7 +583,7 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
if err := fs.makeDirNoSync(dir); err != nil { if err := fs.makeDirNoSync(dir); err != nil {
return err return err
} }
dirsToSync = append(dirsToSync, dir) dirsToSync[dir] = struct{}{}
tmp, err := fs.tempFile() tmp, err := fs.tempFile()
...@@ -604,16 +605,17 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -604,16 +605,17 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
return err return err
} }
// Do this _first_ so we close it if writing fails.
files = append(files, putManyOp{
key: key,
file: tmp,
dstPath: path,
srcPath: tmp.Name(),
})
if _, err := tmp.Write(value); err != nil { if _, err := tmp.Write(value); err != nil {
return err return err
} }
files[tmp] = &op{
typ: opRename,
path: path,
tmp: tmp.Name(),
key: key,
}
} }
// Now we sync everything // Now we sync everything
...@@ -624,19 +626,24 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { ...@@ -624,19 +626,24 @@ func (fs *Datastore) putMany(data map[datastore.Key][]byte) error {
} }
// move files to their proper places // move files to their proper places
for fi, op := range files { for _, pop := range files {
done, err := fs.doWriteOp(op) done, err := fs.doWriteOp(&op{
typ: opRename,
key: pop.key,
tmp: pop.srcPath,
path: pop.dstPath,
})
if err != nil { if err != nil {
return err return err
} else if done { } else if !done {
// signify removed _ = os.Remove(pop.file.Name())
ops[fi] = 2
} }
removed++
} }
// now sync the dirs for those files // now sync the dirs for those files
if fs.sync { if fs.sync {
for _, dir := range dirsToSync { for dir := range dirsToSync {
if err := syncDir(dir); err != nil { if err := syncDir(dir); err != nil {
return err return err
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment