Commit 6b5bc561 authored by Adin Schmahmann's avatar Adin Schmahmann

fix: batched put, delete and commit operations are now measured separately from regular operations

parent fb847172
......@@ -77,6 +77,23 @@ func New(prefix string, ds datastore.Datastore) *measure {
duErr: metrics.New(prefix+".du.errors_total", "Number of errored Datastore.DiskUsage calls").Counter(),
duLatency: metrics.New(prefix+".du.latency_seconds",
"Latency distribution of Datastore.DiskUsage calls").Histogram(datastoreLatencyBuckets),
batchPutNum: metrics.New(prefix+".batchput_total", "Total number of Batch.Put calls").Counter(),
batchPutErr: metrics.New(prefix+".batchput.errors_total", "Number of errored Batch.Put calls").Counter(),
batchPutLatency: metrics.New(prefix+".batchput.latency_seconds",
"Latency distribution of Batch.Put calls").Histogram(datastoreLatencyBuckets),
batchPutSize: metrics.New(prefix+".batchput.size_bytes",
"Size distribution of byte slices put into batches").Histogram(datastoreSizeBuckets),
batchDeleteNum: metrics.New(prefix+".batchdelete_total", "Total number of Batch.Delete calls").Counter(),
batchDeleteErr: metrics.New(prefix+".batchdelete.errors_total", "Number of errored Batch.Delete calls").Counter(),
batchDeleteLatency: metrics.New(prefix+".batchdelete.latency_seconds",
"Latency distribution of Batch.Delete calls").Histogram(datastoreLatencyBuckets),
batchCommitNum: metrics.New(prefix+".batchcommit_total", "Total number of Batch.Commit calls").Counter(),
batchCommitErr: metrics.New(prefix+".batchcommit.errors_total", "Number of errored Batch.Commit calls").Counter(),
batchCommitLatency: metrics.New(prefix+".batchcommit.latency_seconds",
"Latency distribution of Batch.Commit calls").Histogram(datastoreLatencyBuckets),
}
return m
}
......@@ -125,6 +142,19 @@ type measure struct {
duNum metrics.Counter
duErr metrics.Counter
duLatency metrics.Histogram
batchPutNum metrics.Counter
batchPutErr metrics.Counter
batchPutLatency metrics.Histogram
batchPutSize metrics.Histogram
batchDeleteNum metrics.Counter
batchDeleteErr metrics.Counter
batchDeleteLatency metrics.Histogram
batchCommitNum metrics.Counter
batchCommitErr metrics.Counter
batchCommitLatency metrics.Histogram
}
func recordLatency(h metrics.Histogram, start time.Time) {
......@@ -251,12 +281,7 @@ func (m *measure) DiskUsage() (uint64, error) {
}
type measuredBatch struct {
puts int
deletes int
putts datastore.Batch
delts datastore.Batch
b datastore.Batch
m *measure
}
......@@ -265,64 +290,46 @@ func (m *measure) Batch() (datastore.Batch, error) {
if !ok {
return nil, datastore.ErrBatchUnsupported
}
pb, err := bds.Batch()
if err != nil {
return nil, err
}
db, err := bds.Batch()
batch, err := bds.Batch()
if err != nil {
return nil, err
}
return &measuredBatch{
putts: pb,
delts: db,
b: batch,
m: m,
}, nil
}
func (mt *measuredBatch) Put(key datastore.Key, val []byte) error {
mt.puts++
mt.m.putSize.Observe(float64(len(val)))
return mt.putts.Put(key, val)
}
func (mt *measuredBatch) Delete(key datastore.Key) error {
mt.deletes++
return mt.delts.Delete(key)
}
func (mt *measuredBatch) Commit() error {
err := logBatchCommit(mt.delts, mt.deletes, mt.m.deleteNum, mt.m.deleteErr, mt.m.deleteLatency)
defer recordLatency(mt.m.batchPutLatency, time.Now())
mt.m.batchPutNum.Inc()
mt.m.batchPutSize.Observe(float64(len(val)))
err := mt.b.Put(key, val)
if err != nil {
return err
mt.m.batchPutErr.Inc()
}
return err
}
err = logBatchCommit(mt.putts, mt.puts, mt.m.putNum, mt.m.putErr, mt.m.putLatency)
func (mt *measuredBatch) Delete(key datastore.Key) error {
defer recordLatency(mt.m.batchDeleteLatency, time.Now())
mt.m.batchDeleteNum.Inc()
err := mt.b.Delete(key)
if err != nil {
return err
mt.m.batchDeleteErr.Inc()
}
return nil
return err
}
func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat metrics.Histogram) error {
if n > 0 {
before := time.Now()
err := b.Commit()
took := time.Since(before) / time.Duration(n)
num.Add(float64(n))
for i := 0; i < n; i++ {
lat.Observe(took.Seconds())
}
if err != nil {
errs.Inc()
return err
}
func (mt *measuredBatch) Commit() error {
defer recordLatency(mt.m.batchCommitLatency, time.Now())
mt.m.batchCommitNum.Inc()
err := mt.b.Commit()
if err != nil {
mt.m.batchCommitErr.Inc()
}
return nil
return err
}
func (m *measure) Close() error {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment