Commit c18ac933 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #23 from jbenet/revert-22-batching

Revert "add in support for batched writes"
parents e9a2ec20 60c74efc
...@@ -63,10 +63,6 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -63,10 +63,6 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
return r, nil return r, nil
} }
func (d *MapDatastore) Batch() Batch {
return NewBasicBatch(d)
}
// NullDatastore stores nothing, but conforms to the API. // NullDatastore stores nothing, but conforms to the API.
// Useful to test with. // Useful to test with.
type NullDatastore struct { type NullDatastore struct {
...@@ -102,10 +98,6 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -102,10 +98,6 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
return dsq.ResultsWithEntries(q, nil), nil return dsq.ResultsWithEntries(q, nil), nil
} }
func (d *NullDatastore) Batch() Batch {
return NewBasicBatch(d)
}
// LogDatastore logs all accesses through the datastore. // LogDatastore logs all accesses through the datastore.
type LogDatastore struct { type LogDatastore struct {
Name string Name string
...@@ -162,8 +154,3 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -162,8 +154,3 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
log.Printf("%s: Query\n", d.Name) log.Printf("%s: Query\n", d.Name)
return d.child.Query(q) return d.child.Query(q)
} }
func (d *LogDatastore) Batch() Batch {
log.Printf("%s: Batch\n", d.Name)
return d.child.Batch()
}
...@@ -40,8 +40,3 @@ func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -40,8 +40,3 @@ func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) {
c.F() c.F()
return c.D.Query(q) return c.D.Query(q)
} }
func (c *Datastore) Batch() ds.Batch {
c.F()
return c.D.Batch()
}
...@@ -124,7 +124,3 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -124,7 +124,3 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// query not coalesced yet. // query not coalesced yet.
return d.child.Query(q) return d.child.Query(q)
} }
func (d *datastore) Batch() ds.Batch {
return ds.NewBasicBatch(d)
}
...@@ -67,9 +67,6 @@ type Datastore interface { ...@@ -67,9 +67,6 @@ type Datastore interface {
// result.AllEntries() // result.AllEntries()
// //
Query(q query.Query) (query.Results, error) Query(q query.Query) (query.Results, error)
// Batch begins a datastore transaction
Batch() Batch
} }
// ThreadSafeDatastore is an interface that all threadsafe datastore should // ThreadSafeDatastore is an interface that all threadsafe datastore should
...@@ -107,11 +104,3 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) { ...@@ -107,11 +104,3 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) {
return false, err return false, err
} }
} }
type Batch interface {
Put(key Key, val interface{}) error
Delete(key Key) error
Commit() error
}
...@@ -68,8 +68,12 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) { ...@@ -68,8 +68,12 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
} }
func (fs *Datastore) makePrefixDir(dir string) error { func (fs *Datastore) makePrefixDir(dir string) error {
if err := fs.makePrefixDirNoSync(dir); err != nil { if err := os.Mkdir(dir, 0777); err != nil {
return err // EEXIST is safe to ignore here, that just means the prefix
// directory already existed.
if !os.IsExist(err) {
return err
}
} }
// In theory, if we create a new prefix dir and add a file to // In theory, if we create a new prefix dir and add a file to
...@@ -82,17 +86,6 @@ func (fs *Datastore) makePrefixDir(dir string) error { ...@@ -82,17 +86,6 @@ func (fs *Datastore) makePrefixDir(dir string) error {
return nil return nil
} }
func (fs *Datastore) makePrefixDirNoSync(dir string) error {
if err := os.Mkdir(dir, 0777); err != nil {
// EEXIST is safe to ignore here, that just means the prefix
// directory already existed.
if !os.IsExist(err) {
return err
}
}
return nil
}
func (fs *Datastore) Put(key datastore.Key, value interface{}) error { func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
val, ok := value.([]byte) val, ok := value.([]byte)
if !ok { if !ok {
...@@ -144,88 +137,6 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error { ...@@ -144,88 +137,6 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
return nil return nil
} }
func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
var dirsToSync []string
files := make(map[*os.File]string)
for key, value := range data {
val, ok := value.([]byte)
if !ok {
return datastore.ErrInvalidType
}
dir, path := fs.encode(key)
if err := fs.makePrefixDirNoSync(dir); err != nil {
return err
}
dirsToSync = append(dirsToSync, dir)
tmp, err := ioutil.TempFile(dir, "put-")
if err != nil {
return err
}
if _, err := tmp.Write(val); err != nil {
return err
}
files[tmp] = path
}
ops := make(map[*os.File]int)
defer func() {
for fi, _ := range files {
val, _ := ops[fi]
switch val {
case 0:
_ = fi.Close()
fallthrough
case 1:
_ = os.Remove(fi.Name())
}
}
}()
// Now we sync everything
// sync and close files
for fi, _ := range files {
if err := fi.Sync(); err != nil {
return err
}
if err := fi.Close(); err != nil {
return err
}
// signify closed
ops[fi] = 1
}
// move files to their proper places
for fi, path := range files {
if err := osrename.Rename(fi.Name(), path); err != nil {
return err
}
// signify removed
ops[fi] = 2
}
// now sync the dirs for those files
for _, dir := range dirsToSync {
if err := syncDir(dir); err != nil {
return err
}
}
// sync top flatfs dir
if err := syncDir(fs.path); err != nil {
return err
}
return nil
}
func (fs *Datastore) Get(key datastore.Key) (value interface{}, err error) { func (fs *Datastore) Get(key datastore.Key) (value interface{}, err error) {
_, path := fs.encode(key) _, path := fs.encode(key)
data, err := ioutil.ReadFile(path) data, err := ioutil.ReadFile(path)
...@@ -323,45 +234,6 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E ...@@ -323,45 +234,6 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
return res, nil return res, nil
} }
type flatfsBatch struct {
puts map[datastore.Key]interface{}
deletes map[datastore.Key]struct{}
ds *Datastore
}
func (fs *Datastore) Batch() datastore.Batch {
return &flatfsBatch{
puts: make(map[datastore.Key]interface{}),
deletes: make(map[datastore.Key]struct{}),
ds: fs,
}
}
func (bt *flatfsBatch) Put(key datastore.Key, val interface{}) error {
bt.puts[key] = val
return nil
}
func (bt *flatfsBatch) Delete(key datastore.Key) error {
bt.deletes[key] = struct{}{}
return nil
}
func (bt *flatfsBatch) Commit() error {
if err := bt.ds.putMany(bt.puts); err != nil {
return err
}
for k, _ := range bt.deletes {
if err := bt.ds.Delete(k); err != nil {
return err
}
}
return nil
}
var _ datastore.ThreadSafeDatastore = (*Datastore)(nil) var _ datastore.ThreadSafeDatastore = (*Datastore)(nil)
func (*Datastore) IsThreadSafe() {} func (*Datastore) IsThreadSafe() {}
...@@ -130,12 +130,6 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { ...@@ -130,12 +130,6 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
return r, nil return r, nil
} }
func (d *Datastore) Batch() ds.Batch {
// just use basic transaction for now, this datastore
// isnt really used in performant code yet
return ds.NewBasicBatch(d)
}
// isDir returns whether given path is a directory // isDir returns whether given path is a directory
func isDir(path string) bool { func isDir(path string) bool {
finfo, err := os.Stat(path) finfo, err := os.Stat(path)
......
...@@ -73,28 +73,3 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) { ...@@ -73,28 +73,3 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return dsq.DerivedResults(qr, ch), nil return dsq.DerivedResults(qr, ch), nil
} }
func (d *ktds) Batch() ds.Batch {
return &transformBatch{
dst: d.child.Batch(),
f: d.ConvertKey,
}
}
type transformBatch struct {
dst ds.Batch
f KeyMapping
}
func (t *transformBatch) Put(key ds.Key, val interface{}) error {
return t.dst.Put(t.f(key), val)
}
func (t *transformBatch) Delete(key ds.Key) error {
return t.dst.Delete(t.f(key))
}
func (t *transformBatch) Commit() error {
return t.dst.Commit()
}
...@@ -100,40 +100,6 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -100,40 +100,6 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return qr, nil return qr, nil
} }
type ldbBatch struct {
b *leveldb.Batch
d *datastore
}
func (d *datastore) Batch() ds.Batch {
return &ldbBatch{
b: new(leveldb.Batch),
d: d,
}
}
func (b *ldbBatch) Put(key ds.Key, val interface{}) error {
v, ok := val.([]byte)
if !ok {
return ds.ErrInvalidType
}
b.b.Put(key.Bytes(), v) // #dealwithit
return nil
}
func (b *ldbBatch) Delete(key ds.Key) error {
b.b.Delete(key.Bytes())
return nil
}
func (b *ldbBatch) Commit() error {
opts := &opt.WriteOptions{Sync: true}
if err := b.d.DB.Write(b.b, opts); err != nil {
return err
}
return nil
}
func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) { func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range var rnge *util.Range
......
...@@ -54,7 +54,3 @@ func (d *Datastore) Delete(key ds.Key) (err error) { ...@@ -54,7 +54,3 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return nil, errors.New("KeyList not implemented.") return nil, errors.New("KeyList not implemented.")
} }
func (d *Datastore) Batch() ds.Batch {
return ds.NewBasicBatch(d)
}
...@@ -148,67 +148,6 @@ func (m *measure) Query(q query.Query) (query.Results, error) { ...@@ -148,67 +148,6 @@ func (m *measure) Query(q query.Query) (query.Results, error) {
return res, err return res, err
} }
type measuredBatch struct {
puts int
deletes int
putts datastore.Batch
delts datastore.Batch
m *measure
}
func (m *measure) Batch() datastore.Batch {
return &measuredBatch{
putts: m.backend.Batch(),
delts: m.backend.Batch(),
m: m,
}
}
func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
mt.puts++
return mt.putts.Put(key, val)
}
func (mt *measuredBatch) Delete(key datastore.Key) error {
mt.deletes++
return mt.delts.Delete(key)
}
func (mt *measuredBatch) Commit() error {
if mt.deletes > 0 {
before := time.Now()
err := mt.delts.Commit()
took := int(time.Now().Sub(before)/time.Microsecond) / mt.deletes
mt.m.deleteNum.AddN(uint64(mt.deletes))
for i := 0; i < mt.deletes; i++ {
mt.m.deleteLatency.RecordValue(int64(took))
}
if err != nil {
mt.m.deleteErr.Add()
return err
}
}
if mt.puts > 0 {
before := time.Now()
err := mt.putts.Commit()
took := int(time.Now().Sub(before)/time.Microsecond) / mt.puts
mt.m.putNum.AddN(uint64(mt.puts))
for i := 0; i < mt.puts; i++ {
mt.m.putLatency.RecordValue(int64(took))
}
if err != nil {
mt.m.putErr.Add()
return err
}
}
return nil
}
func (m *measure) Close() error { func (m *measure) Close() error {
m.putNum.Remove() m.putNum.Remove()
m.putErr.Remove() m.putErr.Remove()
......
...@@ -114,48 +114,3 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) { ...@@ -114,48 +114,3 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
r = query.ResultsReplaceQuery(r, q) r = query.ResultsReplaceQuery(r, q)
return r, nil return r, nil
} }
type mountBatch struct {
mounts map[string]datastore.Batch
d *Datastore
}
func (d *Datastore) Batch() datastore.Batch {
return &mountBatch{
mounts: make(map[string]datastore.Batch),
d: d,
}
}
func (mt *mountBatch) Put(key datastore.Key, val interface{}) error {
child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()]
if !ok {
t = child.Batch()
mt.mounts[loc.String()] = t
}
return t.Put(rest, val)
}
func (mt *mountBatch) Delete(key datastore.Key) error {
child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()]
if !ok {
t = child.Batch()
mt.mounts[loc.String()] = t
}
return t.Delete(rest)
}
func (mt *mountBatch) Commit() error {
for _, t := range mt.mounts {
err := t.Commit()
if err != nil {
return err
}
}
return nil
}
...@@ -66,38 +66,3 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -66,38 +66,3 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
} }
return r, nil return r, nil
} }
type panicBatch struct {
t ds.Batch
}
func (p *panicBatch) Put(key ds.Key, val interface{}) error {
err := p.t.Put(key, val)
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: transaction put failed")
}
return nil
}
func (p *panicBatch) Delete(key ds.Key) error {
err := p.t.Delete(key)
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: transaction delete failed")
}
return nil
}
func (p *panicBatch) Commit() error {
err := p.t.Commit()
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: transaction commit failed")
}
return nil
}
func (d *datastore) Batch() ds.Batch {
return &panicBatch{d.child.Batch()}
}
...@@ -63,9 +63,3 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -63,9 +63,3 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
defer d.RUnlock() defer d.RUnlock()
return d.child.Query(q) return d.child.Query(q)
} }
func (d *MutexDatastore) Batch() ds.Batch {
d.RLock()
defer d.RUnlock()
return d.child.Batch()
}
...@@ -92,43 +92,3 @@ func (d tiered) Query(q dsq.Query) (dsq.Results, error) { ...@@ -92,43 +92,3 @@ func (d tiered) Query(q dsq.Query) (dsq.Results, error) {
// query always the last (most complete) one // query always the last (most complete) one
return d[len(d)-1].Query(q) return d[len(d)-1].Query(q)
} }
type tieredBatch []ds.Batch
func (d tiered) Batch() ds.Batch {
var out tieredBatch
for _, ds := range d {
out = append(out, ds.Batch())
}
return out
}
func (t tieredBatch) Put(key ds.Key, val interface{}) error {
for _, ts := range t {
err := ts.Put(key, val)
if err != nil {
return err
}
}
return nil
}
func (t tieredBatch) Delete(key ds.Key) error {
for _, ts := range t {
err := ts.Delete(key)
if err != nil {
return err
}
}
return nil
}
func (t tieredBatch) Commit() error {
for _, ts := range t {
err := ts.Commit()
if err != nil {
return err
}
}
return nil
}
...@@ -94,8 +94,3 @@ func (d *datastore) Delete(key ds.Key) (err error) { ...@@ -94,8 +94,3 @@ func (d *datastore) Delete(key ds.Key) (err error) {
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) { func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.cache.Query(q) return d.cache.Query(q)
} }
func (d *datastore) Batch() ds.Batch {
// sorry, being lazy here
return ds.NewBasicBatch(d)
}
package datastore
// basicBatch implements the transaction interface for datastores who do
// not have any sort of underlying transactional support
type basicBatch struct {
puts map[Key]interface{}
deletes map[Key]struct{}
target Datastore
}
func NewBasicBatch(ds Datastore) Batch {
return &basicBatch{
puts: make(map[Key]interface{}),
deletes: make(map[Key]struct{}),
target: ds,
}
}
func (bt *basicBatch) Put(key Key, val interface{}) error {
bt.puts[key] = val
return nil
}
func (bt *basicBatch) Delete(key Key) error {
bt.deletes[key] = struct{}{}
return nil
}
func (bt *basicBatch) Commit() error {
for k, val := range bt.puts {
if err := bt.target.Put(k, val); err != nil {
return err
}
}
for k, _ := range bt.deletes {
if err := bt.target.Delete(k); err != nil {
return err
}
}
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment