Commit 6151e9ca authored by Steven Allen's avatar Steven Allen

initial commit

parents
Pipeline #397 failed with stages
in 0 seconds
0.1.0: QmVchVT2biaYFaHGiX4iJcMVM9CSA7NfsQjS9EnMHm88pB
go-flow-metrics
==================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![Travis CI](https://travis-ci.org/Stebalien/go-flow-metrics.svg?branch=master)](https://travis-ci.org/Stebalien/go-flow-metrics)
> A simple library for tracking flow metrics.
A simple alternative to [rcrowley's
go-metrics](https://github.com/rcrowley/go-metrics) that's a lot faster (and
only does simple bandwidth metrics).
## Table of Contents
- [Install](#install)
- [Contribute](#contribute)
- [License](#license)
## Install
```sh
make install
```
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Protocol Labs
package flow
import (
"sync"
"sync/atomic"
)
// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
}
// Meter is a meter for monitoring a flow.
type Meter struct {
total uint64
registered int32
// Take lock.
idleTime uint32
snapshot Snapshot
}
// Mark updates the total.
func (m *Meter) Mark(count uint64) {
atomic.AddUint64(&m.total, count)
globalSweeper.Register(m)
}
// Snapshot gets a consistent snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
return m.snapshot
}
// MeterRegistry is a registry for named meters.
type MeterRegistry struct {
meters sync.Map
}
// GetMeter gets (or creates) a meter by name.
func (r *MeterRegistry) GetMeter(name string) *Meter {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
}
m, _ := r.meters.LoadOrStore(name, new(Meter))
return m.(*Meter)
}
{
"author": "steb",
"bugs": {
"url": "github.com/Stebalien/go-flow-metrics"
},
"gx": {
"dvcsimport": "github.com/Stebalien/go-flow-metrics"
},
"gxVersion": "0.12.1",
"language": "go",
"license": "",
"name": "go-flow-metrics",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.1.0"
}
package flow
import (
"math"
"sync"
"sync/atomic"
"time"
)
// IdleTimeout is the amount of time a meter is allowed to be idle before it is suspended.
var IdleTimeout uint32 = 60 // a minute
// Alpha for EWMA of 1s
var alpha = 1 - math.Exp(-1.0)
// The global sweeper.
var globalSweeper sweeper
type sweeper struct {
sweepOnce sync.Once
meters []*Meter
mutex sync.RWMutex
registerChannel chan *Meter
}
func (sw *sweeper) start() {
sw.registerChannel = make(chan *Meter, 16)
go sw.run()
}
func (sw *sweeper) run() {
for m := range sw.registerChannel {
sw.meters = append(sw.meters, m)
sw.runActive()
}
}
func (sw *sweeper) runActive() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for len(sw.meters) > 0 {
// Scale back allocation.
if len(sw.meters)*2 < cap(sw.meters) {
newMeters := make([]*Meter, len(sw.meters))
copy(newMeters, sw.meters)
sw.meters = newMeters
}
select {
case t := <-ticker.C:
sw.update(t)
case m := <-sw.registerChannel:
sw.meters = append(sw.meters, m)
}
}
sw.meters = nil
// Till next time.
}
func (sw *sweeper) update(t time.Time) {
sw.mutex.Lock()
defer sw.mutex.Unlock()
for i := 0; i < len(sw.meters); i++ {
m := sw.meters[i]
total := atomic.LoadUint64(&m.total)
if total == m.snapshot.Total {
if m.idleTime == IdleTimeout {
// remove it.
sw.meters[i] = sw.meters[len(sw.meters)-1]
sw.meters[len(sw.meters)-1] = nil
sw.meters = sw.meters[:len(sw.meters)-1]
// reset these. The total can stay.
m.idleTime = 0
m.snapshot.Rate = 0
atomic.StoreInt32(&m.registered, 0)
} else {
m.idleTime++
}
continue
}
diff := float64(total - m.snapshot.Total)
if m.snapshot.Rate == 0 {
m.snapshot.Rate = diff
} else {
m.snapshot.Rate += alpha * (diff - m.snapshot.Rate)
}
m.snapshot.Total = total
m.idleTime = 0
}
}
func (sw *sweeper) Register(m *Meter) {
// Short cut. Swap is slow (and rarely needed).
if atomic.LoadInt32(&m.registered) == 1 {
return
}
if atomic.SwapInt32(&m.registered, 1) == 0 {
sw.sweepOnce.Do(sw.start)
sw.registerChannel <- m
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment