Commit c3fa34b4 authored by Jeromy's avatar Jeromy

implement batch ops for different datastore types

parent 3d673631
...@@ -40,3 +40,8 @@ func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -40,3 +40,8 @@ func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) {
c.F() c.F()
return c.D.Query(q) return c.D.Query(q)
} }
func (c *Datastore) StartBatchOp() ds.Transaction {
c.F()
return c.D.StartBatchOp()
}
...@@ -124,3 +124,7 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -124,3 +124,7 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// query not coalesced yet. // query not coalesced yet.
return d.child.Query(q) return d.child.Query(q)
} }
func (d *datastore) StartBatchOp() ds.Transaction {
return ds.NewBasicTransaction(d)
}
...@@ -130,6 +130,12 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { ...@@ -130,6 +130,12 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
return r, nil return r, nil
} }
func (d *Datastore) StartBatchOp() ds.Transaction {
// just use basic transaction for now, this datastore
// isnt really used in performant code yet
return ds.NewBasicTransaction(d)
}
// isDir returns whether given path is a directory // isDir returns whether given path is a directory
func isDir(path string) bool { func isDir(path string) bool {
finfo, err := os.Stat(path) finfo, err := os.Stat(path)
......
...@@ -73,3 +73,28 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { ...@@ -73,3 +73,28 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return dsq.DerivedResults(qr, ch), nil return dsq.DerivedResults(qr, ch), nil
} }
func (d *ktds) StartBatchOp() ds.Transaction {
return &transformTransaction{
dst: d.child.StartBatchOp(),
f: d.ConvertKey,
}
}
type transformTransaction struct {
dst ds.Transaction
f KeyMapping
}
func (t *transformTransaction) Put(key ds.Key, val interface{}) error {
return t.dst.Put(t.f(key), val)
}
func (t *transformTransaction) Delete(key ds.Key) error {
return t.dst.Delete(t.f(key))
}
func (t *transformTransaction) Commit() error {
return t.dst.Commit()
}
...@@ -54,3 +54,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) { ...@@ -54,3 +54,7 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return nil, errors.New("KeyList not implemented.") return nil, errors.New("KeyList not implemented.")
} }
func (d *Datastore) StartBatchOp() ds.Transaction {
return ds.NewBasicTransaction(d)
}
...@@ -148,6 +148,67 @@ func (m *measure) Query(q query.Query) (query.Results, error) { ...@@ -148,6 +148,67 @@ func (m *measure) Query(q query.Query) (query.Results, error) {
return res, err return res, err
} }
type measuredTransaction struct {
puts int
deletes int
putts datastore.Transaction
delts datastore.Transaction
m *measure
}
func (m *measure) StartBatchOp() datastore.Transaction {
return &measuredTransaction{
putts: m.backend.StartBatchOp(),
delts: m.backend.StartBatchOp(),
m: m,
}
}
func (mt *measuredTransaction) Put(key datastore.Key, val interface{}) error {
mt.puts++
return mt.putts.Put(key, val)
}
func (mt *measuredTransaction) Delete(key datastore.Key) error {
mt.deletes++
return mt.delts.Delete(key)
}
func (mt *measuredTransaction) Commit() error {
if mt.deletes > 0 {
before := time.Now()
err := mt.delts.Commit()
took := int(time.Now().Sub(before)/time.Microsecond) / mt.deletes
mt.m.deleteNum.AddN(uint64(mt.deletes))
for i := 0; i < mt.deletes; i++ {
mt.m.deleteLatency.RecordValue(int64(took))
}
if err != nil {
mt.m.deleteErr.Add()
return err
}
}
if mt.puts > 0 {
before := time.Now()
err := mt.putts.Commit()
took := int(time.Now().Sub(before)/time.Microsecond) / mt.puts
mt.m.putNum.AddN(uint64(mt.puts))
for i := 0; i < mt.puts; i++ {
mt.m.putLatency.RecordValue(int64(took))
}
if err != nil {
mt.m.putErr.Add()
return err
}
}
return nil
}
func (m *measure) Close() error { func (m *measure) Close() error {
m.putNum.Remove() m.putNum.Remove()
m.putErr.Remove() m.putErr.Remove()
......
...@@ -114,3 +114,48 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { ...@@ -114,3 +114,48 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
r = query.ResultsReplaceQuery(r, q) r = query.ResultsReplaceQuery(r, q)
return r, nil return r, nil
} }
type mountTransaction struct {
mounts map[string]datastore.Transaction
d *Datastore
}
func (d *Datastore) StartBatchOp() datastore.Transaction {
return &mountTransaction{
mounts: make(map[string]datastore.Transaction),
d: d,
}
}
func (mt *mountTransaction) Put(key datastore.Key, val interface{}) error {
child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()]
if !ok {
t = child.StartBatchOp()
mt.mounts[loc.String()] = t
}
return t.Put(rest, val)
}
func (mt *mountTransaction) Delete(key datastore.Key) error {
child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()]
if !ok {
t = child.StartBatchOp()
mt.mounts[loc.String()] = t
}
return t.Delete(rest)
}
func (mt *mountTransaction) Commit() error {
for _, t := range mt.mounts {
err := t.Commit()
if err != nil {
return err
}
}
return nil
}
...@@ -66,3 +66,38 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -66,3 +66,38 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
} }
return r, nil return r, nil
} }
type panicTransaction struct {
t ds.Transaction
}
func (p *panicTransaction) Put(key ds.Key, val interface{}) error {
err := p.t.Put(key, val)
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: transaction put failed")
}
return nil
}
func (p *panicTransaction) Delete(key ds.Key) error {
err := p.t.Delete(key)
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: transaction delete failed")
}
return nil
}
func (p *panicTransaction) Commit() error {
err := p.t.Commit()
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: transaction commit failed")
}
return nil
}
func (d *datastore) StartBatchOp() ds.Transaction {
return &panicTransaction{d.child.StartBatchOp()}
}
...@@ -63,3 +63,9 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -63,3 +63,9 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
defer d.RUnlock() defer d.RUnlock()
return d.child.Query(q) return d.child.Query(q)
} }
func (d *MutexDatastore) StartBatchOp() ds.Transaction {
d.RLock()
defer d.RUnlock()
return d.child.StartBatchOp()
}
...@@ -92,3 +92,43 @@ func (d tiered) Query(q dsq.Query) (dsq.Results, error) { ...@@ -92,3 +92,43 @@ func (d tiered) Query(q dsq.Query) (dsq.Results, error) {
// query always the last (most complete) one // query always the last (most complete) one
return d[len(d)-1].Query(q) return d[len(d)-1].Query(q)
} }
type tieredTransaction []ds.Transaction
func (d tiered) StartBatchOp() ds.Transaction {
var out tieredTransaction
for _, ds := range d {
out = append(out, ds.StartBatchOp())
}
return out
}
func (t tieredTransaction) Put(key ds.Key, val interface{}) error {
for _, ts := range t {
err := ts.Put(key, val)
if err != nil {
return err
}
}
return nil
}
func (t tieredTransaction) Delete(key ds.Key) error {
for _, ts := range t {
err := ts.Delete(key)
if err != nil {
return err
}
}
return nil
}
func (t tieredTransaction) Commit() error {
for _, ts := range t {
err := ts.Commit()
if err != nil {
return err
}
}
return nil
}
...@@ -94,3 +94,8 @@ func (d *datastore) Delete(key ds.Key) (err error) { ...@@ -94,3 +94,8 @@ func (d *datastore) Delete(key ds.Key) (err error) {
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.cache.Query(q) return d.cache.Query(q)
} }
func (d *datastore) StartBatchOp() ds.Transaction {
// sorry, being lazy here
return ds.NewBasicTransaction(d)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment