Commit 0e571da0 authored by tavit ohanian's avatar tavit ohanian

reference basis

parents bc7be76c 6ef46444
Pipeline #396 failed with stages
in 0 seconds
os:
- linux
language: go
go:
- 1.13.x
env:
global:
- GOTFLAGS="-race"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
The MIT License (MIT)
Copyright (c) 2017 Protocol Labs
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-flow-metrics go-flow-metrics
==================
dms3 p2p go-flow-metrics [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
\ No newline at end of file [![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![Travis CI](https://travis-ci.org/libp2p/go-flow-metrics.svg?branch=master)](https://travis-ci.org/libp2p/go-flow-metrics)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
> A simple library for tracking flow metrics.
A simple alternative to [rcrowley's
go-metrics](https://github.com/rcrowley/go-metrics) that's a lot faster (and
only does simple bandwidth metrics).
## Table of Contents
- [Install](#install)
- [Contribute](#contribute)
- [License](#license)
## Install
```sh
make install
```
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Protocol Labs
---
The last gx published version of this module was: 0.2.0: QmQFXpvKpF34dK9HcE7k8Ksk8V4BwWYZtdEcjzu5aUgRVr
package flow
import (
"math"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestBasic(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
m := new(Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
<-ticker.C
}
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 1000) {
t.Errorf("expected rate 25000 (±1000), got %f", actual.Rate)
}
for i := 0; i < 200; i++ {
m.Mark(200)
<-ticker.C
}
// Adjusts
actual = m.Snapshot()
if !approxEq(actual.Rate, 5000, 200) {
t.Errorf("expected rate 5000 (±200), got %f", actual.Rate)
}
// Let it settle.
time.Sleep(2 * time.Second)
// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}()
}
wg.Wait()
}
func TestShared(t *testing.T) {
var wg sync.WaitGroup
wg.Add(20 * 21)
for i := 0; i < 20; i++ {
m := new(Meter)
for j := 0; j < 20; j++ {
go func() {
defer wg.Done()
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 300; i++ {
m.Mark(50)
<-ticker.C
}
for i := 0; i < 200; i++ {
m.Mark(10)
<-ticker.C
}
}()
}
go func() {
defer wg.Done()
time.Sleep(40 * 300 * time.Millisecond)
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 250) {
t.Errorf("expected rate 25000 (±250), got %f", actual.Rate)
}
time.Sleep(40 * 200 * time.Millisecond)
// Adjusts
actual = m.Snapshot()
if !approxEq(actual.Rate, 5000, 50) {
t.Errorf("expected rate 5000 (±50), got %f", actual.Rate)
}
// Let it settle.
time.Sleep(2 * time.Second)
// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}()
}
wg.Wait()
}
func TestUnregister(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100 * 2)
for i := 0; i < 100; i++ {
m := new(Meter)
go func() {
defer wg.Done()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 40; i++ {
m.Mark(1)
<-ticker.C
}
time.Sleep(62 * time.Second)
for i := 0; i < 40; i++ {
m.Mark(2)
<-ticker.C
}
}()
go func() {
defer wg.Done()
time.Sleep(40 * 100 * time.Millisecond)
actual := m.Snapshot()
if !approxEq(actual.Rate, 10, 1) {
t.Errorf("expected rate 10 (±1), got %f", actual.Rate)
}
time.Sleep(60 * time.Second)
if atomic.LoadUint64(&m.accumulator) != 0 {
t.Error("expected meter to be paused")
}
actual = m.Snapshot()
if actual.Total != 40 {
t.Errorf("expected total 4000, got %d", actual.Total)
}
time.Sleep(2*time.Second + 40*100*time.Millisecond)
actual = m.Snapshot()
if !approxEq(actual.Rate, 20, 4) {
t.Errorf("expected rate 20 (±4), got %f", actual.Rate)
}
time.Sleep(2 * time.Second)
actual = m.Snapshot()
if actual.Total != 120 {
t.Errorf("expected total 120, got %d", actual.Total)
}
if atomic.LoadUint64(&m.accumulator) == 0 {
t.Error("expected meter to be active")
}
}()
}
wg.Wait()
}
func approxEq(a, b, err float64) bool {
return math.Abs(a-b) < err
}
package flow
import (
"fmt"
"sync/atomic"
"time"
)
// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
LastUpdate time.Time
}
// NewMeter returns a new Meter with the correct idle time.
//
// While zero-value Meters can be used, their "last update" time will start at
// the program start instead of when the meter was created.
func NewMeter() *Meter {
return &Meter{
snapshot: Snapshot{
LastUpdate: time.Now(),
},
}
}
func (s Snapshot) String() string {
return fmt.Sprintf("%d (%f/s)", s.Total, s.Rate)
}
// Meter is a meter for monitoring a flow.
type Meter struct {
accumulator uint64
// managed by the sweeper loop.
registered bool
// Take lock.
snapshot Snapshot
}
// Mark updates the total.
func (m *Meter) Mark(count uint64) {
if count > 0 && atomic.AddUint64(&m.accumulator, count) == count {
// The accumulator is 0 so we probably need to register. We may
// already _be_ registered however, if we are, the registration
// loop will notice that `m.registered` is set and ignore us.
globalSweeper.Register(m)
}
}
// Snapshot gets a snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
return m.snapshot
}
// Reset sets accumulator, total and rate to zero.
func (m *Meter) Reset() {
globalSweeper.snapshotMu.Lock()
atomic.StoreUint64(&m.accumulator, 0)
m.snapshot.Rate = 0
m.snapshot.Total = 0
globalSweeper.snapshotMu.Unlock()
}
func (m *Meter) String() string {
return m.Snapshot().String()
}
package flow
import (
"fmt"
"math"
"sync"
"testing"
"time"
)
func ExampleMeter() {
meter := new(Meter)
t := time.NewTicker(100 * time.Millisecond)
for i := 0; i < 100; i++ {
<-t.C
meter.Mark(30)
}
// Get the current rate. This will be accurate *now* but not after we
// sleep (because we calculate it using EWMA).
rate := meter.Snapshot().Rate
// Sleep 2 seconds to allow the total to catch up. We snapshot every
// second so the total may not yet be accurate.
time.Sleep(2 * time.Second)
// Get the current total.
total := meter.Snapshot().Total
fmt.Printf("%d (%d/s)\n", total, roundTens(rate))
// Output: 3000 (300/s)
}
func TestResetMeter(t *testing.T) {
meter := new(Meter)
meter.Mark(30)
time.Sleep(2 * time.Second)
if total := meter.Snapshot().Total; total != 30 {
t.Errorf("total = %d; want 30", total)
}
meter.Reset()
if total := meter.Snapshot().Total; total != 0 {
t.Errorf("total = %d; want 0", total)
}
}
func TestMarkResetMeterMulti(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
meter := new(Meter)
go func(meter *Meter) {
meter.Mark(30)
meter.Mark(30)
wg.Done()
}(meter)
go func(meter *Meter) {
meter.Reset()
wg.Done()
}(meter)
wg.Wait()
}
func roundTens(x float64) int64 {
return int64(math.Floor(x/10+0.5)) * 10
}
package flow
import (
"sync"
"time"
)
// MeterRegistry is a registry for named meters.
type MeterRegistry struct {
meters sync.Map
}
// Get gets (or creates) a meter by name.
func (r *MeterRegistry) Get(name string) *Meter {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
}
m, _ := r.meters.LoadOrStore(name, NewMeter())
return m.(*Meter)
}
// FindIdle finds all meters that haven't been used since the given time.
func (r *MeterRegistry) FindIdle(since time.Time) []string {
var idle []string
r.walkIdle(since, func(key interface{}) {
idle = append(idle, key.(string))
})
return idle
}
// TrimIdle trims that haven't been updated since the given time. Returns the
// number of timers trimmed.
func (r *MeterRegistry) TrimIdle(since time.Time) (trimmed int) {
// keep these as interfaces to avoid allocating when calling delete.
var idle []interface{}
r.walkIdle(since, func(key interface{}) {
idle = append(idle, since)
})
for _, i := range idle {
r.meters.Delete(i)
}
return len(idle)
}
func (r *MeterRegistry) walkIdle(since time.Time, cb func(key interface{})) {
// Yes, this is a global lock. However, all taking this does is pause
// snapshotting.
globalSweeper.snapshotMu.RLock()
defer globalSweeper.snapshotMu.RUnlock()
r.meters.Range(func(k, v interface{}) bool {
// So, this _is_ slightly inaccurate.
if v.(*Meter).snapshot.LastUpdate.Before(since) {
cb(k)
}
return true
})
}
// Remove removes the named meter from the registry.
//
// Note: The only reason to do this is to save a bit of memory. Unused meters
// don't consume any CPU (after they go idle).
func (r *MeterRegistry) Remove(name string) {
r.meters.Delete(name)
}
// ForEach calls the passed function for each registered meter.
func (r *MeterRegistry) ForEach(iterFunc func(string, *Meter)) {
r.meters.Range(func(k, v interface{}) bool {
iterFunc(k.(string), v.(*Meter))
return true
})
}
// Clear removes all meters from the registry.
func (r *MeterRegistry) Clear() {
r.meters.Range(func(k, v interface{}) bool {
r.meters.Delete(k)
return true
})
}
package flow
import (
"testing"
"time"
)
func TestRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m1Update := m1.Snapshot().LastUpdate
m1.Mark(10)
m2.Mark(30)
time.Sleep(2*time.Second + time.Millisecond)
if total := r.Get("first").Snapshot().Total; total != 10 {
t.Errorf("expected first total to be 10, got %d", total)
}
if total := r.Get("second").Snapshot().Total; total != 30 {
t.Errorf("expected second total to be 30, got %d", total)
}
if !m1.Snapshot().LastUpdate.After(m1Update) {
t.Error("expected the last update to have been updated")
}
expectedMeters := map[string]*Meter{
"first": m1,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
delete(expectedMeters, n)
})
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}
r.Remove("first")
found := false
r.ForEach(func(n string, m *Meter) {
if n != "second" {
t.Errorf("found unexpected meter: %s", n)
return
}
if found {
t.Error("found meter twice")
}
found = true
})
if !found {
t.Errorf("didn't find second meter")
}
m3 := r.Get("first")
if m3 == m1 {
t.Error("should have gotten a new meter")
}
if total := m3.Snapshot().Total; total != 0 {
t.Errorf("expected first total to now be 0, got %d", total)
}
expectedMeters = map[string]*Meter{
"first": m3,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
delete(expectedMeters, n)
})
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}
before := time.Now()
m3.Mark(1)
time.Sleep(2 * time.Second)
after := time.Now()
if len(r.FindIdle(before)) != 1 {
t.Error("expected 1 idle timer")
}
if len(r.FindIdle(after)) != 2 {
t.Error("expected 2 idle timers")
}
count := r.TrimIdle(after)
if count != 2 {
t.Error("expected to trim 2 idle timers")
}
}
func TestClearRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m1.Mark(10)
m2.Mark(30)
time.Sleep(2 * time.Second)
r.Clear()
r.ForEach(func(n string, _m *Meter) {
t.Errorf("expected no meters at all, found a meter %s", n)
})
if total := r.Get("first").Snapshot().Total; total != 0 {
t.Errorf("expected first total to be 0, got %d", total)
}
if total := r.Get("second").Snapshot().Total; total != 0 {
t.Errorf("expected second total to be 0, got %d", total)
}
}
package flow
import (
"math"
"sync"
"sync/atomic"
"time"
)
// IdleRate the rate at which we declare a meter idle (and stop tracking it
// until it's re-registered).
//
// The default ensures that 1 event every ~30s will keep the meter from going
// idle.
var IdleRate = 1e-13
// Alpha for EWMA of 1s
var alpha = 1 - math.Exp(-1.0)
// The global sweeper.
var globalSweeper sweeper
type sweeper struct {
sweepOnce sync.Once
snapshotMu sync.RWMutex
meters []*Meter
activeMeters int
lastUpdateTime time.Time
registerChannel chan *Meter
}
func (sw *sweeper) start() {
sw.registerChannel = make(chan *Meter, 16)
go sw.run()
}
func (sw *sweeper) run() {
for m := range sw.registerChannel {
sw.register(m)
sw.runActive()
}
}
func (sw *sweeper) register(m *Meter) {
if m.registered {
// registered twice, move on.
return
}
m.registered = true
sw.meters = append(sw.meters, m)
}
func (sw *sweeper) runActive() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
sw.lastUpdateTime = time.Now()
for len(sw.meters) > 0 {
// Scale back allocation.
if len(sw.meters)*2 < cap(sw.meters) {
newMeters := make([]*Meter, len(sw.meters))
copy(newMeters, sw.meters)
sw.meters = newMeters
}
select {
case <-ticker.C:
sw.update()
case m := <-sw.registerChannel:
sw.register(m)
}
}
sw.meters = nil
// Till next time.
}
func (sw *sweeper) update() {
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()
now := time.Now()
tdiff := now.Sub(sw.lastUpdateTime)
if tdiff <= 0 {
return
}
sw.lastUpdateTime = now
timeMultiplier := float64(time.Second) / float64(tdiff)
// Calculate the bandwidth for all active meters.
for i, m := range sw.meters[:sw.activeMeters] {
total := atomic.LoadUint64(&m.accumulator)
diff := total - m.snapshot.Total
instant := timeMultiplier * float64(diff)
if diff > 0 {
m.snapshot.LastUpdate = now
}
if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant
} else {
m.snapshot.Rate += alpha * (instant - m.snapshot.Rate)
}
m.snapshot.Total = total
// This is equivalent to one zeros, then one, then 30 zeros.
// We'll consider that to be "idle".
if m.snapshot.Rate > IdleRate {
continue
}
// Ok, so we are idle...
// Mark this as idle by zeroing the accumulator.
swappedTotal := atomic.SwapUint64(&m.accumulator, 0)
// So..., are we really idle?
if swappedTotal > total {
// Not so idle...
// Now we need to make sure this gets re-registered.
// First, add back what we removed. If we can do this
// fast enough, we can put it back before anyone
// notices.
currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal)
// Did we make it?
if currentTotal == swappedTotal {
// Yes! Nobody noticed, move along.
continue
}
// No. Someone noticed and will (or has) put back into
// the registration channel.
//
// Remove the snapshot total, it'll get added back on
// registration.
//
// `^uint64(total - 1)` is the two's complement of
// `total`. It's the "correct" way to subtract
// atomically in go.
atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1))
}
// Reset the rate, keep the total.
m.registered = false
m.snapshot.Rate = 0
sw.meters[i] = nil
}
// Re-add the total to all the newly active accumulators and set the snapshot to the total.
// 1. We don't do this on register to avoid having to take the snapshot lock.
// 2. We skip calculating the bandwidth for this round so we get an _accurate_ bandwidth calculation.
for _, m := range sw.meters[sw.activeMeters:] {
total := atomic.AddUint64(&m.accumulator, m.snapshot.Total)
if total > m.snapshot.Total {
m.snapshot.LastUpdate = now
}
m.snapshot.Total = total
}
// compress and trim the meter list
var newLen int
for _, m := range sw.meters {
if m != nil {
sw.meters[newLen] = m
newLen++
}
}
sw.meters = sw.meters[:newLen]
// Finally, mark all meters still in the list as "active".
sw.activeMeters = len(sw.meters)
}
func (sw *sweeper) Register(m *Meter) {
sw.sweepOnce.Do(sw.start)
sw.registerChannel <- m
}
package flow
import (
"testing"
"time"
)
// regression test for libp2p/go-libp2p-core#65
func TestIdleInconsistency(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m3 := r.Get("third")
m1.Mark(10)
m2.Mark(20)
m3.Mark(30)
// make m1 and m3 go idle
for i := 0; i < 30; i++ {
time.Sleep(time.Second)
m2.Mark(1)
}
time.Sleep(time.Second)
// re-activate m3
m3.Mark(20)
time.Sleep(time.Second + time.Millisecond)
// check the totals
if total := r.Get("first").Snapshot().Total; total != 10 {
t.Errorf("expected first total to be 10, got %d", total)
}
if total := r.Get("second").Snapshot().Total; total != 50 {
t.Errorf("expected second total to be 50, got %d", total)
}
if total := r.Get("third").Snapshot().Total; total != 50 {
t.Errorf("expected third total to be 50, got %d", total)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment