Commit 1702774c authored by Jeromy's avatar Jeromy

rename Transaction to Batch

parent c3fa34b4
...@@ -63,8 +63,8 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -63,8 +63,8 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
return r, nil return r, nil
} }
func (d *MapDatastore) StartBatchOp() Transaction { func (d *MapDatastore) Batch() Batch {
return NewBasicTransaction(d) return NewBasicBatch(d)
} }
// NullDatastore stores nothing, but conforms to the API. // NullDatastore stores nothing, but conforms to the API.
...@@ -102,8 +102,8 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -102,8 +102,8 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
return dsq.ResultsWithEntries(q, nil), nil return dsq.ResultsWithEntries(q, nil), nil
} }
func (d *NullDatastore) StartBatchOp() Transaction { func (d *NullDatastore) Batch() Batch {
return NewBasicTransaction(d) return NewBasicBatch(d)
} }
// LogDatastore logs all accesses through the datastore. // LogDatastore logs all accesses through the datastore.
...@@ -163,7 +163,7 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -163,7 +163,7 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
return d.child.Query(q) return d.child.Query(q)
} }
func (d *LogDatastore) StartBatchOp() Transaction { func (d *LogDatastore) Batch() Batch {
log.Printf("%s: StartBatchOp\n", d.Name) log.Printf("%s: Batch\n", d.Name)
return d.child.StartBatchOp() return d.child.Batch()
} }
...@@ -41,7 +41,7 @@ func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -41,7 +41,7 @@ func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return c.D.Query(q) return c.D.Query(q)
} }
func (c *Datastore) StartBatchOp() ds.Transaction { func (c *Datastore) Batch() ds.Batch {
c.F() c.F()
return c.D.StartBatchOp() return c.D.Batch()
} }
...@@ -125,6 +125,6 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -125,6 +125,6 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.child.Query(q) return d.child.Query(q)
} }
func (d *datastore) StartBatchOp() ds.Transaction { func (d *datastore) Batch() ds.Batch {
return ds.NewBasicTransaction(d) return ds.NewBasicBatch(d)
} }
...@@ -68,8 +68,8 @@ type Datastore interface { ...@@ -68,8 +68,8 @@ type Datastore interface {
// //
Query(q query.Query) (query.Results, error) Query(q query.Query) (query.Results, error)
// StartBatchOp begins a datastore transaction // Batch begins a datastore transaction
StartBatchOp() Transaction Batch() Batch
} }
// ThreadSafeDatastore is an interface that all threadsafe datastore should // ThreadSafeDatastore is an interface that all threadsafe datastore should
...@@ -108,7 +108,7 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) { ...@@ -108,7 +108,7 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) {
} }
} }
type Transaction interface { type Batch interface {
Put(key Key, val interface{}) error Put(key Key, val interface{}) error
Delete(key Key) error Delete(key Key) error
......
...@@ -323,32 +323,32 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E ...@@ -323,32 +323,32 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
return res, nil return res, nil
} }
type flatfsTransaction struct { type flatfsBatch struct {
puts map[datastore.Key]interface{} puts map[datastore.Key]interface{}
deletes map[datastore.Key]struct{} deletes map[datastore.Key]struct{}
ds *Datastore ds *Datastore
} }
func (fs *Datastore) StartBatchOp() datastore.Transaction { func (fs *Datastore) Batch() datastore.Batch {
return &flatfsTransaction{ return &flatfsBatch{
puts: make(map[datastore.Key]interface{}), puts: make(map[datastore.Key]interface{}),
deletes: make(map[datastore.Key]struct{}), deletes: make(map[datastore.Key]struct{}),
ds: fs, ds: fs,
} }
} }
func (bt *flatfsTransaction) Put(key datastore.Key, val interface{}) error { func (bt *flatfsBatch) Put(key datastore.Key, val interface{}) error {
bt.puts[key] = val bt.puts[key] = val
return nil return nil
} }
func (bt *flatfsTransaction) Delete(key datastore.Key) error { func (bt *flatfsBatch) Delete(key datastore.Key) error {
bt.deletes[key] = struct{}{} bt.deletes[key] = struct{}{}
return nil return nil
} }
func (bt *flatfsTransaction) Commit() error { func (bt *flatfsBatch) Commit() error {
if err := bt.ds.putMany(bt.puts); err != nil { if err := bt.ds.putMany(bt.puts); err != nil {
return err return err
} }
......
...@@ -130,10 +130,10 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { ...@@ -130,10 +130,10 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
return r, nil return r, nil
} }
func (d *Datastore) StartBatchOp() ds.Transaction { func (d *Datastore) Batch() ds.Batch {
// just use basic transaction for now, this datastore // just use basic transaction for now, this datastore
// isnt really used in performant code yet // isnt really used in performant code yet
return ds.NewBasicTransaction(d) return ds.NewBasicBatch(d)
} }
// isDir returns whether given path is a directory // isDir returns whether given path is a directory
......
...@@ -74,27 +74,27 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { ...@@ -74,27 +74,27 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return dsq.DerivedResults(qr, ch), nil return dsq.DerivedResults(qr, ch), nil
} }
func (d *ktds) StartBatchOp() ds.Transaction { func (d *ktds) Batch() ds.Batch {
return &transformTransaction{ return &transformBatch{
dst: d.child.StartBatchOp(), dst: d.child.Batch(),
f: d.ConvertKey, f: d.ConvertKey,
} }
} }
type transformTransaction struct { type transformBatch struct {
dst ds.Transaction dst ds.Batch
f KeyMapping f KeyMapping
} }
func (t *transformTransaction) Put(key ds.Key, val interface{}) error { func (t *transformBatch) Put(key ds.Key, val interface{}) error {
return t.dst.Put(t.f(key), val) return t.dst.Put(t.f(key), val)
} }
func (t *transformTransaction) Delete(key ds.Key) error { func (t *transformBatch) Delete(key ds.Key) error {
return t.dst.Delete(t.f(key)) return t.dst.Delete(t.f(key))
} }
func (t *transformTransaction) Commit() error { func (t *transformBatch) Commit() error {
return t.dst.Commit() return t.dst.Commit()
} }
...@@ -105,7 +105,7 @@ type ldbBatch struct { ...@@ -105,7 +105,7 @@ type ldbBatch struct {
d *datastore d *datastore
} }
func (d *datastore) StartBatchOp() ds.Transaction { func (d *datastore) Batch() ds.Batch {
return &ldbBatch{ return &ldbBatch{
b: new(leveldb.Batch), b: new(leveldb.Batch),
d: d, d: d,
......
...@@ -55,6 +55,6 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -55,6 +55,6 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return nil, errors.New("KeyList not implemented.") return nil, errors.New("KeyList not implemented.")
} }
func (d *Datastore) StartBatchOp() ds.Transaction { func (d *Datastore) Batch() ds.Batch {
return ds.NewBasicTransaction(d) return ds.NewBasicBatch(d)
} }
...@@ -148,36 +148,36 @@ func (m *measure) Query(q query.Query) (query.Results, error) { ...@@ -148,36 +148,36 @@ func (m *measure) Query(q query.Query) (query.Results, error) {
return res, err return res, err
} }
type measuredTransaction struct { type measuredBatch struct {
puts int puts int
deletes int deletes int
putts datastore.Transaction putts datastore.Batch
delts datastore.Transaction delts datastore.Batch
m *measure m *measure
} }
func (m *measure) StartBatchOp() datastore.Transaction { func (m *measure) Batch() datastore.Batch {
return &measuredTransaction{ return &measuredBatch{
putts: m.backend.StartBatchOp(), putts: m.backend.Batch(),
delts: m.backend.StartBatchOp(), delts: m.backend.Batch(),
m: m, m: m,
} }
} }
func (mt *measuredTransaction) Put(key datastore.Key, val interface{}) error { func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
mt.puts++ mt.puts++
return mt.putts.Put(key, val) return mt.putts.Put(key, val)
} }
func (mt *measuredTransaction) Delete(key datastore.Key) error { func (mt *measuredBatch) Delete(key datastore.Key) error {
mt.deletes++ mt.deletes++
return mt.delts.Delete(key) return mt.delts.Delete(key)
} }
func (mt *measuredTransaction) Commit() error { func (mt *measuredBatch) Commit() error {
if mt.deletes > 0 { if mt.deletes > 0 {
before := time.Now() before := time.Now()
err := mt.delts.Commit() err := mt.delts.Commit()
......
...@@ -115,42 +115,42 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { ...@@ -115,42 +115,42 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
return r, nil return r, nil
} }
type mountTransaction struct { type mountBatch struct {
mounts map[string]datastore.Transaction mounts map[string]datastore.Batch
d *Datastore d *Datastore
} }
func (d *Datastore) StartBatchOp() datastore.Transaction { func (d *Datastore) Batch() datastore.Batch {
return &mountTransaction{ return &mountBatch{
mounts: make(map[string]datastore.Transaction), mounts: make(map[string]datastore.Batch),
d: d, d: d,
} }
} }
func (mt *mountTransaction) Put(key datastore.Key, val interface{}) error { func (mt *mountBatch) Put(key datastore.Key, val interface{}) error {
child, loc, rest := mt.d.lookup(key) child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()] t, ok := mt.mounts[loc.String()]
if !ok { if !ok {
t = child.StartBatchOp() t = child.Batch()
mt.mounts[loc.String()] = t mt.mounts[loc.String()] = t
} }
return t.Put(rest, val) return t.Put(rest, val)
} }
func (mt *mountTransaction) Delete(key datastore.Key) error { func (mt *mountBatch) Delete(key datastore.Key) error {
child, loc, rest := mt.d.lookup(key) child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()] t, ok := mt.mounts[loc.String()]
if !ok { if !ok {
t = child.StartBatchOp() t = child.Batch()
mt.mounts[loc.String()] = t mt.mounts[loc.String()] = t
} }
return t.Delete(rest) return t.Delete(rest)
} }
func (mt *mountTransaction) Commit() error { func (mt *mountBatch) Commit() error {
for _, t := range mt.mounts { for _, t := range mt.mounts {
err := t.Commit() err := t.Commit()
if err != nil { if err != nil {
......
...@@ -67,11 +67,11 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -67,11 +67,11 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return r, nil return r, nil
} }
type panicTransaction struct { type panicBatch struct {
t ds.Transaction t ds.Batch
} }
func (p *panicTransaction) Put(key ds.Key, val interface{}) error { func (p *panicBatch) Put(key ds.Key, val interface{}) error {
err := p.t.Put(key, val) err := p.t.Put(key, val)
if err != nil { if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err) fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
...@@ -80,7 +80,7 @@ func (p *panicTransaction) Put(key ds.Key, val interface{}) error { ...@@ -80,7 +80,7 @@ func (p *panicTransaction) Put(key ds.Key, val interface{}) error {
return nil return nil
} }
func (p *panicTransaction) Delete(key ds.Key) error { func (p *panicBatch) Delete(key ds.Key) error {
err := p.t.Delete(key) err := p.t.Delete(key)
if err != nil { if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err) fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
...@@ -89,7 +89,7 @@ func (p *panicTransaction) Delete(key ds.Key) error { ...@@ -89,7 +89,7 @@ func (p *panicTransaction) Delete(key ds.Key) error {
return nil return nil
} }
func (p *panicTransaction) Commit() error { func (p *panicBatch) Commit() error {
err := p.t.Commit() err := p.t.Commit()
if err != nil { if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err) fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
...@@ -98,6 +98,6 @@ func (p *panicTransaction) Commit() error { ...@@ -98,6 +98,6 @@ func (p *panicTransaction) Commit() error {
return nil return nil
} }
func (d *datastore) StartBatchOp() ds.Transaction { func (d *datastore) Batch() ds.Batch {
return &panicTransaction{d.child.StartBatchOp()} return &panicBatch{d.child.Batch()}
} }
...@@ -64,8 +64,8 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -64,8 +64,8 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
return d.child.Query(q) return d.child.Query(q)
} }
func (d *MutexDatastore) StartBatchOp() ds.Transaction { func (d *MutexDatastore) Batch() ds.Batch {
d.RLock() d.RLock()
defer d.RUnlock() defer d.RUnlock()
return d.child.StartBatchOp() return d.child.Batch()
} }
...@@ -93,17 +93,17 @@ func (d tiered) Query(q dsq.Query) (dsq.Results, error) { ...@@ -93,17 +93,17 @@ func (d tiered) Query(q dsq.Query) (dsq.Results, error) {
return d[len(d)-1].Query(q) return d[len(d)-1].Query(q)
} }
type tieredTransaction []ds.Transaction type tieredBatch []ds.Batch
func (d tiered) StartBatchOp() ds.Transaction { func (d tiered) Batch() ds.Batch {
var out tieredTransaction var out tieredBatch
for _, ds := range d { for _, ds := range d {
out = append(out, ds.StartBatchOp()) out = append(out, ds.Batch())
} }
return out return out
} }
func (t tieredTransaction) Put(key ds.Key, val interface{}) error { func (t tieredBatch) Put(key ds.Key, val interface{}) error {
for _, ts := range t { for _, ts := range t {
err := ts.Put(key, val) err := ts.Put(key, val)
if err != nil { if err != nil {
...@@ -113,7 +113,7 @@ func (t tieredTransaction) Put(key ds.Key, val interface{}) error { ...@@ -113,7 +113,7 @@ func (t tieredTransaction) Put(key ds.Key, val interface{}) error {
return nil return nil
} }
func (t tieredTransaction) Delete(key ds.Key) error { func (t tieredBatch) Delete(key ds.Key) error {
for _, ts := range t { for _, ts := range t {
err := ts.Delete(key) err := ts.Delete(key)
if err != nil { if err != nil {
...@@ -123,7 +123,7 @@ func (t tieredTransaction) Delete(key ds.Key) error { ...@@ -123,7 +123,7 @@ func (t tieredTransaction) Delete(key ds.Key) error {
return nil return nil
} }
func (t tieredTransaction) Commit() error { func (t tieredBatch) Commit() error {
for _, ts := range t { for _, ts := range t {
err := ts.Commit() err := ts.Commit()
if err != nil { if err != nil {
......
...@@ -95,7 +95,7 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -95,7 +95,7 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.cache.Query(q) return d.cache.Query(q)
} }
func (d *datastore) StartBatchOp() ds.Transaction { func (d *datastore) Batch() ds.Batch {
// sorry, being lazy here // sorry, being lazy here
return ds.NewBasicTransaction(d) return ds.NewBasicBatch(d)
} }
package datastore package datastore
// basicTransaction implements the transaction interface for datastores who do // basicBatch implements the transaction interface for datastores who do
// not have any sort of underlying transactional support // not have any sort of underlying transactional support
type basicTransaction struct { type basicBatch struct {
puts map[Key]interface{} puts map[Key]interface{}
deletes map[Key]struct{} deletes map[Key]struct{}
target Datastore target Datastore
} }
func NewBasicTransaction(ds Datastore) Transaction { func NewBasicBatch(ds Datastore) Batch {
return &basicTransaction{ return &basicBatch{
puts: make(map[Key]interface{}), puts: make(map[Key]interface{}),
deletes: make(map[Key]struct{}), deletes: make(map[Key]struct{}),
target: ds, target: ds,
} }
} }
func (bt *basicTransaction) Put(key Key, val interface{}) error { func (bt *basicBatch) Put(key Key, val interface{}) error {
bt.puts[key] = val bt.puts[key] = val
return nil return nil
} }
func (bt *basicTransaction) Delete(key Key) error { func (bt *basicBatch) Delete(key Key) error {
bt.deletes[key] = struct{}{} bt.deletes[key] = struct{}{}
return nil return nil
} }
func (bt *basicTransaction) Commit() error { func (bt *basicBatch) Commit() error {
for k, val := range bt.puts { for k, val := range bt.puts {
if err := bt.target.Put(k, val); err != nil { if err := bt.target.Put(k, val); err != nil {
return err return err
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment