Commit 1202de47 authored by Juan Batiz-Benet

Merge pull request #25 from whyrusleeping/batching

Batching
parents c18ac933 7b9bbbc9
@@ -17,6 +17,10 @@
 		"ImportPath": "github.com/codahale/metrics",
 		"Rev": "7d3beb1b480077e77c08a6f6c65ea969f6e91420"
 	},
+	{
+		"ImportPath": "github.com/dustin/randbo",
+		"Rev": "7f1b564ca7242d22bcc6e2128beb90d9fa38b9f0"
+	},
 	{
 		"ImportPath": "github.com/hashicorp/golang-lru",
 		"Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370"
@@ -29,10 +33,6 @@
 		"ImportPath": "github.com/jbenet/goprocess",
 		"Rev": "5b02f8d275a2dd882fb06f8bbdf74347795ff3b1"
 	},
-	{
-		"ImportPath": "github.com/satori/go.uuid",
-		"Rev": "7c7f2020c4c9491594b85767967f4619c2fa75f9"
-	},
 	{
 		"ImportPath": "github.com/mattbaird/elastigo/api",
 		"Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d"
@@ -41,6 +41,10 @@
 		"ImportPath": "github.com/mattbaird/elastigo/core",
 		"Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d"
 	},
+	{
+		"ImportPath": "github.com/satori/go.uuid",
+		"Rev": "7c7f2020c4c9491594b85767967f4619c2fa75f9"
+	},
 	{
 		"ImportPath": "github.com/syndtr/goleveldb/leveldb",
 		"Rev": "871eee0a7546bb7d1b2795142e29c4534abc49b3"
...
A fast random number `io.Reader` implementation.
![randbo](https://raw.github.com/dustin/randbo/master/randbo.png)
> IN A WORLD where no integer sequence is certain ...
>
> ONE MAN must become statistically indistinguishable from noise
>
> THIS SUMMER, entropy has a new name: RANDBO
(thanks @snej)
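
For orientation before the vendored source below: randbo drops in anywhere an `io.Reader` is expected. A minimal usage sketch (not part of this commit; the buffer size is arbitrary, and the import path matches the upstream repo):

```go
package main

import (
	"fmt"

	"github.com/dustin/randbo"
)

func main() {
	// New returns an io.Reader seeded from the current time.
	r := randbo.New()

	// Fill a buffer with fast, non-crypto-quality random bytes.
	buf := make([]byte, 32)
	if _, err := r.Read(buf); err != nil { // Read always fills the whole buffer
		panic(err)
	}
	fmt.Printf("%x\n", buf)
}
```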
package randbo

import (
	"io"
	"math/rand"
	"time"
)

// Randbo creates a stream of non-crypto quality random bytes
type randbo struct {
	rand.Source
}

// New creates a new random reader with a time source.
func New() io.Reader {
	return NewFrom(rand.NewSource(time.Now().UnixNano()))
}

// NewFrom creates a new reader from your own rand.Source
func NewFrom(src rand.Source) io.Reader {
	return &randbo{src}
}

// Read satisfies io.Reader
func (r *randbo) Read(p []byte) (n int, err error) {
	todo := len(p)
	offset := 0
	for {
		val := int64(r.Int63())
		for i := 0; i < 8; i++ {
			p[offset] = byte(val)
			todo--
			if todo == 0 {
				return len(p), nil
			}
			offset++
			val >>= 8
		}
	}
}
package randbo

import (
	"crypto/rand"
	"io"
	"io/ioutil"
	"testing"
)

func TestRandbo(t *testing.T) {
	buf := make([]byte, 16)
	n, err := New().Read(buf)
	if err != nil {
		t.Fatalf("Error reading: %v", err)
	}
	if n != len(buf) {
		t.Fatalf("Short read: %v", n)
	}
	t.Logf("Read %x", buf)
}

const toCopy = 1024 * 1024

func BenchmarkRandbo(b *testing.B) {
	b.SetBytes(toCopy)
	r := New()
	for i := 0; i < b.N; i++ {
		io.CopyN(ioutil.Discard, r, toCopy)
	}
}

func BenchmarkCrypto(b *testing.B) {
	b.SetBytes(toCopy)
	for i := 0; i < b.N; i++ {
		io.CopyN(ioutil.Discard, rand.Reader, toCopy)
	}
}
@@ -63,6 +63,10 @@ func (d *MapDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	return r, nil
 }
 
+func (d *MapDatastore) Batch() (Batch, error) {
+	return NewBasicBatch(d), nil
+}
+
 // NullDatastore stores nothing, but conforms to the API.
 // Useful to test with.
 type NullDatastore struct {
@@ -98,6 +102,10 @@ func (d *NullDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	return dsq.ResultsWithEntries(q, nil), nil
 }
 
+func (d *NullDatastore) Batch() (Batch, error) {
+	return NewBasicBatch(d), nil
+}
+
 // LogDatastore logs all accesses through the datastore.
 type LogDatastore struct {
 	Name string
@@ -112,7 +120,7 @@ type Shim interface {
 }
 
 // NewLogDatastore constructs a log datastore.
-func NewLogDatastore(ds Datastore, name string) Shim {
+func NewLogDatastore(ds Datastore, name string) *LogDatastore {
 	if len(name) < 1 {
 		name = "LogDatastore"
 	}
@@ -154,3 +162,12 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	log.Printf("%s: Query\n", d.Name)
 	return d.child.Query(q)
 }
+
+func (d *LogDatastore) Batch() (Batch, error) {
+	log.Printf("%s: Batch\n", d.Name)
+	bds, ok := d.child.(BatchingDatastore)
+	if !ok {
+		return nil, ErrBatchUnsupported
+	}
+	return bds.Batch()
+}
package datastore

// basicBatch implements the Batch interface for datastores that do not
// have any underlying transactional support, by buffering writes and
// replaying them against the target on Commit.
type basicBatch struct {
	puts    map[Key]interface{}
	deletes map[Key]struct{}

	target Datastore
}

func NewBasicBatch(ds Datastore) Batch {
	return &basicBatch{
		puts:    make(map[Key]interface{}),
		deletes: make(map[Key]struct{}),
		target:  ds,
	}
}

func (bt *basicBatch) Put(key Key, val interface{}) error {
	bt.puts[key] = val
	return nil
}

func (bt *basicBatch) Delete(key Key) error {
	bt.deletes[key] = struct{}{}
	return nil
}

func (bt *basicBatch) Commit() error {
	for k, val := range bt.puts {
		if err := bt.target.Put(k, val); err != nil {
			return err
		}
	}
	for k := range bt.deletes {
		if err := bt.target.Delete(k); err != nil {
			return err
		}
	}
	return nil
}
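
As a quick illustration of the new API (a sketch, not part of the commit; the key names and values are made up), a basicBatch obtained from the in-memory MapDatastore buffers writes until Commit:

```go
package main

import (
	"fmt"

	ds "github.com/jbenet/go-datastore"
)

func main() {
	d := ds.NewMapDatastore()

	// Seed a value directly so the batched Delete below has something to remove.
	if err := d.Put(ds.NewKey("/stale"), []byte("old")); err != nil {
		panic(err)
	}

	// MapDatastore.Batch returns NewBasicBatch(d).
	b, err := d.Batch()
	if err != nil {
		panic(err)
	}

	// Both calls only buffer; nothing reaches d until Commit.
	b.Put(ds.NewKey("/a"), []byte("hello"))
	b.Delete(ds.NewKey("/stale"))

	// Commit replays the buffered puts, then the deletes, against d.
	if err := b.Commit(); err != nil {
		panic(err)
	}

	v, _ := d.Get(ds.NewKey("/a"))
	fmt.Printf("%s\n", v.([]byte))
}
```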
@@ -10,7 +10,7 @@ type Datastore struct {
 	F func()
 }
 
-func Wrap(ds ds.Datastore, f func()) ds.Datastore {
+func Wrap(ds ds.Datastore, f func()) *Datastore {
 	return &Datastore{ds, f}
 }
...
@@ -69,6 +69,14 @@ type Datastore interface {
 	Query(q query.Query) (query.Results, error)
 }
 
+type BatchingDatastore interface {
+	Datastore
+	Batch() (Batch, error)
+}
+
+var ErrBatchUnsupported = errors.New("this datastore does not support batching")
+
 // ThreadSafeDatastore is an interface that all threadsafe datastore should
 // implement to leverage type safety checks.
 type ThreadSafeDatastore interface {
@@ -104,3 +112,11 @@ func GetBackedHas(ds Datastore, key Key) (bool, error) {
 		return false, err
 	}
 }
+
+type Batch interface {
+	Put(key Key, val interface{}) error
+	Delete(key Key) error
+	Commit() error
+}
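
Every wrapper touched by this PR (LogDatastore, ktds, measure, mount, sync) reaches batching the same way: type-assert the child to BatchingDatastore and surface ErrBatchUnsupported otherwise. Condensed into a hypothetical helper (a sketch of the pattern, not code from the commit):

```go
// batchOf returns a Batch from child if it supports batching,
// mirroring the assertion pattern the wrappers below use.
func batchOf(child Datastore) (Batch, error) {
	bds, ok := child.(BatchingDatastore)
	if !ok {
		return nil, ErrBatchUnsupported
	}
	return bds.Batch()
}
```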
@@ -68,12 +68,8 @@ func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
 }
 
 func (fs *Datastore) makePrefixDir(dir string) error {
-	if err := os.Mkdir(dir, 0777); err != nil {
-		// EEXIST is safe to ignore here, that just means the prefix
-		// directory already existed.
-		if !os.IsExist(err) {
-			return err
-		}
+	if err := fs.makePrefixDirNoSync(dir); err != nil {
+		return err
 	}
 
 	// In theory, if we create a new prefix dir and add a file to
@@ -86,6 +82,17 @@ func (fs *Datastore) makePrefixDir(dir string) error {
 	return nil
 }
 
+func (fs *Datastore) makePrefixDirNoSync(dir string) error {
+	if err := os.Mkdir(dir, 0777); err != nil {
+		// EEXIST is safe to ignore here, that just means the prefix
+		// directory already existed.
+		if !os.IsExist(err) {
+			return err
+		}
+	}
+	return nil
+}
+
 func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
 	val, ok := value.([]byte)
 	if !ok {
@@ -137,6 +144,88 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
 	return nil
 }
 
+func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
+	var dirsToSync []string
+	files := make(map[*os.File]string)
+
+	for key, value := range data {
+		val, ok := value.([]byte)
+		if !ok {
+			return datastore.ErrInvalidType
+		}
+		dir, path := fs.encode(key)
+		if err := fs.makePrefixDirNoSync(dir); err != nil {
+			return err
+		}
+		dirsToSync = append(dirsToSync, dir)
+
+		tmp, err := ioutil.TempFile(dir, "put-")
+		if err != nil {
+			return err
+		}
+		if _, err := tmp.Write(val); err != nil {
+			return err
+		}
+
+		files[tmp] = path
+	}
+
+	ops := make(map[*os.File]int)
+	defer func() {
+		for fi := range files {
+			switch ops[fi] {
+			case 0:
+				_ = fi.Close()
+				fallthrough
+			case 1:
+				_ = os.Remove(fi.Name())
+			}
+		}
+	}()
+
+	// Now we sync everything
+	// sync and close files
+	for fi := range files {
+		if err := fi.Sync(); err != nil {
+			return err
+		}
+		if err := fi.Close(); err != nil {
+			return err
+		}
+		// signify closed
+		ops[fi] = 1
+	}
+
+	// move files to their proper places
+	for fi, path := range files {
+		if err := osrename.Rename(fi.Name(), path); err != nil {
+			return err
+		}
+		// signify removed
+		ops[fi] = 2
+	}
+
+	// now sync the dirs for those files
+	for _, dir := range dirsToSync {
+		if err := syncDir(dir); err != nil {
+			return err
+		}
+	}
+
+	// sync top flatfs dir
+	if err := syncDir(fs.path); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (fs *Datastore) Get(key datastore.Key) (value interface{}, err error) {
 	_, path := fs.encode(key)
 	data, err := ioutil.ReadFile(path)
@@ -234,6 +323,45 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
 	return res, nil
 }
 
+type flatfsBatch struct {
+	puts    map[datastore.Key]interface{}
+	deletes map[datastore.Key]struct{}
+
+	ds *Datastore
+}
+
+func (fs *Datastore) Batch() (datastore.Batch, error) {
+	return &flatfsBatch{
+		puts:    make(map[datastore.Key]interface{}),
+		deletes: make(map[datastore.Key]struct{}),
+		ds:      fs,
+	}, nil
+}
+
+func (bt *flatfsBatch) Put(key datastore.Key, val interface{}) error {
+	bt.puts[key] = val
+	return nil
+}
+
+func (bt *flatfsBatch) Delete(key datastore.Key) error {
+	bt.deletes[key] = struct{}{}
+	return nil
+}
+
+func (bt *flatfsBatch) Commit() error {
+	if err := bt.ds.putMany(bt.puts); err != nil {
+		return err
+	}
+
+	for k := range bt.deletes {
+		if err := bt.ds.Delete(k); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 var _ datastore.ThreadSafeDatastore = (*Datastore)(nil)
 
 func (*Datastore) IsThreadSafe() {}
 package flatfs_test
 
 import (
+	"encoding/base32"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -10,6 +11,9 @@ import (
 	"github.com/jbenet/go-datastore"
 	"github.com/jbenet/go-datastore/flatfs"
 	"github.com/jbenet/go-datastore/query"
+	dstest "github.com/jbenet/go-datastore/test"
+
+	rand "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/dustin/randbo"
 )
 
 func tempdir(t testing.TB) (path string, cleanup func()) {
@@ -316,3 +320,98 @@ func TestQuerySimple(t *testing.T) {
 		t.Errorf("did not see wanted key %q in %+v", myKey, entries)
 	}
 }
+
+func TestBatchPut(t *testing.T) {
+	temp, cleanup := tempdir(t)
+	defer cleanup()
+
+	fs, err := flatfs.New(temp, 2)
+	if err != nil {
+		t.Fatalf("New fail: %v\n", err)
+	}
+
+	dstest.RunBatchTest(t, fs)
+}
+
+func TestBatchDelete(t *testing.T) {
+	temp, cleanup := tempdir(t)
+	defer cleanup()
+
+	fs, err := flatfs.New(temp, 2)
+	if err != nil {
+		t.Fatalf("New fail: %v\n", err)
+	}
+
+	dstest.RunBatchDeleteTest(t, fs)
+}
+
+func BenchmarkConsecutivePut(b *testing.B) {
+	r := rand.New()
+	var blocks [][]byte
+	var keys []datastore.Key
+	for i := 0; i < b.N; i++ {
+		blk := make([]byte, 256*1024)
+		r.Read(blk)
+		blocks = append(blocks, blk)
+
+		key := base32.StdEncoding.EncodeToString(blk[:8])
+		keys = append(keys, datastore.NewKey(key))
+	}
+
+	temp, cleanup := tempdir(b)
+	defer cleanup()
+
+	fs, err := flatfs.New(temp, 2)
+	if err != nil {
+		b.Fatalf("New fail: %v\n", err)
+	}
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		err := fs.Put(keys[i], blocks[i])
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkBatchedPut(b *testing.B) {
+	r := rand.New()
+	var blocks [][]byte
+	var keys []datastore.Key
+	for i := 0; i < b.N; i++ {
+		blk := make([]byte, 256*1024)
+		r.Read(blk)
+		blocks = append(blocks, blk)
+
+		key := base32.StdEncoding.EncodeToString(blk[:8])
+		keys = append(keys, datastore.NewKey(key))
+	}
+
+	temp, cleanup := tempdir(b)
+	defer cleanup()
+
+	fs, err := flatfs.New(temp, 2)
+	if err != nil {
+		b.Fatalf("New fail: %v\n", err)
+	}
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; {
+		batch, err := fs.Batch()
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		for n := i; i-n < 512 && i < b.N; i++ {
+			err := batch.Put(keys[i], blocks[i])
+			if err != nil {
+				b.Fatal(err)
+			}
+		}
+
+		err = batch.Commit()
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}
@@ -16,6 +16,8 @@ type KeyTransform interface {
 type Datastore interface {
 	ds.Shim
 	KeyTransform
+
+	Batch() (ds.Batch, error)
 }
 
 // Wrap wraps a given datastore with a KeyTransform function.
...
@@ -73,3 +73,37 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
 	return dsq.DerivedResults(qr, ch), nil
 }
+
+func (d *ktds) Batch() (ds.Batch, error) {
+	bds, ok := d.child.(ds.BatchingDatastore)
+	if !ok {
+		return nil, ds.ErrBatchUnsupported
+	}
+
+	childbatch, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+	return &transformBatch{
+		dst: childbatch,
+		f:   d.ConvertKey,
+	}, nil
+}
+
+type transformBatch struct {
+	dst ds.Batch
+
+	f KeyMapping
+}
+
+func (t *transformBatch) Put(key ds.Key, val interface{}) error {
+	return t.dst.Put(t.f(key), val)
+}
+
+func (t *transformBatch) Delete(key ds.Key) error {
+	return t.dst.Delete(t.f(key))
+}
+
+func (t *transformBatch) Commit() error {
+	return t.dst.Commit()
+}
@@ -148,6 +148,85 @@ func (m *measure) Query(q query.Query) (query.Results, error) {
 	return res, err
 }
 
+type measuredBatch struct {
+	puts    int
+	deletes int
+
+	putts datastore.Batch
+	delts datastore.Batch
+
+	m *measure
+}
+
+func (m *measure) Batch() (datastore.Batch, error) {
+	bds, ok := m.backend.(datastore.BatchingDatastore)
+	if !ok {
+		return nil, datastore.ErrBatchUnsupported
+	}
+	pb, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+
+	db, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+
+	return &measuredBatch{
+		putts: pb,
+		delts: db,
+		m:     m,
+	}, nil
+}
+
+func (mt *measuredBatch) Put(key datastore.Key, val interface{}) error {
+	mt.puts++
+	valb, ok := val.([]byte)
+	if !ok {
+		return datastore.ErrInvalidType
+	}
+	_ = mt.m.putSize.RecordValue(int64(len(valb)))
+	return mt.putts.Put(key, val)
+}
+
+func (mt *measuredBatch) Delete(key datastore.Key) error {
+	mt.deletes++
+	return mt.delts.Delete(key)
+}
+
+func (mt *measuredBatch) Commit() error {
+	err := logBatchCommit(mt.delts, mt.deletes, mt.m.deleteNum, mt.m.deleteErr, mt.m.deleteLatency)
+	if err != nil {
+		return err
+	}
+
+	err = logBatchCommit(mt.putts, mt.puts, mt.m.putNum, mt.m.putErr, mt.m.putLatency)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat *metrics.Histogram) error {
+	if n > 0 {
+		before := time.Now()
+		err := b.Commit()
+		took := int(time.Now().Sub(before)/time.Microsecond) / n
+		num.AddN(uint64(n))
+		for i := 0; i < n; i++ {
+			_ = lat.RecordValue(int64(took))
+		}
+		if err != nil {
+			errs.Add()
+			return err
+		}
+	}
+	return nil
+}
+
 func (m *measure) Close() error {
 	m.putNum.Remove()
 	m.putErr.Remove()
...
@@ -114,3 +114,62 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
 	r = query.ResultsReplaceQuery(r, q)
 	return r, nil
 }
+
+type mountBatch struct {
+	mounts map[string]datastore.Batch
+
+	d *Datastore
+}
+
+func (d *Datastore) Batch() (datastore.Batch, error) {
+	return &mountBatch{
+		mounts: make(map[string]datastore.Batch),
+		d:      d,
+	}, nil
+}
+
+func (mt *mountBatch) lookupBatch(key datastore.Key) (datastore.Batch, datastore.Key, error) {
+	child, loc, rest := mt.d.lookup(key)
+	t, ok := mt.mounts[loc.String()]
+	if !ok {
+		bds, ok := child.(datastore.BatchingDatastore)
+		if !ok {
+			return nil, datastore.NewKey(""), datastore.ErrBatchUnsupported
+		}
+
+		var err error
+		t, err = bds.Batch()
+		if err != nil {
+			return nil, datastore.NewKey(""), err
+		}
+
+		mt.mounts[loc.String()] = t
+	}
+
+	return t, rest, nil
+}
+
+func (mt *mountBatch) Put(key datastore.Key, val interface{}) error {
+	t, rest, err := mt.lookupBatch(key)
+	if err != nil {
+		return err
+	}
+
+	return t.Put(rest, val)
+}
+
+func (mt *mountBatch) Delete(key datastore.Key) error {
+	t, rest, err := mt.lookupBatch(key)
+	if err != nil {
+		return err
+	}
+
+	return t.Delete(rest)
+}
+
+func (mt *mountBatch) Commit() error {
+	for _, t := range mt.mounts {
+		err := t.Commit()
+		if err != nil {
			return err
+		}
+	}
+	return nil
+}
@@ -66,3 +66,34 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
 	}
 	return r, nil
 }
+
+type panicBatch struct {
+	t ds.Batch
+}
+
+func (p *panicBatch) Put(key ds.Key, val interface{}) error {
+	err := p.t.Put(key, val)
+	if err != nil {
+		fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
+		panic("panic datastore: transaction put failed")
+	}
+	return nil
+}
+
+func (p *panicBatch) Delete(key ds.Key) error {
+	err := p.t.Delete(key)
+	if err != nil {
+		fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
+		panic("panic datastore: transaction delete failed")
+	}
+	return nil
+}
+
+func (p *panicBatch) Commit() error {
+	err := p.t.Commit()
+	if err != nil {
+		fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
+		panic("panic datastore: transaction commit failed")
+	}
+	return nil
+}
@@ -63,3 +63,43 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
 	defer d.RUnlock()
 	return d.child.Query(q)
 }
+
+func (d *MutexDatastore) Batch() (ds.Batch, error) {
+	d.RLock()
+	defer d.RUnlock()
+	bds, ok := d.child.(ds.BatchingDatastore)
+	if !ok {
+		return nil, ds.ErrBatchUnsupported
+	}
+
+	b, err := bds.Batch()
+	if err != nil {
+		return nil, err
+	}
+
+	return &syncBatch{
+		batch: b,
+	}, nil
+}
+
+type syncBatch struct {
+	lk    sync.Mutex
+	batch ds.Batch
+}
+
+func (b *syncBatch) Put(key ds.Key, val interface{}) error {
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	return b.batch.Put(key, val)
+}
+
+func (b *syncBatch) Delete(key ds.Key) error {
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	return b.batch.Delete(key)
+}
+
+func (b *syncBatch) Commit() error {
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	return b.batch.Commit()
+}
package dstest

import (
	"bytes"
	"encoding/base32"
	"testing"

	dstore "github.com/jbenet/go-datastore"
	rand "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/dustin/randbo"
)

func RunBatchTest(t *testing.T, ds dstore.BatchingDatastore) {
	batch, err := ds.Batch()
	if err != nil {
		t.Fatal(err)
	}

	r := rand.New()
	var blocks [][]byte
	var keys []dstore.Key
	for i := 0; i < 20; i++ {
		blk := make([]byte, 256*1024)
		r.Read(blk)
		blocks = append(blocks, blk)

		key := dstore.NewKey(base32.StdEncoding.EncodeToString(blk[:8]))
		keys = append(keys, key)

		err := batch.Put(key, blk)
		if err != nil {
			t.Fatal(err)
		}
	}

	// Ensure they are not in the datastore before committing
	for _, k := range keys {
		_, err := ds.Get(k)
		if err == nil {
			t.Fatal("should not have found this block")
		}
	}

	// commit, write them to the datastore
	err = batch.Commit()
	if err != nil {
		t.Fatal(err)
	}

	for i, k := range keys {
		blk, err := ds.Get(k)
		if err != nil {
			t.Fatal(err)
		}

		if !bytes.Equal(blk.([]byte), blocks[i]) {
			t.Fatal("blocks not correct!")
		}
	}
}

func RunBatchDeleteTest(t *testing.T, ds dstore.BatchingDatastore) {
	r := rand.New()
	var keys []dstore.Key
	for i := 0; i < 20; i++ {
		blk := make([]byte, 16)
		r.Read(blk)

		key := dstore.NewKey(base32.StdEncoding.EncodeToString(blk[:8]))
		keys = append(keys, key)

		err := ds.Put(key, blk)
		if err != nil {
			t.Fatal(err)
		}
	}

	batch, err := ds.Batch()
	if err != nil {
		t.Fatal(err)
	}

	for _, k := range keys {
		err := batch.Delete(k)
		if err != nil {
			t.Fatal(err)
		}
	}

	err = batch.Commit()
	if err != nil {
		t.Fatal(err)
	}

	for _, k := range keys {
		_, err := ds.Get(k)
		if err == nil {
			t.Fatal("should not have found block")
		}
	}
}