Commit 0c5f74f5 authored by Steven Allen's avatar Steven Allen

feat(registry): add function for triming idle timers

parent b1aae8c2
...@@ -3,12 +3,26 @@ package flow ...@@ -3,12 +3,26 @@ package flow
import ( import (
"fmt" "fmt"
"sync/atomic" "sync/atomic"
"time"
) )
// Snapshot is a rate/total snapshot. // Snapshot is a rate/total snapshot.
type Snapshot struct { type Snapshot struct {
Rate float64 Rate float64
Total uint64 Total uint64
LastUpdate time.Time
}
// NewMeter returns a new Meter with the correct idle time.
//
// While zero-value Meters can be used, their "last update" time will start at
// the program start instead of when the meter was created.
func NewMeter() *Meter {
return &Meter{
snapshot: Snapshot{
LastUpdate: time.Now(),
},
}
} }
func (s Snapshot) String() string { func (s Snapshot) String() string {
...@@ -32,7 +46,7 @@ func (m *Meter) Mark(count uint64) { ...@@ -32,7 +46,7 @@ func (m *Meter) Mark(count uint64) {
} }
} }
// Snapshot gets a consistent snapshot of the total and rate. // Snapshot gets a snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot { func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock() globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock() defer globalSweeper.mutex.RUnlock()
......
...@@ -2,6 +2,7 @@ package flow ...@@ -2,6 +2,7 @@ package flow
import ( import (
"sync" "sync"
"time"
) )
// MeterRegistry is a registry for named meters. // MeterRegistry is a registry for named meters.
...@@ -14,10 +15,48 @@ func (r *MeterRegistry) Get(name string) *Meter { ...@@ -14,10 +15,48 @@ func (r *MeterRegistry) Get(name string) *Meter {
if m, ok := r.meters.Load(name); ok { if m, ok := r.meters.Load(name); ok {
return m.(*Meter) return m.(*Meter)
} }
m, _ := r.meters.LoadOrStore(name, new(Meter)) m, _ := r.meters.LoadOrStore(name, NewMeter())
return m.(*Meter) return m.(*Meter)
} }
// FindIdle finds all meters that haven't been used since the given time.
func (r *MeterRegistry) FindIdle(since time.Time) []string {
var idle []string
r.walkIdle(since, func(key interface{}) {
idle = append(idle, key.(string))
})
return idle
}
// TrimIdle trims that haven't been updated since the given time. Returns the
// number of timers trimmed.
func (r *MeterRegistry) TrimIdle(since time.Time) (trimmed int) {
// keep these as interfaces to avoid allocating when calling delete.
var idle []interface{}
r.walkIdle(since, func(key interface{}) {
idle = append(idle, since)
})
for _, i := range idle {
r.meters.Delete(i)
}
return len(idle)
}
func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {
// Yes, this is a global lock. However, all taking this does is pause
// snapshotting.
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
if v.(*Meter).snapshot.LastUpdate.Before(since) {
cb(k)
}
return true
})
}
// Remove removes the named meter from the registry. // Remove removes the named meter from the registry.
// //
// Note: The only reason to do this is to save a bit of memory. Unused meters // Note: The only reason to do this is to save a bit of memory. Unused meters
......
...@@ -9,6 +9,9 @@ func TestRegistry(t *testing.T) { ...@@ -9,6 +9,9 @@ func TestRegistry(t *testing.T) {
r := new(MeterRegistry) r := new(MeterRegistry)
m1 := r.Get("first") m1 := r.Get("first")
m2 := r.Get("second") m2 := r.Get("second")
m1Update := m1.Snapshot().LastUpdate
m1.Mark(10) m1.Mark(10)
m2.Mark(30) m2.Mark(30)
...@@ -21,6 +24,10 @@ func TestRegistry(t *testing.T) { ...@@ -21,6 +24,10 @@ func TestRegistry(t *testing.T) {
t.Errorf("expected second total to be 30, got %d", total) t.Errorf("expected second total to be 30, got %d", total)
} }
if !m1.Snapshot().LastUpdate.After(m1Update) {
t.Error("expected the last update to have been updated")
}
expectedMeters := map[string]*Meter{ expectedMeters := map[string]*Meter{
"first": m1, "first": m1,
"second": m2, "second": m2,
...@@ -74,4 +81,20 @@ func TestRegistry(t *testing.T) { ...@@ -74,4 +81,20 @@ func TestRegistry(t *testing.T) {
if len(expectedMeters) != 0 { if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters) t.Errorf("missing meters: '%v'", expectedMeters)
} }
before := time.Now()
m3.Mark(1)
time.Sleep(2 * time.Second)
after := time.Now()
if len(r.FindIdle(before)) != 1 {
t.Error("expected 1 idle timer")
}
if len(r.FindIdle(after)) != 2 {
t.Error("expected 2 idle timers")
}
count := r.TrimIdle(after)
if count != 2 {
t.Error("expected to trim 2 idle timers")
}
} }
...@@ -87,7 +87,12 @@ func (sw *sweeper) update() { ...@@ -87,7 +87,12 @@ func (sw *sweeper) update() {
for i, m := range sw.meters { for i, m := range sw.meters {
total := atomic.LoadUint64(&m.accumulator) total := atomic.LoadUint64(&m.accumulator)
instant := timeMultiplier * float64(total-m.snapshot.Total) diff := total - m.snapshot.Total
instant := timeMultiplier * float64(diff)
if diff > 0 {
m.snapshot.LastUpdate = now
}
if m.snapshot.Rate == 0 { if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant m.snapshot.Rate = instant
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment