Commit 77ddfd5b authored by Steven Allen's avatar Steven Allen

reduce lock contention and fix a potential race

1. Handle `registered` inside the main loop. This way we can avoid atomics.
2. Don't start calculating bandwidth for newly active meters until they've been
active for a round. This:
  1. Ensures we only write to the snapshot from within the main loop.
  2. Gives us a better estimated bandwidth usage.
parent 9002d989
...@@ -108,6 +108,9 @@ func TestUnregister(t *testing.T) { ...@@ -108,6 +108,9 @@ func TestUnregister(t *testing.T) {
m := new(Meter) m := new(Meter)
go func() { go func() {
defer wg.Done() defer wg.Done()
m.Mark(1)
time.Sleep(time.Second)
ticker := time.NewTicker(100 * time.Millisecond) ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for i := 0; i < 40; i++ { for i := 0; i < 40; i++ {
...@@ -117,6 +120,9 @@ func TestUnregister(t *testing.T) { ...@@ -117,6 +120,9 @@ func TestUnregister(t *testing.T) {
time.Sleep(62 * time.Second) time.Sleep(62 * time.Second)
m.Mark(2)
time.Sleep(time.Second)
for i := 0; i < 40; i++ { for i := 0; i < 40; i++ {
m.Mark(2) m.Mark(2)
<-ticker.C <-ticker.C
...@@ -124,7 +130,7 @@ func TestUnregister(t *testing.T) { ...@@ -124,7 +130,7 @@ func TestUnregister(t *testing.T) {
}() }()
go func() { go func() {
defer wg.Done() defer wg.Done()
time.Sleep(40 * 100 * time.Millisecond) time.Sleep(40*100*time.Millisecond + time.Second)
actual := m.Snapshot() actual := m.Snapshot()
if !approxEq(actual.Rate, 10, 1) { if !approxEq(actual.Rate, 10, 1) {
...@@ -137,10 +143,10 @@ func TestUnregister(t *testing.T) { ...@@ -137,10 +143,10 @@ func TestUnregister(t *testing.T) {
} }
actual = m.Snapshot() actual = m.Snapshot()
if actual.Total != 40 { if actual.Total != 41 {
t.Errorf("expected total 4000, got %d", actual.Total) t.Errorf("expected total 41, got %d", actual.Total)
} }
time.Sleep(2*time.Second + 40*100*time.Millisecond) time.Sleep(3*time.Second + 40*100*time.Millisecond)
actual = m.Snapshot() actual = m.Snapshot()
if !approxEq(actual.Rate, 20, 4) { if !approxEq(actual.Rate, 20, 4) {
...@@ -148,8 +154,8 @@ func TestUnregister(t *testing.T) { ...@@ -148,8 +154,8 @@ func TestUnregister(t *testing.T) {
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
actual = m.Snapshot() actual = m.Snapshot()
if actual.Total != 120 { if actual.Total != 123 {
t.Errorf("expected total 120, got %d", actual.Total) t.Errorf("expected total 123, got %d", actual.Total)
} }
if atomic.LoadUint64(&m.accumulator) == 0 { if atomic.LoadUint64(&m.accumulator) == 0 {
t.Error("expected meter to be active") t.Error("expected meter to be active")
......
...@@ -32,7 +32,9 @@ func (s Snapshot) String() string { ...@@ -32,7 +32,9 @@ func (s Snapshot) String() string {
// Meter is a meter for monitoring a flow. // Meter is a meter for monitoring a flow.
type Meter struct { type Meter struct {
accumulator uint64 accumulator uint64
registered int32 // atomic bool
// managed by the sweeper loop.
registered bool
// Take lock. // Take lock.
snapshot Snapshot snapshot Snapshot
...@@ -41,12 +43,10 @@ type Meter struct { ...@@ -41,12 +43,10 @@ type Meter struct {
// Mark updates the total. // Mark updates the total.
func (m *Meter) Mark(count uint64) { func (m *Meter) Mark(count uint64) {
if count > 0 && atomic.AddUint64(&m.accumulator, count) == count { if count > 0 && atomic.AddUint64(&m.accumulator, count) == count {
just_registered := atomic.CompareAndSwapInt32(&m.registered, 0, 1) // The accumulator is 0 so we probably need to register. We may
if just_registered { // already _be_ registered however, if we are, the registration
// I'm the first one to bump this above 0. // loop will notice that `m.registered` is set and ignore us.
// Register it. globalSweeper.Register(m)
globalSweeper.Register(m)
}
} }
} }
...@@ -63,7 +63,7 @@ func (m *Meter) Reset() { ...@@ -63,7 +63,7 @@ func (m *Meter) Reset() {
defer globalSweeper.snapshotMu.Unlock() defer globalSweeper.snapshotMu.Unlock()
atomic.StoreUint64(&m.accumulator, 0) atomic.StoreUint64(&m.accumulator, 0)
m.snapshot.Rate = 0 m.snapshot.Rate = 0
atomic.StoreUint64(&m.snapshot.Total, 0) m.snapshot.Total = 0
m.snapshot.LastUpdate = time.Now() m.snapshot.LastUpdate = time.Now()
} }
......
...@@ -23,8 +23,9 @@ var globalSweeper sweeper ...@@ -23,8 +23,9 @@ var globalSweeper sweeper
type sweeper struct { type sweeper struct {
sweepOnce sync.Once sweepOnce sync.Once
snapshotMu sync.RWMutex snapshotMu sync.RWMutex
meters []*Meter meters []*Meter
activeMeters int
lastUpdateTime time.Time lastUpdateTime time.Time
registerChannel chan *Meter registerChannel chan *Meter
...@@ -43,9 +44,11 @@ func (sw *sweeper) run() { ...@@ -43,9 +44,11 @@ func (sw *sweeper) run() {
} }
func (sw *sweeper) register(m *Meter) { func (sw *sweeper) register(m *Meter) {
// Add back the snapshot total. If we unregistered this if m.registered {
// one, we set it to zero. // registered twice, move on.
atomic.AddUint64(&m.accumulator, atomic.LoadUint64(&m.snapshot.Total)) return
}
m.registered = true
sw.meters = append(sw.meters, m) sw.meters = append(sw.meters, m)
} }
...@@ -85,9 +88,9 @@ func (sw *sweeper) update() { ...@@ -85,9 +88,9 @@ func (sw *sweeper) update() {
sw.lastUpdateTime = now sw.lastUpdateTime = now
timeMultiplier := float64(time.Second) / float64(tdiff) timeMultiplier := float64(time.Second) / float64(tdiff)
// Calculate the bandwidth for all active meters.
newLen := len(sw.meters) newLen := len(sw.meters)
for i, m := range sw.meters[:sw.activeMeters] {
for i, m := range sw.meters {
total := atomic.LoadUint64(&m.accumulator) total := atomic.LoadUint64(&m.accumulator)
diff := total - m.snapshot.Total diff := total - m.snapshot.Total
instant := timeMultiplier * float64(diff) instant := timeMultiplier * float64(diff)
...@@ -113,7 +116,6 @@ func (sw *sweeper) update() { ...@@ -113,7 +116,6 @@ func (sw *sweeper) update() {
// Mark this as idle by zeroing the accumulator. // Mark this as idle by zeroing the accumulator.
swappedTotal := atomic.SwapUint64(&m.accumulator, 0) swappedTotal := atomic.SwapUint64(&m.accumulator, 0)
atomic.StoreInt32(&m.registered, 0)
// So..., are we really idle? // So..., are we really idle?
if swappedTotal > total { if swappedTotal > total {
...@@ -143,16 +145,27 @@ func (sw *sweeper) update() { ...@@ -143,16 +145,27 @@ func (sw *sweeper) update() {
} }
// Reset the rate, keep the total. // Reset the rate, keep the total.
m.registered = false
m.snapshot.Rate = 0 m.snapshot.Rate = 0
newLen-- newLen--
sw.meters[i] = sw.meters[newLen] sw.meters[i] = sw.meters[newLen]
} }
// Re-add the total to all the newly active accumulators.
// 1. We don't do this on register to avoid having to take the snapshot lock.
// 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation.
for _, m := range sw.meters[sw.activeMeters:] {
atomic.AddUint64(&m.accumulator, m.snapshot.Total)
}
// trim the meter list // trim the meter list
for i := newLen; i < len(sw.meters); i++ { for i := newLen; i < len(sw.meters); i++ {
sw.meters[i] = nil sw.meters[i] = nil
} }
sw.meters = sw.meters[:newLen] sw.meters = sw.meters[:newLen]
// Finally, mark all meters still in the list as "active".
sw.activeMeters = len(sw.meters)
} }
func (sw *sweeper) Register(m *Meter) { func (sw *sweeper) Register(m *Meter) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment