Commit 72cf3024 authored by Jeromy's avatar Jeromy

betterify the everything

parent 0ab20f41
package datastore
import (
"io"
"log"
dsq "github.com/jbenet/go-datastore/query"
......@@ -67,6 +68,10 @@ func (d *MapDatastore) Batch() (Batch, error) {
return NewBasicBatch(d), nil
}
func (d *MapDatastore) Close() error {
return nil
}
// NullDatastore stores nothing, but conforms to the API.
// Useful to test with.
type NullDatastore struct {
......@@ -106,6 +111,10 @@ func (d *NullDatastore) Batch() (Batch, error) {
return NewBasicBatch(d), nil
}
func (d *NullDatastore) Close() error {
return nil
}
// LogDatastore logs all accesses through the datastore.
type LogDatastore struct {
Name string
......@@ -165,9 +174,16 @@ func (d *LogDatastore) Query(q dsq.Query) (dsq.Results, error) {
func (d *LogDatastore) Batch() (Batch, error) {
log.Printf("%s: Batch\n", d.Name)
bds, ok := d.child.(BatchingDatastore)
if !ok {
return nil, ErrBatchUnsupported
if bds, ok := d.child.(Batching); ok {
return bds.Batch()
}
return bds.Batch()
return nil, ErrBatchUnsupported
}
func (d *LogDatastore) Close() error {
log.Printf("%s: Close\n", d.Name)
if cds, ok := d.child.(io.Closer); ok {
return cds.Close()
}
return nil
}
package coalesce
import (
"io"
"sync"
ds "github.com/jbenet/go-datastore"
......@@ -124,3 +125,16 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// query not coalesced yet.
return d.child.Query(q)
}
func (d *datastore) Close() error {
d.reqmu.Lock()
defer d.reqmu.Unlock()
for _, s := range d.req {
<-s.done
}
if c, ok := d.child.(io.Closer); ok {
return c.Close()
}
return nil
}
......@@ -69,7 +69,7 @@ type Datastore interface {
Query(q query.Query) (query.Results, error)
}
type BatchingDatastore interface {
type Batching interface {
Datastore
Batch() (Batch, error)
......
......@@ -323,6 +323,10 @@ func (fs *Datastore) enumerateKeys(fi os.FileInfo, res []query.Entry) ([]query.E
return res, nil
}
func (fs *Datastore) Close() error {
return nil
}
type flatfsBatch struct {
puts map[datastore.Key]interface{}
deletes map[datastore.Key]struct{}
......
......@@ -149,3 +149,11 @@ func isFile(path string) bool {
return !finfo.IsDir()
}
func (d *Datastore) Close() error {
return nil
}
func (d *Datastore) Batch() (ds.Batch, error) {
return ds.NewBasicBatch(d), nil
}
......@@ -16,8 +16,6 @@ type KeyTransform interface {
type Datastore interface {
ds.Shim
KeyTransform
Batch() (ds.Batch, error)
}
// Wrap wraps a given datastore with a KeyTransform function.
......
package keytransform
import (
"io"
ds "github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-datastore/query"
)
......@@ -74,8 +76,15 @@ func (d *ktds) Query(q dsq.Query) (dsq.Results, error) {
return dsq.DerivedResults(qr, ch), nil
}
func (d *ktds) Close() error {
if c, ok := d.child.(io.Closer); ok {
return c.Close()
}
return nil
}
func (d *ktds) Batch() (ds.Batch, error) {
bds, ok := d.child.(ds.BatchingDatastore)
bds, ok := d.child.(ds.Batching)
if !ok {
return nil, ds.ErrBatchUnsupported
}
......
package leveldb
import (
"io"
ds "github.com/jbenet/go-datastore"
"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb"
......@@ -11,18 +9,13 @@ import (
dsq "github.com/jbenet/go-datastore/query"
)
type Datastore interface {
ds.ThreadSafeDatastore
io.Closer
}
type datastore struct {
DB *leveldb.DB
}
type Options opt.Options
func NewDatastore(path string, opts *Options) (Datastore, error) {
func NewDatastore(path string, opts *Options) (*datastore, error) {
var nopts opt.Options
if opts != nil {
nopts = opt.Options(*opts)
......@@ -148,6 +141,11 @@ func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
}
}
func (d *datastore) Batch() (ds.Batch, error) {
// TODO: implement batch on leveldb
return nil, ds.ErrBatchUnsupported
}
// LevelDB needs to be closed.
func (d *datastore) Close() (err error) {
return d.DB.Close()
......
......@@ -25,7 +25,7 @@ var testcases = map[string]string{
//
// d, close := newDS(t)
// defer close()
func newDS(t *testing.T) (Datastore, func()) {
func newDS(t *testing.T) (*datastore, func()) {
path, err := ioutil.TempDir("/tmp", "testing_leveldb_")
if err != nil {
t.Fatal(err)
......@@ -41,7 +41,7 @@ func newDS(t *testing.T) (Datastore, func()) {
}
}
func addTestCases(t *testing.T, d Datastore, testcases map[string]string) {
func addTestCases(t *testing.T, d *datastore, testcases map[string]string) {
for k, v := range testcases {
dsk := ds.NewKey(k)
if err := d.Put(dsk, []byte(v)); err != nil {
......
......@@ -54,3 +54,11 @@ func (d *Datastore) Delete(key ds.Key) (err error) {
func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return nil, errors.New("KeyList not implemented.")
}
func (d *Datastore) Close() error {
return nil
}
func (d *Datastore) Batch() (ds.Batch, error) {
return nil, ds.ErrBatchUnsupported
}
......@@ -3,6 +3,7 @@
package measure
import (
"io"
"time"
"github.com/jbenet/go-datastore"
......@@ -18,11 +19,6 @@ const (
maxSize = int64(1 << 32)
)
type DatastoreCloser interface {
datastore.Datastore
Close() error
}
// New wraps the datastore, providing metrics on the operations. The
// metrics are registered with names starting with prefix and a dot.
//
......@@ -84,7 +80,6 @@ type measure struct {
}
var _ datastore.Datastore = (*measure)(nil)
var _ DatastoreCloser = (*measure)(nil)
func recordLatency(h *metrics.Histogram, start time.Time) {
elapsed := time.Now().Sub(start) / time.Microsecond
......@@ -159,7 +154,7 @@ type measuredBatch struct {
}
func (m *measure) Batch() (datastore.Batch, error) {
bds, ok := m.backend.(datastore.BatchingDatastore)
bds, ok := m.backend.(datastore.Batching)
if !ok {
return nil, datastore.ErrBatchUnsupported
}
......@@ -245,5 +240,9 @@ func (m *measure) Close() error {
m.queryNum.Remove()
m.queryErr.Remove()
m.queryLatency.Remove()
if c, ok := m.backend.(io.Closer); ok {
return c.Close()
}
return nil
}
......@@ -4,6 +4,7 @@ package mount
import (
"errors"
"io"
"strings"
"github.com/jbenet/go-datastore"
......@@ -115,6 +116,18 @@ func (d *Datastore) Query(q query.Query) (query.Results, error) {
return r, nil
}
func (d *Datastore) Close() error {
for _, d := range d.mounts {
if c, ok := d.Datastore.(io.Closer); ok {
err := c.Close()
if err != nil {
return err
}
}
}
return nil
}
type mountBatch struct {
mounts map[string]datastore.Batch
......@@ -132,7 +145,7 @@ func (mt *mountBatch) lookupBatch(key datastore.Key) (datastore.Batch, datastore
child, loc, rest := mt.d.lookup(key)
t, ok := mt.mounts[loc.String()]
if !ok {
bds, ok := child.(datastore.BatchingDatastore)
bds, ok := child.(datastore.Batching)
if !ok {
return nil, datastore.NewKey(""), datastore.ErrBatchUnsupported
}
......
......@@ -2,6 +2,7 @@ package sync
import (
"fmt"
"io"
"os"
ds "github.com/jbenet/go-datastore"
......@@ -67,6 +68,26 @@ func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return r, nil
}
func (d *datastore) Close() error {
if c, ok := d.child.(io.Closer); ok {
err := c.Close()
if err != nil {
fmt.Fprintf(os.Stdout, "panic datastore: %s", err)
panic("panic datastore: Close failed")
}
}
return nil
}
func (d *datastore) Batch() (ds.Batch, error) {
b, err := d.child.(ds.Batching).Batch()
if err != nil {
return nil, err
}
return &panicBatch{b}, nil
}
type panicBatch struct {
t ds.Batch
}
......
......@@ -17,14 +17,14 @@ var _ datastore.ThreadSafeDatastore = &Datastore{}
var ErrInvalidType = errors.New("redis datastore: invalid type error. this datastore only supports []byte values")
func NewExpiringDatastore(client *redis.Client, ttl time.Duration) (datastore.ThreadSafeDatastore, error) {
func NewExpiringDatastore(client *redis.Client, ttl time.Duration) (*Datastore, error) {
return &Datastore{
client: client,
ttl: ttl,
}, nil
}
func NewDatastore(client *redis.Client) (datastore.ThreadSafeDatastore, error) {
func NewDatastore(client *redis.Client) (*Datastore, error) {
return &Datastore{
client: client,
}, nil
......@@ -83,3 +83,11 @@ func (ds *Datastore) Query(q query.Query) (query.Results, error) {
}
func (ds *Datastore) IsThreadSafe() {}
func (ds *Datastore) Batch() (datastore.Batch, error) {
return nil, datastore.ErrBatchUnsupported
}
func (ds *Datastore) Close() error {
return ds.client.Close()
}
package sync
import (
"io"
"sync"
ds "github.com/jbenet/go-datastore"
......@@ -67,7 +68,7 @@ func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
func (d *MutexDatastore) Batch() (ds.Batch, error) {
d.RLock()
defer d.RUnlock()
bds, ok := d.child.(ds.BatchingDatastore)
bds, ok := d.child.(ds.Batching)
if !ok {
return nil, ds.ErrBatchUnsupported
}
......@@ -81,6 +82,15 @@ func (d *MutexDatastore) Batch() (ds.Batch, error) {
}, nil
}
func (d *MutexDatastore) Close() error {
d.RWMutex.Lock()
defer d.RWMutex.Unlock()
if c, ok := d.child.(io.Closer); ok {
return c.Close()
}
return nil
}
type syncBatch struct {
lk sync.Mutex
batch ds.Batch
......
......@@ -9,7 +9,7 @@ import (
rand "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/dustin/randbo"
)
func RunBatchTest(t *testing.T, ds dstore.BatchingDatastore) {
func RunBatchTest(t *testing.T, ds dstore.Batching) {
batch, err := ds.Batch()
if err != nil {
t.Fatal(err)
......@@ -58,7 +58,7 @@ func RunBatchTest(t *testing.T, ds dstore.BatchingDatastore) {
}
}
func RunBatchDeleteTest(t *testing.T, ds dstore.BatchingDatastore) {
func RunBatchDeleteTest(t *testing.T, ds dstore.Batching) {
r := rand.New()
var keys []dstore.Key
for i := 0; i < 20; i++ {
......
......@@ -13,7 +13,7 @@ type tiered []ds.Datastore
// New returns a tiered datastore. Puts and Deletes will write-through to
// all datastores, Has and Get will try each datastore sequentially, and
// Query will always try the last one (most complete) first.
func New(dses ...ds.Datastore) ds.Datastore {
func New(dses ...ds.Datastore) tiered {
return tiered(dses)
}
......
......@@ -49,19 +49,19 @@ func TestTiered(t *testing.T) {
td := New(d1, d2, d3, d4)
td.Put(ds.NewKey("foo"), "bar")
testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar")
testHas(t, td.(tiered), ds.NewKey("foo"), "bar") // all children
testHas(t, td, ds.NewKey("foo"), "bar") // all children
// remove it from, say, caches.
d1.Delete(ds.NewKey("foo"))
d2.Delete(ds.NewKey("foo"))
testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar")
testHas(t, td.(tiered)[2:], ds.NewKey("foo"), "bar")
testNotHas(t, td.(tiered)[:2], ds.NewKey("foo"))
testHas(t, td[2:], ds.NewKey("foo"), "bar")
testNotHas(t, td[:2], ds.NewKey("foo"))
// write it again.
td.Put(ds.NewKey("foo"), "bar2")
testHas(t, []ds.Datastore{td}, ds.NewKey("foo"), "bar2")
testHas(t, td.(tiered), ds.NewKey("foo"), "bar2")
testHas(t, td, ds.NewKey("foo"), "bar2")
}
func TestQueryCallsLast(t *testing.T) {
......
package timecache
import (
"io"
"sync"
"time"
......@@ -24,13 +25,13 @@ type datastore struct {
ttls map[ds.Key]time.Time
}
func WithTTL(ttl time.Duration) ds.Datastore {
func WithTTL(ttl time.Duration) *datastore {
return WithCache(ds.NewMapDatastore(), ttl)
}
// WithCache wraps a given datastore as a timecache.
// Get + Has requests are considered expired after a TTL.
func WithCache(d ds.Datastore, ttl time.Duration) ds.Datastore {
func WithCache(d ds.Datastore, ttl time.Duration) *datastore {
return &datastore{cache: d, ttl: ttl, ttls: make(map[ds.Key]time.Time)}
}
......@@ -94,3 +95,10 @@ func (d *datastore) Delete(key ds.Key) (err error) {
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.cache.Query(q)
}
func (d *datastore) Close() error {
if c, ok := d.cache.(io.Closer); ok {
return c.Close()
}
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment