// Package measure provides a Datastore wrapper that records metrics
// using github.com/ipfs/go-metrics-interface
package measure

import (
	"io"
	"time"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/query"
	"github.com/ipfs/go-metrics-interface"
)

var (
	// datastoreLatencyBuckets are histogram bucket upper bounds for
	// operation latencies, in seconds (100µs up to 1s).
	datastoreLatencyBuckets = []float64{1e-4, 1e-3, 1e-2, 1e-1, 1}

	// datastoreSizeBuckets are histogram bucket upper bounds for
	// value sizes, in bytes (64B, 4KiB, 256KiB, 16MiB).
	datastoreSizeBuckets = []float64{1 << 6, 1 << 12, 1 << 18, 1 << 24}
)

// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
func New(prefix string, ds datastore.Datastore) *measure {
	m := &measure{
		backend: ds,

		// Datastore.Put
		putNum: metrics.New(prefix+".put_total", "Total number of Datastore.Put calls").Counter(),
		putErr: metrics.New(prefix+".put.errors_total", "Number of errored Datastore.Put calls").Counter(),
		putLatency: metrics.New(prefix+".put.latency_seconds",
			"Latency distribution of Datastore.Put calls").Histogram(datastoreLatencyBuckets),
		putSize: metrics.New(prefix+".put.size_bytes",
			"Size distribution of stored byte slices").Histogram(datastoreSizeBuckets),

		// Datastore.Sync
		syncNum: metrics.New(prefix+".sync_total", "Total number of Datastore.Sync calls").Counter(),
		syncErr: metrics.New(prefix+".sync.errors_total", "Number of errored Datastore.Sync calls").Counter(),
		syncLatency: metrics.New(prefix+".sync.latency_seconds",
			"Latency distribution of Datastore.Sync calls").Histogram(datastoreLatencyBuckets),

		// Datastore.Get
		getNum: metrics.New(prefix+".get_total", "Total number of Datastore.Get calls").Counter(),
		getErr: metrics.New(prefix+".get.errors_total", "Number of errored Datastore.Get calls").Counter(),
		getLatency: metrics.New(prefix+".get.latency_seconds",
			"Latency distribution of Datastore.Get calls").Histogram(datastoreLatencyBuckets),
		getSize: metrics.New(prefix+".get.size_bytes",
			"Size distribution of retrieved byte slices").Histogram(datastoreSizeBuckets),

		// Datastore.Has
		hasNum: metrics.New(prefix+".has_total", "Total number of Datastore.Has calls").Counter(),
		hasErr: metrics.New(prefix+".has.errors_total", "Number of errored Datastore.Has calls").Counter(),
		hasLatency: metrics.New(prefix+".has.latency_seconds",
			"Latency distribution of Datastore.Has calls").Histogram(datastoreLatencyBuckets),

		// Datastore.GetSize
		getsizeNum: metrics.New(prefix+".getsize_total", "Total number of Datastore.GetSize calls").Counter(),
		getsizeErr: metrics.New(prefix+".getsize.errors_total", "Number of errored Datastore.GetSize calls").Counter(),
		getsizeLatency: metrics.New(prefix+".getsize.latency_seconds",
			"Latency distribution of Datastore.GetSize calls").Histogram(datastoreLatencyBuckets),

		// Datastore.Delete
		deleteNum: metrics.New(prefix+".delete_total", "Total number of Datastore.Delete calls").Counter(),
		deleteErr: metrics.New(prefix+".delete.errors_total", "Number of errored Datastore.Delete calls").Counter(),
		deleteLatency: metrics.New(prefix+".delete.latency_seconds",
			"Latency distribution of Datastore.Delete calls").Histogram(datastoreLatencyBuckets),

		// Datastore.Query
		queryNum: metrics.New(prefix+".query_total", "Total number of Datastore.Query calls").Counter(),
		queryErr: metrics.New(prefix+".query.errors_total", "Number of errored Datastore.Query calls").Counter(),
		queryLatency: metrics.New(prefix+".query.latency_seconds",
			"Latency distribution of Datastore.Query calls").Histogram(datastoreLatencyBuckets),

		// CheckedDatastore.Check
		checkNum: metrics.New(prefix+".check_total", "Total number of Datastore.Check calls").Counter(),
		checkErr: metrics.New(prefix+".check.errors_total", "Number of errored Datastore.Check calls").Counter(),
		checkLatency: metrics.New(prefix+".check.latency_seconds",
			"Latency distribution of Datastore.Check calls").Histogram(datastoreLatencyBuckets),

		// ScrubbedDatastore.Scrub
		scrubNum: metrics.New(prefix+".scrub_total", "Total number of Datastore.Scrub calls").Counter(),
		scrubErr: metrics.New(prefix+".scrub.errors_total", "Number of errored Datastore.Scrub calls").Counter(),
		scrubLatency: metrics.New(prefix+".scrub.latency_seconds",
			"Latency distribution of Datastore.Scrub calls").Histogram(datastoreLatencyBuckets),

		// GCDatastore.CollectGarbage
		gcNum: metrics.New(prefix+".gc_total", "Total number of Datastore.CollectGarbage calls").Counter(),
		gcErr: metrics.New(prefix+".gc.errors_total", "Number of errored Datastore.CollectGarbage calls").Counter(),
		gcLatency: metrics.New(prefix+".gc.latency_seconds",
			"Latency distribution of Datastore.CollectGarbage calls").Histogram(datastoreLatencyBuckets),

		// Datastore.DiskUsage
		duNum: metrics.New(prefix+".du_total", "Total number of Datastore.DiskUsage calls").Counter(),
		duErr: metrics.New(prefix+".du.errors_total", "Number of errored Datastore.DiskUsage calls").Counter(),
		duLatency: metrics.New(prefix+".du.latency_seconds",
			"Latency distribution of Datastore.DiskUsage calls").Histogram(datastoreLatencyBuckets),

		// Batch.Put
		batchPutNum: metrics.New(prefix+".batchput_total", "Total number of Batch.Put calls").Counter(),
		batchPutErr: metrics.New(prefix+".batchput.errors_total", "Number of errored Batch.Put calls").Counter(),
		batchPutLatency: metrics.New(prefix+".batchput.latency_seconds",
			"Latency distribution of Batch.Put calls").Histogram(datastoreLatencyBuckets),
		batchPutSize: metrics.New(prefix+".batchput.size_bytes",
			"Size distribution of byte slices put into batches").Histogram(datastoreSizeBuckets),

		// Batch.Delete
		batchDeleteNum: metrics.New(prefix+".batchdelete_total", "Total number of Batch.Delete calls").Counter(),
		batchDeleteErr: metrics.New(prefix+".batchdelete.errors_total", "Number of errored Batch.Delete calls").Counter(),
		batchDeleteLatency: metrics.New(prefix+".batchdelete.latency_seconds",
			"Latency distribution of Batch.Delete calls").Histogram(datastoreLatencyBuckets),

		// Batch.Commit
		batchCommitNum: metrics.New(prefix+".batchcommit_total", "Total number of Batch.Commit calls").Counter(),
		batchCommitErr: metrics.New(prefix+".batchcommit.errors_total", "Number of errored Batch.Commit calls").Counter(),
		batchCommitLatency: metrics.New(prefix+".batchcommit.latency_seconds",
			"Latency distribution of Batch.Commit calls").Histogram(datastoreLatencyBuckets),
	}
	return m
}

// measure wraps a backend Datastore and holds the counters and
// histograms recorded around each operation. All fields are created
// once in New and only observed afterwards.
type measure struct {
	backend datastore.Datastore

	putNum     metrics.Counter
	putErr     metrics.Counter
	putLatency metrics.Histogram
	putSize    metrics.Histogram

	syncNum     metrics.Counter
	syncErr     metrics.Counter
	syncLatency metrics.Histogram

	getNum     metrics.Counter
	getErr     metrics.Counter
	getLatency metrics.Histogram
	getSize    metrics.Histogram

	hasNum     metrics.Counter
	hasErr     metrics.Counter
	hasLatency metrics.Histogram

	getsizeNum     metrics.Counter
	getsizeErr     metrics.Counter
	getsizeLatency metrics.Histogram

	deleteNum     metrics.Counter
	deleteErr     metrics.Counter
	deleteLatency metrics.Histogram

	queryNum     metrics.Counter
	queryErr     metrics.Counter
	queryLatency metrics.Histogram

	checkNum     metrics.Counter
	checkErr     metrics.Counter
	checkLatency metrics.Histogram

	scrubNum     metrics.Counter
	scrubErr     metrics.Counter
	scrubLatency metrics.Histogram

	gcNum     metrics.Counter
	gcErr     metrics.Counter
	gcLatency metrics.Histogram

	duNum     metrics.Counter
	duErr     metrics.Counter
	duLatency metrics.Histogram

	batchPutNum     metrics.Counter
	batchPutErr     metrics.Counter
	batchPutLatency metrics.Histogram
	batchPutSize    metrics.Histogram

	batchDeleteNum     metrics.Counter
	batchDeleteErr     metrics.Counter
	batchDeleteLatency metrics.Histogram

	batchCommitNum     metrics.Counter
	batchCommitErr     metrics.Counter
	batchCommitLatency metrics.Histogram
}

169 170 171
func recordLatency(h metrics.Histogram, start time.Time) {
	elapsed := time.Since(start)
	h.Observe(elapsed.Seconds())
172 173
}

174
func (m *measure) Put(key datastore.Key, value []byte) error {
175
	defer recordLatency(m.putLatency, time.Now())
176
	m.putNum.Inc()
177
	m.putSize.Observe(float64(len(value)))
178 179
	err := m.backend.Put(key, value)
	if err != nil {
180
		m.putErr.Inc()
181 182 183 184
	}
	return err
}

Adin Schmahmann's avatar
Adin Schmahmann committed
185
func (m *measure) Sync(prefix datastore.Key) error {
186 187 188 189 190 191 192
	defer recordLatency(m.syncLatency, time.Now())
	m.syncNum.Inc()
	err := m.backend.Sync(prefix)
	if err != nil {
		m.syncErr.Inc()
	}
	return err
Adin Schmahmann's avatar
Adin Schmahmann committed
193 194
}

195
func (m *measure) Get(key datastore.Key) (value []byte, err error) {
196
	defer recordLatency(m.getLatency, time.Now())
197
	m.getNum.Inc()
198
	value, err = m.backend.Get(key)
199 200
	switch err {
	case nil:
201
		m.getSize.Observe(float64(len(value)))
202 203 204 205
	case datastore.ErrNotFound:
		// Not really an error.
	default:
		m.getErr.Inc()
206 207 208 209 210 211
	}
	return value, err
}

func (m *measure) Has(key datastore.Key) (exists bool, err error) {
	defer recordLatency(m.hasLatency, time.Now())
212
	m.hasNum.Inc()
213 214
	exists, err = m.backend.Has(key)
	if err != nil {
215
		m.hasErr.Inc()
216 217 218 219
	}
	return exists, err
}

Steven Allen's avatar
Steven Allen committed
220 221
func (m *measure) GetSize(key datastore.Key) (size int, err error) {
	defer recordLatency(m.getsizeLatency, time.Now())
222
	m.getsizeNum.Inc()
Steven Allen's avatar
Steven Allen committed
223 224 225
	size, err = m.backend.GetSize(key)
	switch err {
	case nil, datastore.ErrNotFound:
226
		// Not really an error.
Steven Allen's avatar
Steven Allen committed
227 228 229 230 231 232
	default:
		m.getsizeErr.Inc()
	}
	return size, err
}

233 234
func (m *measure) Delete(key datastore.Key) error {
	defer recordLatency(m.deleteLatency, time.Now())
235
	m.deleteNum.Inc()
236 237
	err := m.backend.Delete(key)
	if err != nil {
238
		m.deleteErr.Inc()
239 240 241 242 243 244
	}
	return err
}

func (m *measure) Query(q query.Query) (query.Results, error) {
	defer recordLatency(m.queryLatency, time.Now())
245
	m.queryNum.Inc()
246 247
	res, err := m.backend.Query(q)
	if err != nil {
248
		m.queryErr.Inc()
249 250 251
	}
	return res, err
}
252

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
func (m *measure) Check() error {
	defer recordLatency(m.checkLatency, time.Now())
	m.checkNum.Inc()
	if c, ok := m.backend.(datastore.CheckedDatastore); ok {
		err := c.Check()
		if err != nil {
			m.checkErr.Inc()
		}
		return err
	}
	return nil
}

// Scrub records metrics around ScrubbedDatastore.Scrub. When the
// backend does not implement datastore.ScrubbedDatastore, the call and
// its latency are still recorded and nil is returned.
func (m *measure) Scrub() error {
	defer recordLatency(m.scrubLatency, time.Now())
	m.scrubNum.Inc()
	s, ok := m.backend.(datastore.ScrubbedDatastore)
	if !ok {
		return nil
	}
	err := s.Scrub()
	if err != nil {
		m.scrubErr.Inc()
	}
	return err
}

// CollectGarbage records metrics around GCDatastore.CollectGarbage.
// When the backend does not implement datastore.GCDatastore, the call
// and its latency are still recorded and nil is returned.
func (m *measure) CollectGarbage() error {
	defer recordLatency(m.gcLatency, time.Now())
	m.gcNum.Inc()
	gc, ok := m.backend.(datastore.GCDatastore)
	if !ok {
		return nil
	}
	err := gc.CollectGarbage()
	if err != nil {
		m.gcErr.Inc()
	}
	return err
}

292 293 294 295 296 297 298 299 300 301
func (m *measure) DiskUsage() (uint64, error) {
	defer recordLatency(m.duLatency, time.Now())
	m.duNum.Inc()
	size, err := datastore.DiskUsage(m.backend)
	if err != nil {
		m.duErr.Inc()
	}
	return size, err
}

// measuredBatch wraps a datastore.Batch, recording metrics on the
// parent measure for every batched operation.
type measuredBatch struct {
	b datastore.Batch // the underlying batch
	m *measure        // owner of the batch metrics
}

func (m *measure) Batch() (datastore.Batch, error) {
Jeromy's avatar
Jeromy committed
308
	bds, ok := m.backend.(datastore.Batching)
Jeromy's avatar
Jeromy committed
309 310 311
	if !ok {
		return nil, datastore.ErrBatchUnsupported
	}
312
	batch, err := bds.Batch()
Jeromy's avatar
Jeromy committed
313 314 315 316 317
	if err != nil {
		return nil, err
	}

	return &measuredBatch{
318
		b: batch,
Jeromy's avatar
Jeromy committed
319 320 321 322
		m: m,
	}, nil
}

323
func (mt *measuredBatch) Put(key datastore.Key, val []byte) error {
324 325 326 327
	defer recordLatency(mt.m.batchPutLatency, time.Now())
	mt.m.batchPutNum.Inc()
	mt.m.batchPutSize.Observe(float64(len(val)))
	err := mt.b.Put(key, val)
Jeromy's avatar
Jeromy committed
328
	if err != nil {
329
		mt.m.batchPutErr.Inc()
Jeromy's avatar
Jeromy committed
330
	}
331 332
	return err
}
Jeromy's avatar
Jeromy committed
333

334 335 336 337
func (mt *measuredBatch) Delete(key datastore.Key) error {
	defer recordLatency(mt.m.batchDeleteLatency, time.Now())
	mt.m.batchDeleteNum.Inc()
	err := mt.b.Delete(key)
Jeromy's avatar
Jeromy committed
338
	if err != nil {
339
		mt.m.batchDeleteErr.Inc()
Jeromy's avatar
Jeromy committed
340
	}
341
	return err
Jeromy's avatar
Jeromy committed
342 343
}

344 345 346 347 348 349
func (mt *measuredBatch) Commit() error {
	defer recordLatency(mt.m.batchCommitLatency, time.Now())
	mt.m.batchCommitNum.Inc()
	err := mt.b.Commit()
	if err != nil {
		mt.m.batchCommitErr.Inc()
Jeromy's avatar
Jeromy committed
350
	}
351
	return err
Jeromy's avatar
Jeromy committed
352 353
}

354
func (m *measure) Close() error {
Jeromy's avatar
Jeromy committed
355 356 357
	if c, ok := m.backend.(io.Closer); ok {
		return c.Close()
	}
358 359
	return nil
}