Commit 933a589e authored by Kevin Atkinson's avatar Kevin Atkinson

Determine when to write the diskusage in the checkpoint go routine

This simplifies the code with perhaps a slightly higher overhead during
write operations.
parent 3633dfad
......@@ -100,7 +100,12 @@ type Datastore struct {
// sychronize all writes and directory changes for added safety
sync bool
diskUsage int64
// atmoic operations should always be used with diskUsage
diskUsage int64
// these values should only be used during internalization or
// inside the checkpoint loop
dirty bool
storedValue diskUsageValue
checkpointCh chan bool
......@@ -236,7 +241,12 @@ func Open(path string, syncFiles bool) (*Datastore, error) {
// elements in the datastore.
return nil, err
}
fs.checkpointLoop()
fs.checkpointCh = make(chan bool, 1)
fs.done = make(chan bool)
go func() {
fs.checkpointLoop()
}()
return fs, nil
}
......@@ -580,8 +590,8 @@ func (fs *Datastore) doDelete(key datastore.Key) error {
switch err := os.Remove(path); {
case err == nil:
newDu := atomic.AddInt64(&fs.diskUsage, -fSize)
fs.checkpointDiskUsage(newDu)
atomic.AddInt64(&fs.diskUsage, -fSize)
fs.checkpointDiskUsage()
return nil
case os.IsNotExist(err):
return datastore.ErrNotFound
......@@ -756,7 +766,7 @@ func (fs *Datastore) calculateDiskUsage() error {
fs.storedValue.accuracy = accuracy
fs.diskUsage = du
fs.persistDiskUsageFile()
fs.writeDiskUsageFile(du)
return nil
}
......@@ -779,24 +789,12 @@ func (fs *Datastore) updateDiskUsage(path string, add bool) {
}
if fsize != 0 {
newDu := atomic.AddInt64(&fs.diskUsage, fsize)
fs.checkpointDiskUsage(newDu)
}
}
func (fs *Datastore) checkpointDiskUsage(newDuInt int64) {
newDu := float64(newDuInt)
lastCheckpointDu := float64(atomic.LoadInt64(&fs.storedValue.diskUsage))
diff := math.Abs(newDu - lastCheckpointDu)
// If the difference between the checkpointed disk usage and
// current one is larger than than 1% of the checkpointed: store it.
if (lastCheckpointDu * diskUsageCheckpointPercent / 100.0) < diff {
fs.signalCheckpoint()
atomic.AddInt64(&fs.diskUsage, fsize)
fs.checkpointDiskUsage()
}
}
func (fs *Datastore) signalCheckpoint() {
func (fs *Datastore) checkpointDiskUsage() {
select {
case fs.checkpointCh <- true:
// msg send
......@@ -806,56 +804,53 @@ func (fs *Datastore) signalCheckpoint() {
}
func (fs *Datastore) checkpointLoop() {
fs.checkpointCh = make(chan bool, 1)
fs.done = make(chan bool)
go func() {
for {
//println("waiting...")
_, more := <-fs.checkpointCh
if more {
//println("checkpoint")
fs.persistDiskUsageFile()
} else {
//println("shutdown")
fs.done <- true
return
for {
_, more := <-fs.checkpointCh
fs.dirty = true
du := atomic.LoadInt64(&fs.diskUsage)
if more {
newDu := float64(du)
lastCheckpointDu := float64(fs.storedValue.diskUsage)
diff := math.Abs(newDu - lastCheckpointDu)
// If the difference between the checkpointed disk usage and
// current one is larger than than 1% of the checkpointed: store it.
if (lastCheckpointDu * diskUsageCheckpointPercent / 100.0) < diff {
fs.writeDiskUsageFile(du)
}
// FIXME: If dirty set a timer to write the diskusage
// anyway after X seconds of inactivity.
} else {
// shutting down, write the final value
fs.writeDiskUsageFile(du)
fs.done <- true
return
}
}()
}
}
// persistDiskUsageFile updates the diskusage file with the last known
// value, should only be called during initialization or within the
// checkpoint loop
func (fs *Datastore) persistDiskUsageFile() {
du := atomic.LoadInt64(&fs.diskUsage)
origVal := fs.storedValue.diskUsage
// update the stored diskUsage value now to prevent unnecessary
// calls to persistDiskUsageFile. On error role back the value to
// the original
atomic.StoreInt64(&fs.storedValue.diskUsage, du)
func (fs *Datastore) writeDiskUsageFile(du int64) {
tmp, err := ioutil.TempFile(fs.path, "du-")
if err != nil {
atomic.StoreInt64(&fs.storedValue.diskUsage, origVal)
return
}
encoder := json.NewEncoder(tmp)
if err := encoder.Encode(&fs.storedValue); err != nil {
atomic.StoreInt64(&fs.storedValue.diskUsage, origVal)
return
}
if err := tmp.Close(); err != nil {
atomic.StoreInt64(&fs.storedValue.diskUsage, origVal)
return
}
if err := os.Rename(tmp.Name(), filepath.Join(fs.path, DiskUsageFile)); err != nil {
atomic.StoreInt64(&fs.storedValue.diskUsage, origVal)
return
}
fs.storedValue.diskUsage = du
fs.dirty = false
}
// readDiskUsageFile is only safe to call in Open()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment