measure.go 4.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// Package measure provides a Datastore wrapper that records metrics
// using github.com/codahale/metrics.
package measure

import (
	"time"

	"github.com/jbenet/go-datastore"
	"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/codahale/metrics"
	"github.com/jbenet/go-datastore/query"
)

// Histogram measurements exceeding these limits are dropped. TODO
// maybe it would be better to cap the value? Should we keep track of
// drops?
const (
	maxLatency = int64(1 * time.Second)
	maxSize    = int64(1 << 32)
)

21 22 23 24 25
type DatastoreCloser interface {
	datastore.Datastore
	Close() error
}

26 27
// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
28 29 30 31
//
// If prefix is not unique, New will panic. Call Close to release the
// prefix.
func New(prefix string, ds datastore.Datastore) DatastoreCloser {
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
	m := &measure{
		backend: ds,

		putNum:     metrics.Counter(prefix + ".Put.num"),
		putErr:     metrics.Counter(prefix + ".Put.err"),
		putLatency: metrics.NewHistogram(prefix+".Put.latency", 0, maxLatency, 3),
		putSize:    metrics.NewHistogram(prefix+".Put.size", 0, maxSize, 3),

		getNum:     metrics.Counter(prefix + ".Get.num"),
		getErr:     metrics.Counter(prefix + ".Get.err"),
		getLatency: metrics.NewHistogram(prefix+".Get.latency", 0, maxLatency, 3),
		getSize:    metrics.NewHistogram(prefix+".Get.size", 0, maxSize, 3),

		hasNum:     metrics.Counter(prefix + ".Has.num"),
		hasErr:     metrics.Counter(prefix + ".Has.err"),
		hasLatency: metrics.NewHistogram(prefix+".Has.latency", 0, maxLatency, 3),

		deleteNum:     metrics.Counter(prefix + ".Delete.num"),
		deleteErr:     metrics.Counter(prefix + ".Delete.err"),
		deleteLatency: metrics.NewHistogram(prefix+".Delete.latency", 0, maxLatency, 3),

		queryNum:     metrics.Counter(prefix + ".Query.num"),
		queryErr:     metrics.Counter(prefix + ".Query.err"),
		queryLatency: metrics.NewHistogram(prefix+".Query.latency", 0, maxLatency, 3),
	}
	return m
}

type measure struct {
	backend datastore.Datastore

	putNum     metrics.Counter
	putErr     metrics.Counter
	putLatency *metrics.Histogram
	putSize    *metrics.Histogram

	getNum     metrics.Counter
	getErr     metrics.Counter
	getLatency *metrics.Histogram
	getSize    *metrics.Histogram

	hasNum     metrics.Counter
	hasErr     metrics.Counter
	hasLatency *metrics.Histogram

	deleteNum     metrics.Counter
	deleteErr     metrics.Counter
	deleteLatency *metrics.Histogram

	queryNum     metrics.Counter
	queryErr     metrics.Counter
	queryLatency *metrics.Histogram
}

var _ datastore.Datastore = (*measure)(nil)
87
var _ DatastoreCloser = (*measure)(nil)
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149

func recordLatency(h *metrics.Histogram, start time.Time) {
	elapsed := time.Now().Sub(start) / time.Microsecond
	_ = h.RecordValue(int64(elapsed))
}

func (m *measure) Put(key datastore.Key, value interface{}) error {
	defer recordLatency(m.putLatency, time.Now())
	m.putNum.Add()
	if b, ok := value.([]byte); ok {
		_ = m.putSize.RecordValue(int64(len(b)))
	}
	err := m.backend.Put(key, value)
	if err != nil {
		m.putErr.Add()
	}
	return err
}

func (m *measure) Get(key datastore.Key) (value interface{}, err error) {
	defer recordLatency(m.getLatency, time.Now())
	m.getNum.Add()
	value, err = m.backend.Get(key)
	if err != nil {
		m.getErr.Add()
	} else {
		if b, ok := value.([]byte); ok {
			_ = m.getSize.RecordValue(int64(len(b)))
		}
	}
	return value, err
}

func (m *measure) Has(key datastore.Key) (exists bool, err error) {
	defer recordLatency(m.hasLatency, time.Now())
	m.hasNum.Add()
	exists, err = m.backend.Has(key)
	if err != nil {
		m.hasErr.Add()
	}
	return exists, err
}

func (m *measure) Delete(key datastore.Key) error {
	defer recordLatency(m.deleteLatency, time.Now())
	m.deleteNum.Add()
	err := m.backend.Delete(key)
	if err != nil {
		m.deleteErr.Add()
	}
	return err
}

func (m *measure) Query(q query.Query) (query.Results, error) {
	defer recordLatency(m.queryLatency, time.Now())
	m.queryNum.Add()
	res, err := m.backend.Query(q)
	if err != nil {
		m.queryErr.Add()
	}
	return res, err
}
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170

func (m *measure) Close() error {
	m.putNum.Remove()
	m.putErr.Remove()
	m.putLatency.Remove()
	m.putSize.Remove()
	m.getNum.Remove()
	m.getErr.Remove()
	m.getLatency.Remove()
	m.getSize.Remove()
	m.hasNum.Remove()
	m.hasErr.Remove()
	m.hasLatency.Remove()
	m.deleteNum.Remove()
	m.deleteErr.Remove()
	m.deleteLatency.Remove()
	m.queryNum.Remove()
	m.queryErr.Remove()
	m.queryLatency.Remove()
	return nil
}