// Package measure provides a Datastore wrapper that records metrics
// using github.com/codahale/metrics.
package measure

import (
	"time"

	"github.com/jbenet/go-datastore"
	"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/codahale/metrics"
	"github.com/jbenet/go-datastore/query"
)

// Upper bounds handed to metrics.NewHistogram for the latency and size
// histograms created in New. Histogram measurements exceeding these
// limits are dropped.
//
// TODO: maybe it would be better to cap the value? Should we keep track
// of drops?
//
// NOTE(review): maxLatency is the int64 value of one time.Second, i.e.
// 1e9 (a nanosecond count), while recordLatency records elapsed time in
// microseconds. As written the latency ceiling is therefore 1e9
// microseconds, not one second; confirm which unit was intended.
const (
	maxLatency = int64(1 * time.Second)
	maxSize    = int64(1 << 32)
)

// DatastoreCloser is a datastore.Datastore that additionally has a Close
// method. It is the type returned by New; calling Close on that value
// removes the metrics that New registered.
type DatastoreCloser interface {
	datastore.Datastore
	Close() error
}

// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
28 29 30 31
//
// If prefix is not unique, New will panic. Call Close to release the
// prefix.
func New(prefix string, ds datastore.Datastore) DatastoreCloser {
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
	m := &measure{
		backend: ds,

		putNum:     metrics.Counter(prefix + ".Put.num"),
		putErr:     metrics.Counter(prefix + ".Put.err"),
		putLatency: metrics.NewHistogram(prefix+".Put.latency", 0, maxLatency, 3),
		putSize:    metrics.NewHistogram(prefix+".Put.size", 0, maxSize, 3),

		getNum:     metrics.Counter(prefix + ".Get.num"),
		getErr:     metrics.Counter(prefix + ".Get.err"),
		getLatency: metrics.NewHistogram(prefix+".Get.latency", 0, maxLatency, 3),
		getSize:    metrics.NewHistogram(prefix+".Get.size", 0, maxSize, 3),

		hasNum:     metrics.Counter(prefix + ".Has.num"),
		hasErr:     metrics.Counter(prefix + ".Has.err"),
		hasLatency: metrics.NewHistogram(prefix+".Has.latency", 0, maxLatency, 3),

		deleteNum:     metrics.Counter(prefix + ".Delete.num"),
		deleteErr:     metrics.Counter(prefix + ".Delete.err"),
		deleteLatency: metrics.NewHistogram(prefix+".Delete.latency", 0, maxLatency, 3),

		queryNum:     metrics.Counter(prefix + ".Query.num"),
		queryErr:     metrics.Counter(prefix + ".Query.err"),
		queryLatency: metrics.NewHistogram(prefix+".Query.latency", 0, maxLatency, 3),
	}
	return m
}

// measure is the Datastore wrapper built by New. Each operation is
// forwarded to backend while the matching counters and histograms below
// are updated.
type measure struct {
	// backend is the wrapped datastore that actually serves requests.
	backend datastore.Datastore

	// Put: call count, error count, latency and value size.
	putNum, putErr      metrics.Counter
	putLatency, putSize *metrics.Histogram

	// Get: call count, error count, latency and value size.
	getNum, getErr      metrics.Counter
	getLatency, getSize *metrics.Histogram

	// Has: call count, error count and latency.
	hasNum, hasErr metrics.Counter
	hasLatency     *metrics.Histogram

	// Delete: call count, error count and latency.
	deleteNum, deleteErr metrics.Counter
	deleteLatency        *metrics.Histogram

	// Query: call count, error count and latency.
	queryNum, queryErr metrics.Counter
	queryLatency       *metrics.Histogram
}

var _ datastore.Datastore = (*measure)(nil)
87
var _ DatastoreCloser = (*measure)(nil)

// recordLatency records the time elapsed since start into h, expressed
// in whole microseconds. The error returned by RecordValue is ignored on
// purpose: a failed measurement must not affect the datastore call.
func recordLatency(h *metrics.Histogram, start time.Time) {
	micros := int64(time.Since(start) / time.Microsecond)
	_ = h.RecordValue(micros)
}

// Put stores value under key in the backend. It bumps the Put call
// counter, records the value size when value is a []byte, bumps the Put
// error counter if the backend fails, and records the call latency on
// return.
func (m *measure) Put(key datastore.Key, value interface{}) error {
	began := time.Now()
	defer recordLatency(m.putLatency, began)
	m.putNum.Add()

	// Only byte slices have a measurable size; other values are passed
	// through without a size sample.
	if data, isBytes := value.([]byte); isBytes {
		_ = m.putSize.RecordValue(int64(len(data)))
	}

	if err := m.backend.Put(key, value); err != nil {
		m.putErr.Add()
		return err
	}
	return nil
}

// Get fetches the value stored under key from the backend. It bumps the
// Get call counter, bumps the Get error counter on failure, records the
// size of a successfully returned []byte value, and records the call
// latency on return.
func (m *measure) Get(key datastore.Key) (value interface{}, err error) {
	began := time.Now()
	defer recordLatency(m.getLatency, began)
	m.getNum.Add()

	value, err = m.backend.Get(key)
	if err != nil {
		m.getErr.Add()
		return value, err
	}
	// Size is only sampled for successful reads that yield a byte slice.
	if data, isBytes := value.([]byte); isBytes {
		_ = m.getSize.RecordValue(int64(len(data)))
	}
	return value, err
}

// Has asks the backend whether key exists. It bumps the Has call
// counter, bumps the Has error counter on failure, and records the call
// latency on return.
func (m *measure) Has(key datastore.Key) (exists bool, err error) {
	began := time.Now()
	defer recordLatency(m.hasLatency, began)
	m.hasNum.Add()

	if exists, err = m.backend.Has(key); err != nil {
		m.hasErr.Add()
	}
	return exists, err
}

// Delete removes key from the backend. It bumps the Delete call counter,
// bumps the Delete error counter on failure, and records the call
// latency on return.
func (m *measure) Delete(key datastore.Key) error {
	began := time.Now()
	defer recordLatency(m.deleteLatency, began)
	m.deleteNum.Add()

	if err := m.backend.Delete(key); err != nil {
		m.deleteErr.Add()
		return err
	}
	return nil
}

// Query runs q against the backend. It bumps the Query call counter,
// bumps the Query error counter on failure, and records the latency of
// the backend's Query call itself (time spent later consuming the
// returned results is not measured here). The backend's results and
// error are returned unchanged.
func (m *measure) Query(q query.Query) (query.Results, error) {
	began := time.Now()
	defer recordLatency(m.queryLatency, began)
	m.queryNum.Add()

	results, err := m.backend.Query(q)
	if err != nil {
		m.queryErr.Add()
	}
	return results, err
}
// measuredBatch is the batch handed out by (*measure).Batch. It queues
// puts and deletes in two separate backend batches and reports metrics
// for them when Commit is called.
type measuredBatch struct {
	// puts and deletes count the operations queued so far; Commit uses
	// them to update the counters and to spread the commit latency.
	puts, deletes int

	// putts receives queued Put calls, delts receives queued Delete
	// calls.
	putts, delts datastore.Batch

	// m is the wrapper whose counters and histograms are updated.
	m *measure
}

// Batch returns a batch whose operations are reported through m's
// metrics. It returns datastore.ErrBatchUnsupported when the backend is
// not a datastore.BatchingDatastore, and forwards any error the backend
// returns while creating its batches.
func (m *measure) Batch() (datastore.Batch, error) {
	batching, supported := m.backend.(datastore.BatchingDatastore)
	if !supported {
		return nil, datastore.ErrBatchUnsupported
	}

	// Two backend batches are opened: one collects puts, the other
	// collects deletes, so each kind can be committed and timed on its
	// own.
	putBatch, err := batching.Batch()
	if err != nil {
		return nil, err
	}
	deleteBatch, err := batching.Batch()
	if err != nil {
		return nil, err
	}

	mb := &measuredBatch{
		m:     m,
		putts: putBatch,
		delts: deleteBatch,
	}
	return mb, nil
}

// Put queues a put of val under key in the put batch and records the
// value size. val must be a []byte; any other type is rejected with
// datastore.ErrInvalidType and is not queued.
//
// The queued-put counter is only advanced for values that pass the type
// check. A rejected value never reaches the backend batch, so counting
// it would make Commit report puts that were never committed and skew
// the per-operation latency it derives from that count.
func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
	valb, ok := val.([]byte)
	if !ok {
		return datastore.ErrInvalidType
	}
	mt.puts++
	_ = mt.m.putSize.RecordValue(int64(len(valb)))
	return mt.putts.Put(key, val)
}

// Delete queues a delete of key in the delete batch and counts it so
// that Commit can report it.
func (mt *measuredBatch) Delete(key datastore.Key) error {
	mt.deletes++
	err := mt.delts.Delete(key)
	return err
}

// Commit commits the queued deletes first and then the queued puts,
// updating the Delete and Put metrics respectively. If committing the
// deletes fails, that error is returned and the puts are not committed.
func (mt *measuredBatch) Commit() error {
	metricsOf := mt.m

	if err := logBatchCommit(mt.delts, mt.deletes, metricsOf.deleteNum, metricsOf.deleteErr, metricsOf.deleteLatency); err != nil {
		return err
	}
	return logBatchCommit(mt.putts, mt.puts, metricsOf.putNum, metricsOf.putErr, metricsOf.putLatency)
}

// logBatchCommit commits b and reports it as n individual operations.
//
// When n is zero or negative nothing is committed and nil is returned.
// Otherwise the commit is timed, num is increased by n, and the commit
// duration in microseconds divided by n is recorded into lat n times,
// so each queued operation contributes one latency sample. A failed
// commit increments errs once and its error is returned.
func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat *metrics.Histogram) error {
	if n <= 0 {
		return nil
	}

	started := time.Now()
	err := b.Commit()
	perOp := int64(int(time.Since(started)/time.Microsecond) / n)

	num.AddN(uint64(n))
	for remaining := n; remaining > 0; remaining-- {
		_ = lat.RecordValue(perOp)
	}

	if err != nil {
		errs.Add()
	}
	return err
}

// Close removes every counter and histogram that New registered for
// this wrapper, which releases the prefix for reuse. It does not call
// anything on the wrapped backend and always returns nil.
func (m *measure) Close() error {
	// Put metrics.
	m.putNum.Remove()
	m.putErr.Remove()
	m.putLatency.Remove()
	m.putSize.Remove()
	// Get metrics.
	m.getNum.Remove()
	m.getErr.Remove()
	m.getLatency.Remove()
	m.getSize.Remove()
	// Has metrics.
	m.hasNum.Remove()
	m.hasErr.Remove()
	m.hasLatency.Remove()
	// Delete metrics.
	m.deleteNum.Remove()
	m.deleteErr.Remove()
	m.deleteLatency.Remove()
	// Query metrics.
	m.queryNum.Remove()
	m.queryErr.Remove()
	m.queryLatency.Remove()
	return nil
}