Commit 8028a6d4 authored by Jeromy's avatar Jeromy

add in support for batched writes

implement batch ops for different datastore types

rename Transaction to Batch

Revert "add in support for batched writes"

add in benchmarks for put and batchput

move batching into separate interface

address concerns from PR

regrab old code
parent 6cb658cb
...@@ -323,38 +323,32 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E ...@@ -323,38 +323,32 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
return res, nil return res, nil
} }
type flatfsTransaction struct { type flatfsBatch struct {
puts map[datastore.Key]interface{} puts map[datastore.Key]interface{}
gets map[datastore.Key]datastore.GetCallback
deletes map[datastore.Key]struct{} deletes map[datastore.Key]struct{}
ds *Datastore ds *Datastore
} }
func (fs *Datastore) StartBatchOp() datastore.Transaction { func (fs *Datastore) Batch() datastore.Batch {
return &flatfsTransaction{ return &flatfsBatch{
puts: make(map[datastore.Key]interface{}), puts: make(map[datastore.Key]interface{}),
gets: make(map[datastore.Key]datastore.GetCallback),
deletes: make(map[datastore.Key]struct{}), deletes: make(map[datastore.Key]struct{}),
ds: fs, ds: fs,
} }
} }
func (bt *flatfsTransaction) Put(key datastore.Key, val interface{}) error { func (bt *flatfsBatch) Put(key datastore.Key, val interface{}) error {
bt.puts[key] = val bt.puts[key] = val
return nil return nil
} }
func (bt *flatfsTransaction) Delete(key datastore.Key) error { func (bt *flatfsBatch) Delete(key datastore.Key) error {
bt.deletes[key] = struct{}{} bt.deletes[key] = struct{}{}
return nil return nil
} }
func (bt *flatfsTransaction) Commit() error { func (bt *flatfsBatch) Commit() error {
for k, cb := range bt.gets {
cb(bt.ds.Get(k))
}
if err := bt.ds.putMany(bt.puts); err != nil { if err := bt.ds.putMany(bt.puts); err != nil {
return err return err
} }
......
package flatfs_test package flatfs_test
import ( import (
"encoding/base32"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
...@@ -10,6 +11,8 @@ import ( ...@@ -10,6 +11,8 @@ import (
"github.com/jbenet/go-datastore" "github.com/jbenet/go-datastore"
"github.com/jbenet/go-datastore/flatfs" "github.com/jbenet/go-datastore/flatfs"
"github.com/jbenet/go-datastore/query" "github.com/jbenet/go-datastore/query"
rand "github.com/dustin/randbo"
) )
func tempdir(t testing.TB) (path string, cleanup func()) { func tempdir(t testing.TB) (path string, cleanup func()) {
...@@ -316,3 +319,71 @@ func TestQuerySimple(t *testing.T) { ...@@ -316,3 +319,71 @@ func TestQuerySimple(t *testing.T) {
t.Errorf("did not see wanted key %q in %+v", myKey, entries) t.Errorf("did not see wanted key %q in %+v", myKey, entries)
} }
} }
func BenchmarkConsecutivePut(b *testing.B) {
r := rand.New()
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < b.N; i++ {
blk := make([]byte, 256*1024)
r.Read(blk)
blocks = append(blocks, blk)
key := base32.StdEncoding.EncodeToString(blk[:8])
keys = append(keys, datastore.NewKey(key))
}
temp, cleanup := tempdir(b)
defer cleanup()
fs, err := flatfs.New(temp, 2)
if err != nil {
b.Fatalf("New fail: %v\n", err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := fs.Put(keys[i], blocks[i])
if err != nil {
b.Fatal(err)
}
}
}
func BenchmarkBatchedPut(b *testing.B) {
r := rand.New()
var blocks [][]byte
var keys []datastore.Key
for i := 0; i < b.N; i++ {
blk := make([]byte, 256*1024)
r.Read(blk)
blocks = append(blocks, blk)
key := base32.StdEncoding.EncodeToString(blk[:8])
keys = append(keys, datastore.NewKey(key))
}
temp, cleanup := tempdir(b)
defer cleanup()
fs, err := flatfs.New(temp, 2)
if err != nil {
b.Fatalf("New fail: %v\n", err)
}
b.ResetTimer()
for i := 0; i < b.N; {
batch := fs.Batch()
for n := i; i-n < 512 && i < b.N; i++ {
err := batch.Put(keys[i], blocks[i])
if err != nil {
b.Fatal(err)
}
}
err = batch.Commit()
if err != nil {
b.Fatal(err)
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment