// Package measure provides a Datastore wrapper that records metrics
// using github.com/ipfs/go-metrics-interface
package measure

import (
	"io"
	"time"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/query"
	"github.com/ipfs/go-metrics-interface"
)

var (
	// latencies are sorted into buckets with the following upper bounds, in seconds
	datastoreLatencyBuckets = []float64{1e-4, 1e-3, 1e-2, 1e-1, 1}

	// sizes are sorted into buckets with the following upper bounds, in bytes
	datastoreSizeBuckets = []float64{1 << 6, 1 << 12, 1 << 18, 1 << 24}
)

// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
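//
// A minimal usage sketch; the in-memory MapDatastore and the
// "example.ds" prefix are illustrative assumptions, not prescribed by
// this package:
//
//	mds := measure.New("example.ds", datastore.NewMapDatastore())
//	if err := mds.Put(datastore.NewKey("/demo"), []byte("value")); err != nil {
//		// handle error
//	}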
func New(prefix string, ds datastore.Datastore) *measure {
	m := &measure{
		backend: ds,

		putNum: metrics.New(prefix+".put_total", "Total number of Datastore.Put calls").Counter(),
		putErr: metrics.New(prefix+".put.errors_total", "Number of errored Datastore.Put calls").Counter(),
		putLatency: metrics.New(prefix+".put.latency_seconds",
			"Latency distribution of Datastore.Put calls").Histogram(datastoreLatencyBuckets),
		putSize: metrics.New(prefix+".put.size_bytes",
			"Size distribution of stored byte slices").Histogram(datastoreSizeBuckets),

		getNum: metrics.New(prefix+".get_total", "Total number of Datastore.Get calls").Counter(),
		getErr: metrics.New(prefix+".get.errors_total", "Number of errored Datastore.Get calls").Counter(),
		getLatency: metrics.New(prefix+".get.latency_seconds",
			"Latency distribution of Datastore.Get calls").Histogram(datastoreLatencyBuckets),
		getSize: metrics.New(prefix+".get.size_bytes",
			"Size distribution of retrieved byte slices").Histogram(datastoreSizeBuckets),

		hasNum: metrics.New(prefix+".has_total", "Total number of Datastore.Has calls").Counter(),
		hasErr: metrics.New(prefix+".has.errors_total", "Number of errored Datastore.Has calls").Counter(),
		hasLatency: metrics.New(prefix+".has.latency_seconds",
			"Latency distribution of Datastore.Has calls").Histogram(datastoreLatencyBuckets),
		getsizeNum: metrics.New(prefix+".getsize_total", "Total number of Datastore.GetSize calls").Counter(),
		getsizeErr: metrics.New(prefix+".getsize.errors_total", "Number of errored Datastore.GetSize calls").Counter(),
		getsizeLatency: metrics.New(prefix+".getsize.latency_seconds",
			"Latency distribution of Datastore.GetSize calls").Histogram(datastoreLatencyBuckets),

		deleteNum: metrics.New(prefix+".delete_total", "Total number of Datastore.Delete calls").Counter(),
		deleteErr: metrics.New(prefix+".delete.errors_total", "Number of errored Datastore.Delete calls").Counter(),
		deleteLatency: metrics.New(prefix+".delete.latency_seconds",
			"Latency distribution of Datastore.Delete calls").Histogram(datastoreLatencyBuckets),

		queryNum: metrics.New(prefix+".query_total", "Total number of Datastore.Query calls").Counter(),
		queryErr: metrics.New(prefix+".query.errors_total", "Number of errored Datastore.Query calls").Counter(),
		queryLatency: metrics.New(prefix+".query.latency_seconds",
			"Latency distribution of Datastore.Query calls").Histogram(datastoreLatencyBuckets),

		checkNum: metrics.New(prefix+".check_total", "Total number of Datastore.Check calls").Counter(),
		checkErr: metrics.New(prefix+".check.errors_total", "Number of errored Datastore.Check calls").Counter(),
		checkLatency: metrics.New(prefix+".check.latency_seconds",
			"Latency distribution of Datastore.Check calls").Histogram(datastoreLatencyBuckets),

		scrubNum: metrics.New(prefix+".scrub_total", "Total number of Datastore.Scrub calls").Counter(),
		scrubErr: metrics.New(prefix+".scrub.errors_total", "Number of errored Datastore.Scrub calls").Counter(),
		scrubLatency: metrics.New(prefix+".scrub.latency_seconds",
			"Latency distribution of Datastore.Scrub calls").Histogram(datastoreLatencyBuckets),

		gcNum: metrics.New(prefix+".gc_total", "Total number of Datastore.CollectGarbage calls").Counter(),
		gcErr: metrics.New(prefix+".gc.errors_total", "Number of errored Datastore.CollectGarbage calls").Counter(),
		gcLatency: metrics.New(prefix+".gc.latency_seconds",
			"Latency distribution of Datastore.CollectGarbage calls").Histogram(datastoreLatencyBuckets),

		duNum: metrics.New(prefix+".du_total", "Total number of Datastore.DiskUsage calls").Counter(),
		duErr: metrics.New(prefix+".du.errors_total", "Number of errored Datastore.DiskUsage calls").Counter(),
		duLatency: metrics.New(prefix+".du.latency_seconds",
			"Latency distribution of Datastore.DiskUsage calls").Histogram(datastoreLatencyBuckets),
	}
	return m
}

type measure struct {
	backend datastore.Datastore

	putNum     metrics.Counter
	putErr     metrics.Counter
	putLatency metrics.Histogram
	putSize    metrics.Histogram

	getNum     metrics.Counter
	getErr     metrics.Counter
	getLatency metrics.Histogram
	getSize    metrics.Histogram

	hasNum     metrics.Counter
	hasErr     metrics.Counter
	hasLatency metrics.Histogram

	getsizeNum     metrics.Counter
	getsizeErr     metrics.Counter
	getsizeLatency metrics.Histogram

	deleteNum     metrics.Counter
	deleteErr     metrics.Counter
	deleteLatency metrics.Histogram

	queryNum     metrics.Counter
	queryErr     metrics.Counter
	queryLatency metrics.Histogram

	checkNum     metrics.Counter
	checkErr     metrics.Counter
	checkLatency metrics.Histogram

	scrubNum     metrics.Counter
	scrubErr     metrics.Counter
	scrubLatency metrics.Histogram

	gcNum     metrics.Counter
	gcErr     metrics.Counter
	gcLatency metrics.Histogram

	duNum     metrics.Counter
	duErr     metrics.Counter
	duLatency metrics.Histogram
}

func recordLatency(h metrics.Histogram, start time.Time) {
	elapsed := time.Since(start)
	h.Observe(elapsed.Seconds())
}

func (m *measure) Put(key datastore.Key, value []byte) error {
	defer recordLatency(m.putLatency, time.Now())
	m.putNum.Inc()
	m.putSize.Observe(float64(len(value)))
	err := m.backend.Put(key, value)
	if err != nil {
		m.putErr.Inc()
	}
	return err
}

func (m *measure) Get(key datastore.Key) (value []byte, err error) {
	defer recordLatency(m.getLatency, time.Now())
	m.getNum.Inc()
	value, err = m.backend.Get(key)
	switch err {
	case nil:
		m.getSize.Observe(float64(len(value)))
	case datastore.ErrNotFound:
		// Not really an error.
	default:
		m.getErr.Inc()
	}
	return value, err
}

func (m *measure) Has(key datastore.Key) (exists bool, err error) {
	defer recordLatency(m.hasLatency, time.Now())
	m.hasNum.Inc()
	exists, err = m.backend.Has(key)
	if err != nil {
		m.hasErr.Inc()
	}
	return exists, err
}

func (m *measure) GetSize(key datastore.Key) (size int, err error) {
	defer recordLatency(m.getsizeLatency, time.Now())
	m.getsizeNum.Inc()
	size, err = m.backend.GetSize(key)
	switch err {
	case nil, datastore.ErrNotFound:
		// Not really an error.
	default:
		m.getsizeErr.Inc()
	}
	return size, err
}

func (m *measure) Delete(key datastore.Key) error {
	defer recordLatency(m.deleteLatency, time.Now())
	m.deleteNum.Inc()
	err := m.backend.Delete(key)
	if err != nil {
		m.deleteErr.Inc()
	}
	return err
}

func (m *measure) Query(q query.Query) (query.Results, error) {
	defer recordLatency(m.queryLatency, time.Now())
	m.queryNum.Inc()
	res, err := m.backend.Query(q)
	if err != nil {
		m.queryErr.Inc()
	}
	return res, err
}

func (m *measure) Check() error {
	defer recordLatency(m.checkLatency, time.Now())
	m.checkNum.Inc()
	if c, ok := m.backend.(datastore.CheckedDatastore); ok {
		err := c.Check()
		if err != nil {
			m.checkErr.Inc()
		}
		return err
	}
	return nil
}

func (m *measure) Scrub() error {
	defer recordLatency(m.scrubLatency, time.Now())
	m.scrubNum.Inc()
	if c, ok := m.backend.(datastore.ScrubbedDatastore); ok {
		err := c.Scrub()
		if err != nil {
			m.scrubErr.Inc()
		}
		return err
	}
	return nil
}

func (m *measure) CollectGarbage() error {
	defer recordLatency(m.gcLatency, time.Now())
	m.gcNum.Inc()
	if c, ok := m.backend.(datastore.GCDatastore); ok {
		err := c.CollectGarbage()
		if err != nil {
			m.gcErr.Inc()
		}
		return err
	}
	return nil
}

func (m *measure) DiskUsage() (uint64, error) {
	defer recordLatency(m.duLatency, time.Now())
	m.duNum.Inc()
	size, err := datastore.DiskUsage(m.backend)
	if err != nil {
		m.duErr.Inc()
	}
	return size, err
}

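// measuredBatch buffers Put and Delete operations in two separate
// backing batches so that, on Commit, their counts and latencies can be
// recorded against the put and delete metrics independently.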
type measuredBatch struct {
	puts    int
	deletes int

	putts datastore.Batch
	delts datastore.Batch

	m *measure
}

func (m *measure) Batch() (datastore.Batch, error) {
	bds, ok := m.backend.(datastore.Batching)
	if !ok {
		return nil, datastore.ErrBatchUnsupported
	}
	pb, err := bds.Batch()
	if err != nil {
		return nil, err
	}

	db, err := bds.Batch()
	if err != nil {
		return nil, err
	}

	return &measuredBatch{
		putts: pb,
		delts: db,

		m: m,
	}, nil
}

func (mt *measuredBatch) Put(key datastore.Key, val []byte) error {
	mt.puts++
	mt.m.putSize.Observe(float64(len(val)))
	return mt.putts.Put(key, val)
}

func (mt *measuredBatch) Delete(key datastore.Key) error {
	mt.deletes++
	return mt.delts.Delete(key)
}

func (mt *measuredBatch) Commit() error {
	err := logBatchCommit(mt.delts, mt.deletes, mt.m.deleteNum, mt.m.deleteErr, mt.m.deleteLatency)
	if err != nil {
		return err
	}

	err = logBatchCommit(mt.putts, mt.puts, mt.m.putNum, mt.m.putErr, mt.m.putLatency)
	if err != nil {
		return err
	}

	return nil
}

// logBatchCommit commits the batch, adds the n buffered operations to num,
// and spreads the total commit latency evenly across n observations of lat.
// errs is incremented once if the commit fails.
func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat metrics.Histogram) error {
	if n > 0 {
		before := time.Now()
		err := b.Commit()
		took := time.Since(before) / time.Duration(n)
		num.Add(float64(n))
		for i := 0; i < n; i++ {
			lat.Observe(took.Seconds())
		}
		if err != nil {
			errs.Inc()
			return err
		}
	}
	return nil
}

func (m *measure) Close() error {
	if c, ok := m.backend.(io.Closer); ok {
		return c.Close()
	}
	return nil
}