Unverified Commit 99ae9155 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #9 from libp2p/feat/idle-triming

add a feature for tracking idle timers
parents b1aae8c2 1de972e3
......@@ -3,6 +3,7 @@ package flow
import (
"math"
"sync"
"sync/atomic"
"testing"
"time"
)
......@@ -22,8 +23,8 @@ func TestBasic(t *testing.T) {
<-ticker.C
}
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 500) {
t.Errorf("expected rate 25000 (±500), got %f", actual.Rate)
if !approxEq(actual.Rate, 25000, 1000) {
t.Errorf("expected rate 25000 (±1000), got %f", actual.Rate)
}
for i := 0; i < 200; i++ {
......@@ -103,8 +104,6 @@ func TestShared(t *testing.T) {
func TestUnregister(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100 * 2)
pause := make(chan struct{})
for i := 0; i < 100; i++ {
m := new(Meter)
go func() {
......@@ -116,8 +115,7 @@ func TestUnregister(t *testing.T) {
<-ticker.C
}
<-pause
time.Sleep(2 * time.Second)
time.Sleep(62 * time.Second)
for i := 0; i < 40; i++ {
m.Mark(2)
......@@ -133,7 +131,10 @@ func TestUnregister(t *testing.T) {
t.Errorf("expected rate 10 (±1), got %f", actual.Rate)
}
<-pause
time.Sleep(60 * time.Second)
if atomic.LoadUint64(&m.accumulator) != 0 {
t.Error("expected meter to be paused")
}
actual = m.Snapshot()
if actual.Total != 40 {
......@@ -150,24 +151,13 @@ func TestUnregister(t *testing.T) {
if actual.Total != 120 {
t.Errorf("expected total 120, got %d", actual.Total)
}
if atomic.LoadUint64(&m.accumulator) == 0 {
t.Error("expected meter to be active")
}
}()
}
time.Sleep(60 * time.Second)
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 0 {
t.Errorf("expected all sweepers to be unregistered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
close(pause)
wg.Wait()
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 100 {
t.Errorf("expected all sweepers to be registered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
}
func approxEq(a, b, err float64) bool {
......
......@@ -3,12 +3,26 @@ package flow
import (
"fmt"
"sync/atomic"
"time"
)
// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
Rate float64
Total uint64
LastUpdate time.Time
}
// NewMeter returns a new Meter with the correct idle time.
//
// While zero-value Meters can be used, their "last update" time will start at
// the program start instead of when the meter was created.
func NewMeter() *Meter {
return &Meter{
snapshot: Snapshot{
LastUpdate: time.Now(),
},
}
}
func (s Snapshot) String() string {
......@@ -32,10 +46,10 @@ func (m *Meter) Mark(count uint64) {
}
}
// Snapshot gets a consistent snapshot of the total and rate.
// Snapshot gets a snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
return m.snapshot
}
......
......@@ -2,6 +2,7 @@ package flow
import (
"sync"
"time"
)
// MeterRegistry is a registry for named meters.
......@@ -14,10 +15,48 @@ func (r *MeterRegistry) Get(name string) *Meter {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
}
m, _ := r.meters.LoadOrStore(name, new(Meter))
m, _ := r.meters.LoadOrStore(name, NewMeter())
return m.(*Meter)
}
// FindIdle finds all meters that haven't been used since the given time.
func (r *MeterRegistry) FindIdle(since time.Time) []string {
var idle []string
r.walkIdle(since, func(key interface{}) {
idle = append(idle, key.(string))
})
return idle
}
// TrimIdle trims that haven't been updated since the given time. Returns the
// number of timers trimmed.
func (r *MeterRegistry) TrimIdle(since time.Time) (trimmed int) {
// keep these as interfaces to avoid allocating when calling delete.
var idle []interface{}
r.walkIdle(since, func(key interface{}) {
idle = append(idle, since)
})
for _, i := range idle {
r.meters.Delete(i)
}
return len(idle)
}
func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {
// Yes, this is a global lock. However, all taking this does is pause
// snapshotting.
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
if v.(*Meter).snapshot.LastUpdate.Before(since) {
cb(k)
}
return true
})
}
// Remove removes the named meter from the registry.
//
// Note: The only reason to do this is to save a bit of memory. Unused meters
......
......@@ -9,6 +9,9 @@ func TestRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m1Update := m1.Snapshot().LastUpdate
m1.Mark(10)
m2.Mark(30)
......@@ -21,6 +24,10 @@ func TestRegistry(t *testing.T) {
t.Errorf("expected second total to be 30, got %d", total)
}
if !m1.Snapshot().LastUpdate.After(m1Update) {
t.Error("expected the last update to have been updated")
}
expectedMeters := map[string]*Meter{
"first": m1,
"second": m2,
......@@ -74,4 +81,20 @@ func TestRegistry(t *testing.T) {
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}
before := time.Now()
m3.Mark(1)
time.Sleep(2 * time.Second)
after := time.Now()
if len(r.FindIdle(before)) != 1 {
t.Error("expected 1 idle timer")
}
if len(r.FindIdle(after)) != 2 {
t.Error("expected 2 idle timers")
}
count := r.TrimIdle(after)
if count != 2 {
t.Error("expected to trim 2 idle timers")
}
}
......@@ -21,9 +21,11 @@ var alpha = 1 - math.Exp(-1.0)
var globalSweeper sweeper
type sweeper struct {
sweepOnce sync.Once
meters []*Meter
mutex sync.RWMutex
sweepOnce sync.Once
snapshotMu sync.RWMutex
meters []*Meter
lastUpdateTime time.Time
registerChannel chan *Meter
}
......@@ -72,8 +74,8 @@ func (sw *sweeper) runActive() {
}
func (sw *sweeper) update() {
sw.mutex.Lock()
defer sw.mutex.Unlock()
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()
now := time.Now()
tdiff := now.Sub(sw.lastUpdateTime)
......@@ -87,7 +89,12 @@ func (sw *sweeper) update() {
for i, m := range sw.meters {
total := atomic.LoadUint64(&m.accumulator)
instant := timeMultiplier * float64(total-m.snapshot.Total)
diff := total - m.snapshot.Total
instant := timeMultiplier * float64(diff)
if diff > 0 {
m.snapshot.LastUpdate = now
}
if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment