Commit 1258c87d authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #25 from whyrusleeping/batching

Batching
parents 8f9904f3 7f5d43e1
......@@ -148,6 +148,85 @@ func (m *measure) Query(q query.Query) (query.Results, error) {
return res, err
}
type measuredBatch struct {
puts int
deletes int
putts datastore.Batch
delts datastore.Batch
m *measure
}
func (m *measure) Batch() (datastore.Batch, error) {
bds, ok := m.backend.(datastore.BatchingDatastore)
if !ok {
return nil, datastore.ErrBatchUnsupported
}
pb, err := bds.Batch()
if err != nil {
return nil, err
}
db, err := bds.Batch()
if err != nil {
return nil, err
}
return &measuredBatch{
putts: pb,
delts: db,
m: m,
}, nil
}
func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
mt.puts++
valb, ok := val.([]byte)
if !ok {
return datastore.ErrInvalidType
}
_ = mt.m.putSize.RecordValue(int64(len(valb)))
return mt.putts.Put(key, val)
}
func (mt *measuredBatch) Delete(key datastore.Key) error {
mt.deletes++
return mt.delts.Delete(key)
}
func (mt *measuredBatch) Commit() error {
err := logBatchCommit(mt.delts, mt.deletes, mt.m.deleteNum, mt.m.deleteErr, mt.m.deleteLatency)
if err != nil {
return err
}
err = logBatchCommit(mt.putts, mt.puts, mt.m.putNum, mt.m.putErr, mt.m.putLatency)
if err != nil {
return err
}
return nil
}
func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat *metrics.Histogram) error {
if n > 0 {
before := time.Now()
err := b.Commit()
took := int(time.Now().Sub(before)/time.Microsecond) / n
num.AddN(uint64(n))
for i := 0; i < n; i++ {
_ = lat.RecordValue(int64(took))
}
if err != nil {
errs.Add()
return err
}
}
return nil
}
func (m *measure) Close() error {
m.putNum.Remove()
m.putErr.Remove()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment