Commit 1d9a4fcf authored by Steven Allen

fix panic on write after close

(and make Close threadsafe)
parent 43fc5aaf
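
The heart of the change is a shutdown guard: the write paths (Put, putMany, Delete) take a read lock on a new shutdownLock and return ErrClosed once the shutdown flag is set, while Close/deactivate takes the write lock, flips the flag, and only then shuts down the checkpoint goroutine. A minimal, self-contained sketch of that pattern, using illustrative names rather than the datastore's actual types:

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errClosed = errors.New("store closed")

// store is a toy stand-in for the datastore: writers hold the read lock
// and check the shutdown flag, Close holds the write lock so it cannot
// race with an in-flight write.
type store struct {
    mu       sync.RWMutex
    shutdown bool
    data     map[string][]byte
}

func (s *store) Put(key string, value []byte) error {
    s.mu.RLock()
    defer s.mu.RUnlock()
    if s.shutdown {
        return errClosed // a write after Close fails cleanly instead of panicking
    }
    s.data[key] = value
    return nil
}

func (s *store) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.shutdown {
        return nil // idempotent: a second (or concurrent) Close is a no-op
    }
    s.shutdown = true
    return nil
}

func main() {
    s := &store{data: make(map[string][]byte)}
    fmt.Println(s.Put("quux", []byte("foobar"))) // <nil>
    fmt.Println(s.Close())                       // <nil>
    fmt.Println(s.Put("qaax", []byte("foobar"))) // store closed
}

The RWMutex keeps concurrent writers cheap (they only share the read lock) while guaranteeing that Close cannot complete while a write is still in flight.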
@@ -170,8 +170,8 @@ func Move(oldPath string, newPath string, out io.Writer) error {
 func moveKey(oldDS *Datastore, newDS *Datastore, key datastore.Key) error {
     _, oldPath := oldDS.encode(key)
     dir, newPath := newDS.encode(key)
-    err := newDS.makeDirNoSync(dir)
-    if err != nil {
+    err := os.Mkdir(dir, 0755)
+    if err != nil && !os.IsExist(err) {
         return err
     }
     err = os.Rename(oldPath, newPath)
@@ -94,6 +94,7 @@ var (
     ErrDatastoreExists       = errors.New("datastore already exists")
     ErrDatastoreDoesNotExist = errors.New("datastore directory does not exist")
     ErrShardingFileMissing   = fmt.Errorf("%s file not found in datastore", SHARDING_FN)
+    ErrClosed                = errors.New("datastore closed")
 )

 func init() {
@@ -123,9 +124,13 @@ type Datastore struct {
     dirty       bool
     storedValue diskUsageValue

+    // Used to trigger a checkpoint.
     checkpointCh chan struct{}
     done         chan struct{}

+    shutdownLock sync.RWMutex
+    shutdown     bool
+
     // opMap handles concurrent write operations (put/delete)
     // to the same key
     opMap *opMap
@@ -238,12 +243,14 @@ func Open(path string, syncFiles bool) (*Datastore, error) {
     }

     fs := &Datastore{
         path:      path,
         shardStr:  shardId.String(),
         getDir:    shardId.Func(),
         sync:      syncFiles,
-        diskUsage: 0,
-        opMap:     new(opMap),
+        checkpointCh: make(chan struct{}, 1),
+        done:         make(chan struct{}),
+        diskUsage:    0,
+        opMap:        new(opMap),
     }

     // This sets diskUsage to the correct value
@@ -257,8 +264,6 @@ func Open(path string, syncFiles bool) (*Datastore, error) {
         return nil, err
     }

-    fs.checkpointCh = make(chan struct{}, 1)
-    fs.done = make(chan struct{})
     go fs.checkpointLoop()
     return fs, nil
 }
@@ -356,6 +361,12 @@ var putMaxRetries = 6
 // concurrent Put and a Delete operation, we cannot guarantee which one
 // will win.
 func (fs *Datastore) Put(key datastore.Key, value []byte) error {
+    fs.shutdownLock.RLock()
+    defer fs.shutdownLock.RUnlock()
+    if fs.shutdown {
+        return ErrClosed
+    }
+
     var err error
     for i := 1; i <= putMaxRetries; i++ {
         err = fs.doWriteOp(&op{
@@ -466,6 +477,12 @@ func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
 }

 func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
+    fs.shutdownLock.RLock()
+    defer fs.shutdownLock.RUnlock()
+    if fs.shutdown {
+        return ErrClosed
+    }
+
     var dirsToSync []string
     files := make(map[*os.File]*op)
@@ -594,6 +611,12 @@ func (fs *Datastore) GetSize(key datastore.Key) (size int, err error) {
 // the Put() explanation about the handling of concurrent write
 // operations to the same key.
 func (fs *Datastore) Delete(key datastore.Key) error {
+    fs.shutdownLock.RLock()
+    defer fs.shutdownLock.RUnlock()
+    if fs.shutdown {
+        return ErrClosed
+    }
+
     return fs.doWriteOp(&op{
         typ: opDelete,
         key: key,
@@ -845,6 +868,8 @@ func (fs *Datastore) checkpointDiskUsage() {
 }

 func (fs *Datastore) checkpointLoop() {
+    defer close(fs.done)
+
     timerActive := true
     timer := time.NewTimer(0)
     defer timer.Stop()
@@ -858,7 +883,6 @@ func (fs *Datastore) checkpointLoop() {
             if fs.dirty {
                 log.Errorf("could not store final value of disk usage to file, future estimates may be inaccurate")
             }
-            fs.done <- struct{}{}
             return
         }
         // If the difference between the checkpointed disk usage and
@@ -1023,11 +1047,14 @@ func (fs *Datastore) walk(path string, result *query.ResultBuilder) error {
 // operations will fail but readonly operations will continue to
 // function
 func (fs *Datastore) deactivate() error {
-    if fs.checkpointCh != nil {
-        close(fs.checkpointCh)
-        <-fs.done
-        fs.checkpointCh = nil
-    }
+    fs.shutdownLock.Lock()
+    defer fs.shutdownLock.Unlock()
+    if fs.shutdown {
+        return nil
+    }
+    fs.shutdown = true
+    close(fs.checkpointCh)
+    <-fs.done
     return nil
 }
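
The handshake with the checkpoint goroutine is the usual close-and-wait idiom: deactivate closes checkpointCh to ask the loop to stop, then blocks on done, which checkpointLoop now closes with a defer so the signal fires on every return path rather than only at the one explicit send the old code used. A rough stand-alone sketch of that idiom, with hypothetical names:

package main

import (
    "fmt"
    "time"
)

// worker drains trigger until it is closed; the deferred close(done)
// unblocks the caller no matter how the loop returns.
func worker(trigger <-chan struct{}, done chan<- struct{}) {
    defer close(done)
    for range trigger { // loop ends once trigger is closed and drained
        fmt.Println("checkpoint requested")
    }
}

func main() {
    trigger := make(chan struct{}, 1)
    done := make(chan struct{})
    go worker(trigger, done)

    trigger <- struct{}{} // request one checkpoint
    time.Sleep(10 * time.Millisecond)

    close(trigger) // begin shutdown
    <-done         // wait for the worker to exit
    fmt.Println("worker stopped")
}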
@@ -810,6 +810,30 @@ func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {

 func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) }

+func testClose(dirFunc mkShardFunc, t *testing.T) {
+    temp, cleanup := tempdir(t)
+    defer cleanup()
+
+    fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
+    if err != nil {
+        t.Fatalf("New fail: %v\n", err)
+    }
+
+    err = fs.Put(datastore.NewKey("quux"), []byte("foobar"))
+    if err != nil {
+        t.Fatalf("Put fail: %v\n", err)
+    }
+
+    fs.Close()
+
+    err = fs.Put(datastore.NewKey("qaax"), []byte("foobar"))
+    if err == nil {
+        t.Fatal("expected put on closed datastore to fail")
+    }
+}
+
+func TestClose(t *testing.T) { tryAllShardFuncs(t, testClose) }
+
 func TestSHARDINGFile(t *testing.T) {
     tempdir, cleanup := tempdir(t)
     defer cleanup()