measure.go
// Package measure provides a Datastore wrapper that records metrics
// using github.com/codahale/metrics.
package measure

import (
	"time"

	"github.com/jbenet/go-datastore"
	"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/codahale/metrics"
	"github.com/jbenet/go-datastore/query"
)

// Histogram measurements exceeding these limits are silently dropped.
// Note that latencies are recorded in microseconds (see recordLatency),
// so the effective latency cap is 1e9 µs, not one second.
//
// TODO: would it be better to clamp out-of-range values instead, and to
// count how many measurements are dropped?
const (
	maxLatency = int64(1 * time.Second)
	maxSize    = int64(1 << 32)
)

// New wraps the datastore, recording metrics for every operation. The
// metrics are registered under names beginning with prefix and a dot,
// e.g. prefix+".Put.num" and prefix+".Put.latency".
func New(prefix string, ds datastore.Datastore) datastore.Datastore {
	m := &measure{
		backend: ds,

		putNum:     metrics.Counter(prefix + ".Put.num"),
		putErr:     metrics.Counter(prefix + ".Put.err"),
		putLatency: metrics.NewHistogram(prefix+".Put.latency", 0, maxLatency, 3),
		putSize:    metrics.NewHistogram(prefix+".Put.size", 0, maxSize, 3),

		getNum:     metrics.Counter(prefix + ".Get.num"),
		getErr:     metrics.Counter(prefix + ".Get.err"),
		getLatency: metrics.NewHistogram(prefix+".Get.latency", 0, maxLatency, 3),
		getSize:    metrics.NewHistogram(prefix+".Get.size", 0, maxSize, 3),

		hasNum:     metrics.Counter(prefix + ".Has.num"),
		hasErr:     metrics.Counter(prefix + ".Has.err"),
		hasLatency: metrics.NewHistogram(prefix+".Has.latency", 0, maxLatency, 3),

		deleteNum:     metrics.Counter(prefix + ".Delete.num"),
		deleteErr:     metrics.Counter(prefix + ".Delete.err"),
		deleteLatency: metrics.NewHistogram(prefix+".Delete.latency", 0, maxLatency, 3),

		queryNum:     metrics.Counter(prefix + ".Query.num"),
		queryErr:     metrics.Counter(prefix + ".Query.err"),
		queryLatency: metrics.NewHistogram(prefix+".Query.latency", 0, maxLatency, 3),
	}
	return m
}
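
// Usage sketch (illustrative only, not part of the original file): from
// a client package, wrap the in-memory MapDatastore that ships with
// this repository under an arbitrary prefix and use the result like any
// other Datastore. The wrapper changes no behaviour; it only records
// counters and histograms on the side, which can later be read back
// through the vendored codahale/metrics package.
//
//	mds := measure.New("example", datastore.NewMapDatastore())
//
//	key := datastore.NewKey("/greeting")
//	if err := mds.Put(key, []byte("hello")); err != nil {
//		// errors are counted under "example.Put.err" and returned as usual
//	}
//	if value, err := mds.Get(key); err == nil {
//		_ = value.([]byte) // values round-trip unchanged
//	}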

type measure struct {
	backend datastore.Datastore

	putNum     metrics.Counter
	putErr     metrics.Counter
	putLatency *metrics.Histogram
	putSize    *metrics.Histogram

	getNum     metrics.Counter
	getErr     metrics.Counter
	getLatency *metrics.Histogram
	getSize    *metrics.Histogram

	hasNum     metrics.Counter
	hasErr     metrics.Counter
	hasLatency *metrics.Histogram

	deleteNum     metrics.Counter
	deleteErr     metrics.Counter
	deleteLatency *metrics.Histogram

	queryNum     metrics.Counter
	queryErr     metrics.Counter
	queryLatency *metrics.Histogram
}

var _ datastore.Datastore = (*measure)(nil)

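// recordLatency records the time elapsed since start, in microseconds,
// into the given histogram. Out-of-range values make RecordValue return
// an error, which is deliberately ignored (the measurement is dropped).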
func recordLatency(h *metrics.Histogram, start time.Time) {
	elapsed := time.Since(start) / time.Microsecond
	_ = h.RecordValue(int64(elapsed))
}

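// Put forwards to the wrapped datastore, recording the call latency,
// the size of []byte values, and whether an error occurred.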
func (m *measure) Put(key datastore.Key, value interface{}) error {
	defer recordLatency(m.putLatency, time.Now())
	m.putNum.Add()
	if b, ok := value.([]byte); ok {
		_ = m.putSize.RecordValue(int64(len(b)))
	}
	err := m.backend.Put(key, value)
	if err != nil {
		m.putErr.Add()
	}
	return err
}

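// Get forwards to the wrapped datastore, recording the call latency,
// whether an error occurred, and the size of []byte values returned on
// success.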
func (m *measure) Get(key datastore.Key) (value interface{}, err error) {
	defer recordLatency(m.getLatency, time.Now())
	m.getNum.Add()
	value, err = m.backend.Get(key)
	if err != nil {
		m.getErr.Add()
	} else if b, ok := value.([]byte); ok {
		_ = m.getSize.RecordValue(int64(len(b)))
	}
	return value, err
}

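// Has forwards to the wrapped datastore, recording the call latency and
// whether an error occurred.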
func (m *measure) Has(key datastore.Key) (exists bool, err error) {
	defer recordLatency(m.hasLatency, time.Now())
	m.hasNum.Add()
	exists, err = m.backend.Has(key)
	if err != nil {
		m.hasErr.Add()
	}
	return exists, err
}

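// Delete forwards to the wrapped datastore, recording the call latency
// and whether an error occurred.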
func (m *measure) Delete(key datastore.Key) error {
	defer recordLatency(m.deleteLatency, time.Now())
	m.deleteNum.Add()
	err := m.backend.Delete(key)
	if err != nil {
		m.deleteErr.Add()
	}
	return err
}

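// Query forwards to the wrapped datastore, recording the call latency
// and whether an error occurred.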
func (m *measure) Query(q query.Query) (query.Results, error) {
	defer recordLatency(m.queryLatency, time.Now())
	m.queryNum.Add()
	res, err := m.backend.Query(q)
	if err != nil {
		m.queryErr.Add()
	}
	return res, err
}