Commit 708291f6 authored by Juan Benet's avatar Juan Benet

Merge pull request #27 from whyrusleeping/master

add threadsafe mount datastore and small fixups on keytransform
parents 1cc48d47 d46c7c6f
......@@ -21,7 +21,7 @@ type Datastore interface {
// Wrap wraps a given datastore with a KeyTransform function.
// The resulting wrapped datastore will use the transform on all Datastore
// operations.
func Wrap(child ds.Datastore, t KeyTransform) Datastore {
func Wrap(child ds.Datastore, t KeyTransform) *ktds {
if t == nil {
panic("t (KeyTransform) is nil")
}
......
......@@ -36,7 +36,7 @@ func PrefixTransform(prefix ds.Key) ktds.KeyTransform {
}
// Wrap wraps a given datastore with a key-prefix.
func Wrap(child ds.Datastore, prefix ds.Key) ktds.Datastore {
func Wrap(child ds.Datastore, prefix ds.Key) *datastore {
if child == nil {
panic("child (ds.Datastore) is nil")
}
......@@ -81,3 +81,11 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return dsq.DerivedResults(qr, ch), nil
}
func (d *datastore) Batch() (ds.Batch, error) {
if bds, ok := d.Datastore.(ds.Batching); ok {
return bds.Batch()
}
return nil, ds.ErrBatchUnsupported
}
......@@ -18,7 +18,7 @@ type MutexDatastore struct {
// MutexWrap constructs a datastore with a coarse lock around
// the entire datastore, for every single operation
func MutexWrap(d ds.Datastore) ds.ThreadSafeDatastore {
func MutexWrap(d ds.Datastore) *MutexDatastore {
return &MutexDatastore{child: d}
}
......
// Package mount provides a Datastore that has other Datastores
// mounted at various key prefixes and is threadsafe
package syncmount
import (
"errors"
"io"
"strings"
"sync"
ds "github.com/jbenet/go-datastore"
"github.com/jbenet/go-datastore/keytransform"
"github.com/jbenet/go-datastore/query"
)
var (
ErrNoMount = errors.New("no datastore mounted for this key")
)
type Mount struct {
Prefix ds.Key
Datastore ds.Datastore
}
func New(mounts []Mount) *Datastore {
// make a copy so we're sure it doesn't mutate
m := make([]Mount, len(mounts))
for i, v := range mounts {
m[i] = v
}
return &Datastore{mounts: m}
}
type Datastore struct {
mounts []Mount
lk sync.Mutex
}
var _ ds.Datastore = (*Datastore)(nil)
func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
d.lk.Lock()
defer d.lk.Unlock()
for _, m := range d.mounts {
if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) {
s := strings.TrimPrefix(key.String(), m.Prefix.String())
k := ds.NewKey(s)
return m.Datastore, m.Prefix, k
}
}
return nil, ds.NewKey("/"), key
}
func (d *Datastore) Put(key ds.Key, value interface{}) error {
cds, _, k := d.lookup(key)
if cds == nil {
return ErrNoMount
}
return cds.Put(k, value)
}
func (d *Datastore) Get(key ds.Key) (value interface{}, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
return nil, ds.ErrNotFound
}
return cds.Get(k)
}
func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
cds, _, k := d.lookup(key)
if cds == nil {
return false, nil
}
return cds.Has(k)
}
func (d *Datastore) Delete(key ds.Key) error {
cds, _, k := d.lookup(key)
if cds == nil {
return ds.ErrNotFound
}
return cds.Delete(k)
}
func (d *Datastore) Query(q query.Query) (query.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
// TODO this is overly simplistic, but the only caller is
// `ipfs refs local` for now, and this gets us moving.
return nil, errors.New("mount only supports listing all prefixed keys in random order")
}
key := ds.NewKey(q.Prefix)
cds, mount, k := d.lookup(key)
if cds == nil {
return nil, errors.New("mount only supports listing a mount point")
}
// TODO support listing cross mount points too
// delegate the query to the mounted datastore, while adjusting
// keys in and out
q2 := q
q2.Prefix = k.String()
wrapDS := keytransform.Wrap(cds, &keytransform.Pair{
Convert: func(ds.Key) ds.Key {
panic("this should never be called")
},
Invert: func(k ds.Key) ds.Key {
return mount.Child(k)
},
})
r, err := wrapDS.Query(q2)
if err != nil {
return nil, err
}
r = query.ResultsReplaceQuery(r, q)
return r, nil
}
func (d *Datastore) IsThreadSafe() {}
func (d *Datastore) Close() error {
for _, d := range d.mounts {
if c, ok := d.Datastore.(io.Closer); ok {
err := c.Close()
if err != nil {
return err
}
}
}
return nil
}
type mountBatch struct {
mounts map[string]ds.Batch
lk sync.Mutex
d *Datastore
}
func (d *Datastore) Batch() (ds.Batch, error) {
return &mountBatch{
mounts: make(map[string]ds.Batch),
d: d,
}, nil
}
func (mt *mountBatch) lookupBatch(key ds.Key) (ds.Batch, ds.Key, error) {
mt.lk.Lock()
defer mt.lk.Unlock()
child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()]
if !ok {
bds, ok := child.(ds.Batching)
if !ok {
return nil, ds.NewKey(""), ds.ErrBatchUnsupported
}
var err error
t, err = bds.Batch()
if err != nil {
return nil, ds.NewKey(""), err
}
mt.mounts[loc.String()] = t
}
return t, rest, nil
}
func (mt *mountBatch) Put(key ds.Key, val interface{}) error {
t, rest, err := mt.lookupBatch(key)
if err != nil {
return err
}
return t.Put(rest, val)
}
func (mt *mountBatch) Delete(key ds.Key) error {
t, rest, err := mt.lookupBatch(key)
if err != nil {
return err
}
return t.Delete(rest)
}
func (mt *mountBatch) Commit() error {
for _, t := range mt.mounts {
err := t.Commit()
if err != nil {
return err
}
}
return nil
}
package syncmount_test
import (
"testing"
"github.com/jbenet/go-datastore"
"github.com/jbenet/go-datastore/mount"
"github.com/jbenet/go-datastore/query"
)
func TestPutBadNothing(t *testing.T) {
m := mount.New(nil)
err := m.Put(datastore.NewKey("quux"), []byte("foobar"))
if g, e := err, mount.ErrNoMount; g != e {
t.Fatalf("Put got wrong error: %v != %v", g, e)
}
}
func TestPutBadNoMount(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/redherring"), Datastore: mapds},
})
err := m.Put(datastore.NewKey("/quux/thud"), []byte("foobar"))
if g, e := err, mount.ErrNoMount; g != e {
t.Fatalf("expected ErrNoMount, got: %v\n", g)
}
}
func TestPut(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
if err := m.Put(datastore.NewKey("/quux/thud"), []byte("foobar")); err != nil {
t.Fatalf("Put error: %v", err)
}
val, err := mapds.Get(datastore.NewKey("/thud"))
if err != nil {
t.Fatalf("Get error: %v", err)
}
buf, ok := val.([]byte)
if !ok {
t.Fatalf("Get value is not []byte: %T %v", val, val)
}
if g, e := string(buf), "foobar"; g != e {
t.Errorf("wrong value: %q != %q", g, e)
}
}
func TestGetBadNothing(t *testing.T) {
m := mount.New([]mount.Mount{})
_, err := m.Get(datastore.NewKey("/quux/thud"))
if g, e := err, datastore.ErrNotFound; g != e {
t.Fatalf("expected ErrNotFound, got: %v\n", g)
}
}
func TestGetBadNoMount(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/redherring"), Datastore: mapds},
})
_, err := m.Get(datastore.NewKey("/quux/thud"))
if g, e := err, datastore.ErrNotFound; g != e {
t.Fatalf("expected ErrNotFound, got: %v\n", g)
}
}
func TestGetNotFound(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
_, err := m.Get(datastore.NewKey("/quux/thud"))
if g, e := err, datastore.ErrNotFound; g != e {
t.Fatalf("expected ErrNotFound, got: %v\n", g)
}
}
func TestGet(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil {
t.Fatalf("Get error: %v", err)
}
val, err := m.Get(datastore.NewKey("/quux/thud"))
if err != nil {
t.Fatalf("Put error: %v", err)
}
buf, ok := val.([]byte)
if !ok {
t.Fatalf("Get value is not []byte: %T %v", val, val)
}
if g, e := string(buf), "foobar"; g != e {
t.Errorf("wrong value: %q != %q", g, e)
}
}
func TestHasBadNothing(t *testing.T) {
m := mount.New([]mount.Mount{})
found, err := m.Has(datastore.NewKey("/quux/thud"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, false; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
}
func TestHasBadNoMount(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/redherring"), Datastore: mapds},
})
found, err := m.Has(datastore.NewKey("/quux/thud"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, false; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
}
func TestHasNotFound(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
found, err := m.Has(datastore.NewKey("/quux/thud"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, false; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
}
func TestHas(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil {
t.Fatalf("Put error: %v", err)
}
found, err := m.Has(datastore.NewKey("/quux/thud"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, true; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
}
func TestDeleteNotFound(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
err := m.Delete(datastore.NewKey("/quux/thud"))
if g, e := err, datastore.ErrNotFound; g != e {
t.Fatalf("expected ErrNotFound, got: %v\n", g)
}
}
func TestDelete(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
if err := mapds.Put(datastore.NewKey("/thud"), []byte("foobar")); err != nil {
t.Fatalf("Put error: %v", err)
}
err := m.Delete(datastore.NewKey("/quux/thud"))
if err != nil {
t.Fatalf("Delete error: %v", err)
}
// make sure it disappeared
found, err := mapds.Has(datastore.NewKey("/thud"))
if err != nil {
t.Fatalf("Has error: %v", err)
}
if g, e := found, false; g != e {
t.Fatalf("wrong value: %v != %v", g, e)
}
}
func TestQuerySimple(t *testing.T) {
mapds := datastore.NewMapDatastore()
m := mount.New([]mount.Mount{
{Prefix: datastore.NewKey("/quux"), Datastore: mapds},
})
const myKey = "/quux/thud"
if err := m.Put(datastore.NewKey(myKey), []byte("foobar")); err != nil {
t.Fatalf("Put error: %v", err)
}
res, err := m.Query(query.Query{Prefix: "/quux"})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
seen := false
for _, e := range entries {
switch e.Key {
case datastore.NewKey(myKey).String():
seen = true
default:
t.Errorf("saw unexpected key: %q", e.Key)
}
}
if !seen {
t.Errorf("did not see wanted key %q in %+v", myKey, entries)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment