Unverified Commit 61b3889a authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #104 from ipfs/fix/no-txns-for-batch

Do not implement batches using transactions
parents ff1c156e bfd76766
...@@ -44,6 +44,13 @@ type Datastore struct { ...@@ -44,6 +44,13 @@ type Datastore struct {
syncWrites bool syncWrites bool
} }
// Implements the datastore.Batch interface, enabling batching support for
// the badger Datastore.
type batch struct {
ds *Datastore
writeBatch *badger.WriteBatch
}
// Implements the datastore.Txn interface, enabling transaction support for // Implements the datastore.Txn interface, enabling transaction support for
// the badger Datastore. // the badger Datastore.
type txn struct { type txn struct {
...@@ -112,6 +119,7 @@ var _ ds.Datastore = (*Datastore)(nil) ...@@ -112,6 +119,7 @@ var _ ds.Datastore = (*Datastore)(nil)
var _ ds.TxnDatastore = (*Datastore)(nil) var _ ds.TxnDatastore = (*Datastore)(nil)
var _ ds.TTLDatastore = (*Datastore)(nil) var _ ds.TTLDatastore = (*Datastore)(nil)
var _ ds.GCDatastore = (*Datastore)(nil) var _ ds.GCDatastore = (*Datastore)(nil)
var _ ds.Batching = (*Datastore)(nil)
// NewDatastore creates a new badger datastore. // NewDatastore creates a new badger datastore.
// //
...@@ -388,9 +396,21 @@ func (d *Datastore) Close() error { ...@@ -388,9 +396,21 @@ func (d *Datastore) Close() error {
return d.DB.Close() return d.DB.Close()
} }
// Batch creats a new Batch object. This provides a way to do many writes, when
// there may be too many to fit into a single transaction.
//
// After writing to a Batch, always call Commit whether or not writing to the
// batch was completed successfully or not. This is necessary to flush any
// remaining data and free any resources associated with an incomplete
// transaction.
func (d *Datastore) Batch() (ds.Batch, error) { func (d *Datastore) Batch() (ds.Batch, error) {
tx, _ := d.NewTransaction(false) d.closeLk.RLock()
return tx, nil defer d.closeLk.RUnlock()
if d.closed {
return nil, ErrClosed
}
return &batch{d, d.DB.NewWriteBatch()}, nil
} }
func (d *Datastore) CollectGarbage() (err error) { func (d *Datastore) CollectGarbage() (err error) {
...@@ -416,6 +436,54 @@ func (d *Datastore) gcOnce() error { ...@@ -416,6 +436,54 @@ func (d *Datastore) gcOnce() error {
return d.DB.RunValueLogGC(d.gcDiscardRatio) return d.DB.RunValueLogGC(d.gcDiscardRatio)
} }
var _ ds.Batch = (*batch)(nil)
func (b *batch) Put(key ds.Key, value []byte) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
return ErrClosed
}
return b.put(key, value)
}
func (b *batch) put(key ds.Key, value []byte) error {
return b.writeBatch.Set(key.Bytes(), value)
}
func (b *batch) Delete(key ds.Key) error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
return ErrClosed
}
return b.delete(key)
}
func (b *batch) delete(key ds.Key) error {
return b.writeBatch.Delete(key.Bytes())
}
func (b *batch) Commit() error {
b.ds.closeLk.RLock()
defer b.ds.closeLk.RUnlock()
if b.ds.closed {
return ErrClosed
}
return b.commit()
}
func (b *batch) commit() error {
err := b.writeBatch.Flush()
if err != nil {
// Discard incomplete transaction held by b.writeBatch
b.writeBatch.Cancel()
}
return err
}
var _ ds.Datastore = (*txn)(nil) var _ ds.Datastore = (*txn)(nil)
var _ ds.TTLDatastore = (*txn)(nil) var _ ds.TTLDatastore = (*txn)(nil)
......
...@@ -309,6 +309,71 @@ func TestBatching(t *testing.T) { ...@@ -309,6 +309,71 @@ func TestBatching(t *testing.T) {
} }
func TestBatchingRequired(t *testing.T) {
path, err := ioutil.TempDir(os.TempDir(), "testing_badger_")
if err != nil {
t.Fatal(err)
}
dsOpts := DefaultOptions
d, err := NewDatastore(path, &dsOpts)
if err != nil {
t.Fatal(err)
}
defer func() {
d.Close()
os.RemoveAll(path)
}()
const valSize = 1000
// Check that transaction fails when there are too many writes. This is
// not testing batching logic, but is here to prove that batching works
// where a transaction fails.
t.Logf("putting %d byte values until transaction overflows", valSize)
tx, err := d.NewTransaction(false)
if err != nil {
t.Fatal(err)
}
var puts int
for ; puts < 10000000; puts++ {
buf := make([]byte, valSize)
rand.Read(buf)
err = tx.Put(ds.NewKey(fmt.Sprintf("/key%d", puts)), buf)
if err != nil {
break
}
puts++
}
if err == nil {
t.Error("expected transaction to fail")
} else {
t.Logf("OK - transaction cannot handle %d puts: %s", puts, err)
}
tx.Discard()
// Check that batch succeeds with the same number of writes that caused a
// transaction to fail.
t.Logf("putting %d %d byte values using batch", puts, valSize)
b, err := d.Batch()
if err != nil {
t.Fatal(err)
}
for i := 0; i < puts; i++ {
buf := make([]byte, valSize)
rand.Read(buf)
err = b.Put(ds.NewKey(fmt.Sprintf("/key%d", i)), buf)
if err != nil {
t.Fatal(err)
}
}
err = b.Commit()
if err != nil {
t.Fatal(err)
}
}
// Tests from basic_tests from go-datastore // Tests from basic_tests from go-datastore
func TestBasicPutGet(t *testing.T) { func TestBasicPutGet(t *testing.T) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment