Commit 1de972e3 authored by Steven Allen's avatar Steven Allen

fix(test): fix racy test

Also renames a confusing mutex to make it clear what it protects. It protects
snapshots, not the meter slice.
parent 8ee3d596
......@@ -3,6 +3,7 @@ package flow
import (
"math"
"sync"
"sync/atomic"
"testing"
"time"
)
......@@ -103,8 +104,6 @@ func TestShared(t *testing.T) {
func TestUnregister(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100 * 2)
pause := make(chan struct{})
for i := 0; i < 100; i++ {
m := new(Meter)
go func() {
......@@ -116,8 +115,7 @@ func TestUnregister(t *testing.T) {
<-ticker.C
}
<-pause
time.Sleep(2 * time.Second)
time.Sleep(62 * time.Second)
for i := 0; i < 40; i++ {
m.Mark(2)
......@@ -133,7 +131,10 @@ func TestUnregister(t *testing.T) {
t.Errorf("expected rate 10 (±1), got %f", actual.Rate)
}
<-pause
time.Sleep(60 * time.Second)
if atomic.LoadUint64(&m.accumulator) != 0 {
t.Error("expected meter to be paused")
}
actual = m.Snapshot()
if actual.Total != 40 {
......@@ -150,24 +151,13 @@ func TestUnregister(t *testing.T) {
if actual.Total != 120 {
t.Errorf("expected total 120, got %d", actual.Total)
}
if atomic.LoadUint64(&m.accumulator) == 0 {
t.Error("expected meter to be active")
}
}()
}
time.Sleep(60 * time.Second)
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 0 {
t.Errorf("expected all sweepers to be unregistered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
close(pause)
wg.Wait()
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 100 {
t.Errorf("expected all sweepers to be registered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
}
func approxEq(a, b, err float64) bool {
......
......@@ -48,8 +48,8 @@ func (m *Meter) Mark(count uint64) {
// Snapshot gets a snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
return m.snapshot
}
......
......@@ -45,8 +45,8 @@ func (r *MeterRegistry) TrimIdle(since time.Time) (trimmed int) {
func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {
// Yes, this is a global lock. However, all taking this does is pause
// snapshotting.
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
......
......@@ -21,9 +21,11 @@ var alpha = 1 - math.Exp(-1.0)
var globalSweeper sweeper
type sweeper struct {
sweepOnce sync.Once
meters []*Meter
mutex sync.RWMutex
sweepOnce sync.Once
snapshotMu sync.RWMutex
meters []*Meter
lastUpdateTime time.Time
registerChannel chan *Meter
}
......@@ -72,8 +74,8 @@ func (sw *sweeper) runActive() {
}
func (sw *sweeper) update() {
sw.mutex.Lock()
defer sw.mutex.Unlock()
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()
now := time.Now()
tdiff := now.Sub(sw.lastUpdateTime)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment