Commit 67d8e74a authored by Jeromy's avatar Jeromy

implement batching for leveldb

parent 9b48c4ee
......@@ -99,7 +99,7 @@ func (fs *Datastore) makePrefixDirNoSync(dir string) error {
return nil
}
var putMaxRetries = 3
var putMaxRetries = 6
func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
val, ok := value.([]byte)
......@@ -111,15 +111,15 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
for i := 0; i < putMaxRetries; i++ {
err = fs.doPut(key, val)
if err == nil {
return nil
break
}
if !strings.Contains(err.Error(), "too many open files") {
return err
break
}
log.Errorf("too many open files, retrying in %dms", 100*i)
time.Sleep(time.Millisecond * 100 * time.Duration(i))
time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
}
return err
}
......
......@@ -147,14 +147,40 @@ func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
}
}
func (d *datastore) Batch() (ds.Batch, error) {
// TODO: implement batch on leveldb
return nil, ds.ErrBatchUnsupported
}
// LevelDB needs to be closed.
func (d *datastore) Close() (err error) {
return d.DB.Close()
}
func (d *datastore) IsThreadSafe() {}
type leveldbBatch struct {
b *leveldb.Batch
db *leveldb.DB
}
func (d *datastore) Batch() (ds.Batch, error) {
return &leveldbBatch{
b: new(leveldb.Batch),
db: d.DB,
}, nil
}
func (b *leveldbBatch) Put(key ds.Key, value interface{}) error {
val, ok := value.([]byte)
if !ok {
return ds.ErrInvalidType
}
b.b.Put(key.Bytes(), val)
return nil
}
func (b *leveldbBatch) Commit() error {
return b.db.Write(b.b, nil)
}
func (b *leveldbBatch) Delete(key ds.Key) error {
b.b.Delete(key.Bytes())
return nil
}
......@@ -122,3 +122,36 @@ func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
}
}
}
func TestBatching(t *testing.T) {
d, done := newDS(t)
defer done()
b, err := d.Batch()
if err != nil {
t.Fatal(err)
}
for k, v := range testcases {
err := b.Put(ds.NewKey(k), []byte(v))
if err != nil {
t.Fatal(err)
}
}
err = b.Commit()
if err != nil {
t.Fatal(err)
}
for k, v := range testcases {
val, err := d.Get(ds.NewKey(k))
if err != nil {
t.Fatal(err)
}
if v != string(val.([]byte)) {
t.Fatal("got wrong data!")
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment