Commit aa97a1c8 authored by Vasily Kolobkov

Gather datastore metrics using go-metrics-interface

License: MIT
Signed-off-by: Vasily Kolobkov <polezaivsani@openmailbox.org>
parent 278566f7
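
A minimal usage sketch of the wrapper after this change (illustrative only: the prefix and keys are made up, the measure package is assumed to be imported from its in-repo path, and until a concrete metrics backend is injected go-metrics-interface should fall back to no-op counters and histograms):

package main

import (
	"fmt"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/measure"
)

func main() {
	// Wrap an in-memory datastore so every Put/Get/Has/Delete/Query is
	// counted and timed. The prefix becomes the leading part of each
	// metric name, e.g. "ipfs.blockstore.put_total" and
	// "ipfs.blockstore.put.latency_seconds".
	mds := measure.New("ipfs.blockstore", datastore.NewMapDatastore())
	defer mds.Close()

	if err := mds.Put(datastore.NewKey("/hello"), []byte("world")); err != nil {
		fmt.Println("put failed:", err)
	}
	v, err := mds.Get(datastore.NewKey("/hello"))
	fmt.Println(v, err)
}
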
 // Package measure provides a Datastore wrapper that records metrics
-// using github.com/codahale/metrics.
+// using github.com/ipfs/go-metrics-interface
 package measure

 import (
 	"io"
 	"time"

-	"github.com/codahale/metrics"
 	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/query"
+	"github.com/ipfs/go-metrics-interface"
 )

-// Histogram measurements exceeding these limits are dropped. TODO
-// maybe it would be better to cap the value? Should we keep track of
-// drops?
-const (
-	maxLatency = int64(1 * time.Second)
-	maxSize = int64(1 << 32)
+var (
+	// sort latencies in buckets with following upper bounds in seconds
+	datastoreLatencyBuckets = []float64{1e-4, 1e-3, 1e-2, 1e-1}
+	// sort sizes in buckets with following upper bounds in bytes
+	datastoreSizeBuckets = []float64{1 << 6, 1 << 12, 1 << 18, 1 << 24}
 )
 // New wraps the datastore, providing metrics on the operations. The
 // metrics are registered with names starting with prefix and a dot.
 //
 // If prefix is not unique, New will panic. Call Close to release the
 // prefix.
 func New(prefix string, ds datastore.Datastore) *measure {
 	m := &measure{
 		backend: ds,
-		putNum: metrics.Counter(prefix + ".Put.num"),
-		putErr: metrics.Counter(prefix + ".Put.err"),
-		putLatency: metrics.NewHistogram(prefix+".Put.latency", 0, maxLatency, 3),
-		putSize: metrics.NewHistogram(prefix+".Put.size", 0, maxSize, 3),
-		getNum: metrics.Counter(prefix + ".Get.num"),
-		getErr: metrics.Counter(prefix + ".Get.err"),
-		getLatency: metrics.NewHistogram(prefix+".Get.latency", 0, maxLatency, 3),
-		getSize: metrics.NewHistogram(prefix+".Get.size", 0, maxSize, 3),
-		hasNum: metrics.Counter(prefix + ".Has.num"),
-		hasErr: metrics.Counter(prefix + ".Has.err"),
-		hasLatency: metrics.NewHistogram(prefix+".Has.latency", 0, maxLatency, 3),
-		deleteNum: metrics.Counter(prefix + ".Delete.num"),
-		deleteErr: metrics.Counter(prefix + ".Delete.err"),
-		deleteLatency: metrics.NewHistogram(prefix+".Delete.latency", 0, maxLatency, 3),
-		queryNum: metrics.Counter(prefix + ".Query.num"),
-		queryErr: metrics.Counter(prefix + ".Query.err"),
-		queryLatency: metrics.NewHistogram(prefix+".Query.latency", 0, maxLatency, 3),
+		putNum: metrics.New(prefix+".put_total", "Total number of Datastore.Put calls").Counter(),
+		putErr: metrics.New(prefix+".put.errors_total", "Number of errored Datastore.Put calls").Counter(),
+		putLatency: metrics.New(prefix+".put.latency_seconds",
+			"Latency distribution of Datastore.Put calls").Histogram(datastoreLatencyBuckets),
+		putSize: metrics.New(prefix+".put.size_bytes",
+			"Size distribution of stored byte slices").Histogram(datastoreSizeBuckets),
+		getNum: metrics.New(prefix+".get_total", "Total number of Datastore.Get calls").Counter(),
+		getErr: metrics.New(prefix+".get.errors_total", "Number of errored Datastore.Get calls").Counter(),
+		getLatency: metrics.New(prefix+".get.latency_seconds",
+			"Latency distribution of Datastore.Get calls").Histogram(datastoreLatencyBuckets),
+		getSize: metrics.New(prefix+".get.size_bytes",
+			"Size distribution of retrieved byte slices").Histogram(datastoreSizeBuckets),
+		hasNum: metrics.New(prefix+".has_total", "Total number of Datastore.Has calls").Counter(),
+		hasErr: metrics.New(prefix+".has.errors_total", "Number of errored Datastore.Has calls").Counter(),
+		hasLatency: metrics.New(prefix+".has.latency_seconds",
+			"Latency distribution of Datastore.Has calls").Histogram(datastoreLatencyBuckets),
+		deleteNum: metrics.New(prefix+".delete_total", "Total number of Datastore.Delete calls").Counter(),
+		deleteErr: metrics.New(prefix+".delete.errors_total", "Number of errored Datastore.Delete calls").Counter(),
+		deleteLatency: metrics.New(prefix+".delete.latency_seconds",
+			"Latency distribution of Datastore.Delete calls").Histogram(datastoreLatencyBuckets),
+		queryNum: metrics.New(prefix+".query_total", "Total number of Datastore.Query calls").Counter(),
+		queryErr: metrics.New(prefix+".query.errors_total", "Number of errored Datastore.Query calls").Counter(),
+		queryLatency: metrics.New(prefix+".query.latency_seconds",
+			"Latency distribution of Datastore.Query calls").Histogram(datastoreLatencyBuckets),
 	}
 	return m
 }
@@ -58,56 +62,54 @@ type measure struct {
 	putNum metrics.Counter
 	putErr metrics.Counter
-	putLatency *metrics.Histogram
-	putSize *metrics.Histogram
+	putLatency metrics.Histogram
+	putSize metrics.Histogram

 	getNum metrics.Counter
 	getErr metrics.Counter
-	getLatency *metrics.Histogram
-	getSize *metrics.Histogram
+	getLatency metrics.Histogram
+	getSize metrics.Histogram

 	hasNum metrics.Counter
 	hasErr metrics.Counter
-	hasLatency *metrics.Histogram
+	hasLatency metrics.Histogram

 	deleteNum metrics.Counter
 	deleteErr metrics.Counter
-	deleteLatency *metrics.Histogram
+	deleteLatency metrics.Histogram

 	queryNum metrics.Counter
 	queryErr metrics.Counter
-	queryLatency *metrics.Histogram
+	queryLatency metrics.Histogram
 }

 var _ datastore.Datastore = (*measure)(nil)
-func recordLatency(h *metrics.Histogram, start time.Time) {
-	elapsed := time.Now().Sub(start) / time.Microsecond
-	_ = h.RecordValue(int64(elapsed))
+func recordLatency(h metrics.Histogram, start time.Time) {
+	elapsed := time.Since(start)
+	h.Observe(elapsed.Seconds())
 }

 func (m *measure) Put(key datastore.Key, value interface{}) error {
 	defer recordLatency(m.putLatency, time.Now())
-	m.putNum.Add()
+	m.putNum.Inc()
 	if b, ok := value.([]byte); ok {
-		_ = m.putSize.RecordValue(int64(len(b)))
+		m.putSize.Observe(float64(len(b)))
 	}
 	err := m.backend.Put(key, value)
 	if err != nil {
-		m.putErr.Add()
+		m.putErr.Inc()
 	}
 	return err
 }

 func (m *measure) Get(key datastore.Key) (value interface{}, err error) {
 	defer recordLatency(m.getLatency, time.Now())
-	m.getNum.Add()
+	m.getNum.Inc()
 	value, err = m.backend.Get(key)
 	if err != nil {
-		m.getErr.Add()
+		m.getErr.Inc()
 	} else {
 		if b, ok := value.([]byte); ok {
-			_ = m.getSize.RecordValue(int64(len(b)))
+			m.getSize.Observe(float64(len(b)))
 		}
 	}
 	return value, err
@@ -115,30 +117,30 @@ func (m *measure) Get(key datastore.Key) (value interface{}, err error) {
 func (m *measure) Has(key datastore.Key) (exists bool, err error) {
 	defer recordLatency(m.hasLatency, time.Now())
-	m.hasNum.Add()
+	m.hasNum.Inc()
 	exists, err = m.backend.Has(key)
 	if err != nil {
-		m.hasErr.Add()
+		m.hasErr.Inc()
 	}
 	return exists, err
 }

 func (m *measure) Delete(key datastore.Key) error {
 	defer recordLatency(m.deleteLatency, time.Now())
-	m.deleteNum.Add()
+	m.deleteNum.Inc()
 	err := m.backend.Delete(key)
 	if err != nil {
-		m.deleteErr.Add()
+		m.deleteErr.Inc()
 	}
 	return err
 }

 func (m *measure) Query(q query.Query) (query.Results, error) {
 	defer recordLatency(m.queryLatency, time.Now())
-	m.queryNum.Add()
+	m.queryNum.Inc()
 	res, err := m.backend.Query(q)
 	if err != nil {
-		m.queryErr.Add()
+		m.queryErr.Inc()
 	}
 	return res, err
 }
@@ -180,7 +182,7 @@ func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
 	mt.puts++
 	valb, ok := val.([]byte)
 	if ok {
-		_ = mt.m.putSize.RecordValue(int64(len(valb)))
+		mt.m.putSize.Observe(float64(len(valb)))
 	}
 	return mt.putts.Put(key, val)
 }
@@ -204,17 +206,17 @@ func (mt *measuredBatch) Commit() error {
 	return nil
 }

-func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat *metrics.Histogram) error {
+func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat metrics.Histogram) error {
 	if n > 0 {
 		before := time.Now()
 		err := b.Commit()
-		took := int(time.Now().Sub(before)/time.Microsecond) / n
-		num.AddN(uint64(n))
+		took := time.Since(before) / time.Duration(n)
+		num.Add(float64(n))
 		for i := 0; i < n; i++ {
-			_ = lat.RecordValue(int64(took))
+			lat.Observe(took.Seconds())
 		}
 		if err != nil {
-			errs.Add()
+			errs.Inc()
 			return err
 		}
 	}
@@ -222,24 +224,6 @@ func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat *metrics.Histogram) error {
 }

 func (m *measure) Close() error {
-	m.putNum.Remove()
-	m.putErr.Remove()
-	m.putLatency.Remove()
-	m.putSize.Remove()
-	m.getNum.Remove()
-	m.getErr.Remove()
-	m.getLatency.Remove()
-	m.getSize.Remove()
-	m.hasNum.Remove()
-	m.hasErr.Remove()
-	m.hasLatency.Remove()
-	m.deleteNum.Remove()
-	m.deleteErr.Remove()
-	m.deleteLatency.Remove()
-	m.queryNum.Remove()
-	m.queryErr.Remove()
-	m.queryLatency.Remove()
 	if c, ok := m.backend.(io.Closer); ok {
 		return c.Close()
 	}
......
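
These counters and histograms only surface once a real backend is wired into go-metrics-interface; a rough sketch of doing that with Prometheus, assuming the separate github.com/ipfs/go-metrics-prometheus injector (which is not part of this commit):

package main

import (
	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/measure"
	mprome "github.com/ipfs/go-metrics-prometheus"
)

func main() {
	// Swap the default no-op go-metrics-interface implementation for
	// Prometheus collectors; this has to happen before the metrics are
	// created, i.e. before measure.New runs.
	if err := mprome.Inject(); err != nil {
		panic(err)
	}

	// Metrics created from here on are registered with the Prometheus
	// default registry under the "ipfs.datastore" prefix.
	ds := measure.New("ipfs.datastore", datastore.NewMapDatastore())
	_ = ds.Put(datastore.NewKey("/greeting"), []byte("hello"))
}
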