Unverified Commit 27897a02 authored by Hector Sanjuan's avatar Hector Sanjuan Committed by GitHub

Merge pull request #84 from ipfs/feat/import-whys-datastores

Feat: import utility datastores from whyrusleeping repos
parents 198d4614 c5f89f4d
2.2.0: QmcgP7TRPoo2pButWCAhPieXmgCWecBu8rNcxChTYF2FuA
2.3.0: QmRoDtSqqEAUfLRLhr9ZpXicFEci6mTPxLS2vfXb9TCKnP
# autobatch
Autobatch is an implementation of
[go-datastore](https://github.com/ipfs/go-datastore) that automatically batches
together writes by holding puts in memory until a certain threshold is met.
This can improve disk performance at the cost of memory in certain situations.
## Usage
Simply wrap your existing datastore in an autobatching layer like so:
```go
bds := NewAutoBatching(basedstore, 128)
```
And make all future calls to the autobatching object.
## License
MIT
// Package autobatch provides a go-datastore implementation that
// automatically batches together writes by holding puts in memory until
// a certain threshold is met.
package autobatch
import (
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)
// Datastore implements a go-datatsore.
type Datastore struct {
child ds.Batching
// TODO: discuss making ds.Batch implement the full ds.Datastore interface
buffer map[ds.Key]interface{}
maxBufferEntries int
}
// NewAutoBatching returns a new datastore that automatically
// batches writes using the given Batching datastore. The size
// of the memory pool is given by size.
func NewAutoBatching(d ds.Batching, size int) *Datastore {
return &Datastore{
child: d,
buffer: make(map[ds.Key]interface{}),
maxBufferEntries: size,
}
}
// Delete deletes a key/value
func (d *Datastore) Delete(k ds.Key) error {
delete(d.buffer, k)
return d.child.Delete(k)
}
// Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) (interface{}, error) {
val, ok := d.buffer[k]
if ok {
return val, nil
}
return d.child.Get(k)
}
// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val interface{}) error {
d.buffer[k] = val
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
}
return nil
}
// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush() error {
b, err := d.child.Batch()
if err != nil {
return err
}
for k, v := range d.buffer {
err := b.Put(k, v)
if err != nil {
return err
}
}
// clear out buffer
d.buffer = make(map[ds.Key]interface{})
return b.Commit()
}
// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
_, ok := d.buffer[k]
if ok {
return true, nil
}
return d.child.Has(k)
}
// Query performs a query
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
err := d.Flush()
if err != nil {
return nil, err
}
return d.child.Query(q)
}
package autobatch
import (
"fmt"
"testing"
ds "github.com/ipfs/go-datastore"
)
func TestBasicPuts(t *testing.T) {
d := NewAutoBatching(ds.NewMapDatastore(), 16)
k := ds.NewKey("test")
v := "hello world"
err := d.Put(k, v)
if err != nil {
t.Fatal(err)
}
out, err := d.Get(k)
if err != nil {
t.Fatal(err)
}
if out != v {
t.Fatal("wasnt the same! ITS NOT THE SAME")
}
}
func TestFlushing(t *testing.T) {
child := ds.NewMapDatastore()
d := NewAutoBatching(child, 16)
var keys []ds.Key
for i := 0; i < 16; i++ {
keys = append(keys, ds.NewKey(fmt.Sprintf("test%d", i)))
}
v := "hello world"
for _, k := range keys {
err := d.Put(k, v)
if err != nil {
t.Fatal(err)
}
}
_, err := child.Get(keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}
err = d.Put(ds.NewKey("test16"), v)
if err != nil {
t.Fatal(err)
}
// should be flushed now, try to get keys from child datastore
for _, k := range keys {
val, err := child.Get(k)
if err != nil {
t.Fatal(err)
}
if val != v {
t.Fatal(err)
}
}
}
// Package failstore implements a datastore which can produce
// custom failures on operations by calling a user-provided
// error function.
package failstore
import (
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
)
// Failstore is a datastore which fails according to a user-provided
// function.
type Failstore struct {
child ds.Datastore
errfunc func(string) error
}
// NewFailstore creates a new datastore with the given error function.
// The efunc will be called with different strings depending on the
// datastore function: put, get, has, delete, query, batch, batch-put,
// batch-delete and batch-commit are the possible values.
func NewFailstore(c ds.Datastore, efunc func(string) error) *Failstore {
return &Failstore{
child: c,
errfunc: efunc,
}
}
// Put puts a key/value into the datastore.
func (d *Failstore) Put(k ds.Key, val interface{}) error {
err := d.errfunc("put")
if err != nil {
return err
}
return d.child.Put(k, val)
}
// Get retrieves a value from the datastore.
func (d *Failstore) Get(k ds.Key) (interface{}, error) {
err := d.errfunc("get")
if err != nil {
return nil, err
}
return d.child.Get(k)
}
// Has returns if the datastore contains a key/value.
func (d *Failstore) Has(k ds.Key) (bool, error) {
err := d.errfunc("has")
if err != nil {
return false, err
}
return d.child.Has(k)
}
// Delete removes a key/value from the datastore.
func (d *Failstore) Delete(k ds.Key) error {
err := d.errfunc("delete")
if err != nil {
return err
}
return d.child.Delete(k)
}
// Query performs a query on the datastore.
func (d *Failstore) Query(q dsq.Query) (dsq.Results, error) {
err := d.errfunc("query")
if err != nil {
return nil, err
}
return d.child.Query(q)
}
// FailBatch implements batching operations on the Failstore.
type FailBatch struct {
cb ds.Batch
dstore *Failstore
}
// Batch returns a new Batch Failstore.
func (d *Failstore) Batch() (ds.Batch, error) {
if err := d.errfunc("batch"); err != nil {
return nil, err
}
b, err := d.child.(ds.Batching).Batch()
if err != nil {
return nil, err
}
return &FailBatch{
cb: b,
dstore: d,
}, nil
}
// Put does a batch put.
func (b *FailBatch) Put(k ds.Key, val interface{}) error {
if err := b.dstore.errfunc("batch-put"); err != nil {
return err
}
return b.cb.Put(k, val)
}
// Delete does a batch delete.
func (b *FailBatch) Delete(k ds.Key) error {
if err := b.dstore.errfunc("batch-delete"); err != nil {
return err
}
return b.cb.Delete(k)
}
// Commit commits all operations in the batch.
func (b *FailBatch) Commit() error {
if err := b.dstore.errfunc("batch-commit"); err != nil {
return err
}
return b.cb.Commit()
}
......@@ -37,6 +37,6 @@
"license": "MIT",
"name": "go-datastore",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "2.2.0"
"version": "2.3.0"
}
// Package retrystore provides a datastore wrapper which
// allows to retry operations.
package retrystore
import (
"fmt"
"time"
ds "github.com/ipfs/go-datastore"
)
// Datastore wraps a Batching datastore with a
// user-provided TempErrorFunc -which determines if an error
// is a temporal error and thus, worth retrying-, an amount of Retries
// -which specify how many times to retry an operation after
// a temporal error- and a base Delay, which is multiplied by the
// current retry and performs a pause before attempting the operation again.
type Datastore struct {
TempErrFunc func(error) bool
Retries int
Delay time.Duration
ds.Batching
}
var errFmtString = "ran out of retries trying to get past temporary error: %s"
func (d *Datastore) runOp(op func() error) error {
err := op()
if err == nil || !d.TempErrFunc(err) {
return err
}
for i := 0; i < d.Retries; i++ {
time.Sleep(time.Duration(i+1) * d.Delay)
err = op()
if err == nil || !d.TempErrFunc(err) {
return err
}
}
return fmt.Errorf(errFmtString, err)
}
// Get retrieves a value given a key.
func (d *Datastore) Get(k ds.Key) (interface{}, error) {
var val interface{}
err := d.runOp(func() error {
var err error
val, err = d.Batching.Get(k)
return err
})
return val, err
}
// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val interface{}) error {
return d.runOp(func() error {
return d.Batching.Put(k, val)
})
}
// Has checks if a key is stored.
func (d *Datastore) Has(k ds.Key) (bool, error) {
var has bool
err := d.runOp(func() error {
var err error
has, err = d.Batching.Has(k)
return err
})
return has, err
}
package retrystore
import (
"fmt"
"strings"
"testing"
ds "github.com/ipfs/go-datastore"
failstore "github.com/ipfs/go-datastore/failstore"
)
func TestRetryFailure(t *testing.T) {
myErr := fmt.Errorf("this is an actual error")
var count int
fstore := failstore.NewFailstore(ds.NewMapDatastore(), func(op string) error {
count++
return myErr
})
rds := &Datastore{
Batching: fstore,
Retries: 5,
TempErrFunc: func(err error) bool {
return err == myErr
},
}
k := ds.NewKey("test")
_, err := rds.Get(k)
if err == nil {
t.Fatal("expected this to fail")
}
if !strings.Contains(err.Error(), "ran out of retries") {
t.Fatal("got different error than expected: ", err)
}
if count != 6 {
t.Fatal("expected five retries (six executions), got: ", count)
}
}
func TestRealErrorGetsThrough(t *testing.T) {
myErr := fmt.Errorf("this is an actual error")
fstore := failstore.NewFailstore(ds.NewMapDatastore(), func(op string) error {
return myErr
})
rds := &Datastore{
Batching: fstore,
Retries: 5,
TempErrFunc: func(err error) bool {
return false
},
}
k := ds.NewKey("test")
_, err := rds.Get(k)
if err != myErr {
t.Fatal("expected my own error")
}
_, err = rds.Has(k)
if err != myErr {
t.Fatal("expected my own error")
}
err = rds.Put(k, nil)
if err != myErr {
t.Fatal("expected my own error")
}
}
func TestRealErrorAfterTemp(t *testing.T) {
myErr := fmt.Errorf("this is an actual error")
tempErr := fmt.Errorf("this is a temp error")
var count int
fstore := failstore.NewFailstore(ds.NewMapDatastore(), func(op string) error {
count++
if count < 3 {
return tempErr
}
return myErr
})
rds := &Datastore{
Batching: fstore,
Retries: 5,
TempErrFunc: func(err error) bool {
return err == tempErr
},
}
k := ds.NewKey("test")
_, err := rds.Get(k)
if err != myErr {
t.Fatal("expected my own error")
}
}
func TestSuccessAfterTemp(t *testing.T) {
tempErr := fmt.Errorf("this is a temp error")
var count int
fstore := failstore.NewFailstore(ds.NewMapDatastore(), func(op string) error {
count++
if count < 3 {
return tempErr
}
count = 0
return nil
})
rds := &Datastore{
Batching: fstore,
Retries: 5,
TempErrFunc: func(err error) bool {
return err == tempErr
},
}
k := ds.NewKey("test")
val := []byte("foo")
err := rds.Put(k, val)
if err != nil {
t.Fatal(err)
}
has, err := rds.Has(k)
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("should have this thing")
}
out, err := rds.Get(k)
if err != nil {
t.Fatal(err)
}
if string(out.([]byte)) != string(val) {
t.Fatal("got wrong value")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment