measure.go 6.13 KB
Newer Older
1
// Package measure provides a Datastore wrapper that records metrics
// using github.com/ipfs/go-metrics-interface.
3 4 5
package measure

import (
Jeromy's avatar
Jeromy committed
6
	"io"
7 8
	"time"

Jakub Sztandera's avatar
Jakub Sztandera committed
9
	"github.com/ipfs/go-datastore"
Jeromy's avatar
Jeromy committed
10
	"github.com/ipfs/go-datastore/query"
11
	"github.com/ipfs/go-metrics-interface"
12 13
)

14 15 16 17 18
var (
	// datastoreLatencyBuckets holds the histogram bucket upper bounds, in
	// seconds, used for every latency histogram (100µs, 1ms, 10ms, 100ms).
	datastoreLatencyBuckets = []float64{1e-4, 1e-3, 1e-2, 1e-1}

	// datastoreSizeBuckets holds the histogram bucket upper bounds, in
	// bytes, used for every value-size histogram (64B, 4KiB, 256KiB, 16MiB).
	datastoreSizeBuckets = []float64{1 << 6, 1 << 12, 1 << 18, 1 << 24}
)

// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
Jeromy's avatar
Jeromy committed
24
func New(prefix string, ds datastore.Datastore) *measure {
25 26 27
	m := &measure{
		backend: ds,

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
		putNum: metrics.New(prefix+".put_total", "Total number of Datastore.Put calls").Counter(),
		putErr: metrics.New(prefix+".put.errors_total", "Number of errored Datastore.Put calls").Counter(),
		putLatency: metrics.New(prefix+".put.latency_seconds",
			"Latency distribution of Datastore.Put calls").Histogram(datastoreLatencyBuckets),
		putSize: metrics.New(prefix+".put.size_bytes",
			"Size distribution of stored byte slices").Histogram(datastoreSizeBuckets),

		getNum: metrics.New(prefix+".get_total", "Total number of Datastore.Get calls").Counter(),
		getErr: metrics.New(prefix+".get.errors_total", "Number of errored Datastore.Get calls").Counter(),
		getLatency: metrics.New(prefix+".get.latency_seconds",
			"Latency distribution of Datastore.Get calls").Histogram(datastoreLatencyBuckets),
		getSize: metrics.New(prefix+".get.size_bytes",
			"Size distribution of retrieved byte slices").Histogram(datastoreSizeBuckets),

		hasNum: metrics.New(prefix+".has_total", "Total number of Datastore.Has calls").Counter(),
		hasErr: metrics.New(prefix+".has.errors_total", "Number of errored Datastore.Has calls").Counter(),
		hasLatency: metrics.New(prefix+".has.latency_seconds",
			"Latency distribution of Datastore.Has calls").Histogram(datastoreLatencyBuckets),

		deleteNum: metrics.New(prefix+".delete_total", "Total number of Datastore.Delete calls").Counter(),
		deleteErr: metrics.New(prefix+".delete.errors_total", "Number of errored Datastore.Delete calls").Counter(),
		deleteLatency: metrics.New(prefix+".delete.latency_seconds",
			"Latency distribution of Datastore.Delete calls").Histogram(datastoreLatencyBuckets),

		queryNum: metrics.New(prefix+".query_total", "Total number of Datastore.Query calls").Counter(),
		queryErr: metrics.New(prefix+".query.errors_total", "Number of errored Datastore.Query calls").Counter(),
		queryLatency: metrics.New(prefix+".query.latency_seconds",
			"Latency distribution of Datastore.Query calls").Histogram(datastoreLatencyBuckets),
56 57 58 59 60 61 62 63 64
	}
	return m
}

type measure struct {
	backend datastore.Datastore

	putNum     metrics.Counter
	putErr     metrics.Counter
65 66
	putLatency metrics.Histogram
	putSize    metrics.Histogram
67 68 69

	getNum     metrics.Counter
	getErr     metrics.Counter
70 71
	getLatency metrics.Histogram
	getSize    metrics.Histogram
72 73 74

	hasNum     metrics.Counter
	hasErr     metrics.Counter
75
	hasLatency metrics.Histogram
76 77 78

	deleteNum     metrics.Counter
	deleteErr     metrics.Counter
79
	deleteLatency metrics.Histogram
80 81 82

	queryNum     metrics.Counter
	queryErr     metrics.Counter
83
	queryLatency metrics.Histogram
84 85
}

86 87 88
func recordLatency(h metrics.Histogram, start time.Time) {
	elapsed := time.Since(start)
	h.Observe(elapsed.Seconds())
89 90 91 92
}

func (m *measure) Put(key datastore.Key, value interface{}) error {
	defer recordLatency(m.putLatency, time.Now())
93
	m.putNum.Inc()
94
	if b, ok := value.([]byte); ok {
95
		m.putSize.Observe(float64(len(b)))
96 97 98
	}
	err := m.backend.Put(key, value)
	if err != nil {
99
		m.putErr.Inc()
100 101 102 103 104 105
	}
	return err
}

func (m *measure) Get(key datastore.Key) (value interface{}, err error) {
	defer recordLatency(m.getLatency, time.Now())
106
	m.getNum.Inc()
107 108
	value, err = m.backend.Get(key)
	if err != nil {
109
		m.getErr.Inc()
110 111
	} else {
		if b, ok := value.([]byte); ok {
112
			m.getSize.Observe(float64(len(b)))
113 114 115 116 117 118 119
		}
	}
	return value, err
}

func (m *measure) Has(key datastore.Key) (exists bool, err error) {
	defer recordLatency(m.hasLatency, time.Now())
120
	m.hasNum.Inc()
121 122
	exists, err = m.backend.Has(key)
	if err != nil {
123
		m.hasErr.Inc()
124 125 126 127 128 129
	}
	return exists, err
}

func (m *measure) Delete(key datastore.Key) error {
	defer recordLatency(m.deleteLatency, time.Now())
130
	m.deleteNum.Inc()
131 132
	err := m.backend.Delete(key)
	if err != nil {
133
		m.deleteErr.Inc()
134 135 136 137 138 139
	}
	return err
}

func (m *measure) Query(q query.Query) (query.Results, error) {
	defer recordLatency(m.queryLatency, time.Now())
140
	m.queryNum.Inc()
141 142
	res, err := m.backend.Query(q)
	if err != nil {
143
		m.queryErr.Inc()
144 145 146
	}
	return res, err
}
147

Jeromy's avatar
Jeromy committed
148 149 150 151 152 153 154 155 156 157 158
// measuredBatch is the batch handed out by (*measure).Batch. Puts and
// deletes are queued on two separate backend batches and counted, so that
// Commit can report metrics for each kind of operation.
type measuredBatch struct {
	// puts and deletes count the operations queued so far on putts and
	// delts respectively.
	puts, deletes int

	// putts receives the queued Put operations, delts the queued Delete
	// operations.
	putts, delts datastore.Batch

	// m is the wrapper whose metrics are updated.
	m *measure
}

func (m *measure) Batch() (datastore.Batch, error) {
Jeromy's avatar
Jeromy committed
159
	bds, ok := m.backend.(datastore.Batching)
Jeromy's avatar
Jeromy committed
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
	if !ok {
		return nil, datastore.ErrBatchUnsupported
	}
	pb, err := bds.Batch()
	if err != nil {
		return nil, err
	}

	db, err := bds.Batch()
	if err != nil {
		return nil, err
	}

	return &measuredBatch{
		putts: pb,
		delts: db,

		m: m,
	}, nil
}

func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
	mt.puts++
	valb, ok := val.([]byte)
184
	if ok {
185
		mt.m.putSize.Observe(float64(len(valb)))
Jeromy's avatar
Jeromy committed
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
	}
	return mt.putts.Put(key, val)
}

// Delete queues a removal on the delete batch and returns that batch's
// error unchanged. The operation is counted for the later Commit.
func (mt *measuredBatch) Delete(key datastore.Key) error {
	mt.deletes++
	err := mt.delts.Delete(key)
	return err
}

// Commit commits the queued deletes and then the queued puts, updating the
// delete and put metrics of the owning wrapper.
//
// Deletes are committed first regardless of the order in which operations
// were queued. If committing the deletes fails, that error is returned and
// the puts are not committed.
func (mt *measuredBatch) Commit() error {
	m := mt.m

	if err := logBatchCommit(mt.delts, mt.deletes, m.deleteNum, m.deleteErr, m.deleteLatency); err != nil {
		return err
	}
	return logBatchCommit(mt.putts, mt.puts, m.putNum, m.putErr, m.putLatency)
}

209
func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat metrics.Histogram) error {
Jeromy's avatar
Jeromy committed
210 211 212
	if n > 0 {
		before := time.Now()
		err := b.Commit()
213 214
		took := time.Since(before) / time.Duration(n)
		num.Add(float64(n))
Jeromy's avatar
Jeromy committed
215
		for i := 0; i < n; i++ {
216
			lat.Observe(took.Seconds())
Jeromy's avatar
Jeromy committed
217 218
		}
		if err != nil {
219
			errs.Inc()
Jeromy's avatar
Jeromy committed
220 221 222 223 224 225
			return err
		}
	}
	return nil
}

226
func (m *measure) Close() error {
Jeromy's avatar
Jeromy committed
227 228 229
	if c, ok := m.backend.(io.Closer); ok {
		return c.Close()
	}
230 231
	return nil
}