Commit 30c9012f authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

operation coalescing

parent ac663fae
package callback
import (
ds "github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-datastore/query"
)
type Datastore struct {
D ds.Datastore
F func()
}
func Wrap(ds ds.Datastore, f func()) ds.Datastore {
return &Datastore{ds, f}
}
func (c *Datastore) SetFunc(f func()) { c.F = f }
func (c *Datastore) Put(key ds.Key, value interface{}) (err error) {
c.F()
return c.D.Put(key, value)
}
func (c *Datastore) Get(key ds.Key) (value interface{}, err error) {
c.F()
return c.D.Get(key)
}
func (c *Datastore) Has(key ds.Key) (exists bool, err error) {
c.F()
return c.D.Has(key)
}
func (c *Datastore) Delete(key ds.Key) (err error) {
c.F()
return c.D.Delete(key)
}
func (c *Datastore) Query(q dsq.Query) (dsq.Results, error) {
c.F()
return c.D.Query(q)
}
package coalesce
import (
"sync"
ds "github.com/jbenet/go-datastore"
dsq "github.com/jbenet/go-datastore/query"
)
// parent keys
var (
putKey = "put"
getKey = "get"
hasKey = "has"
deleteKey = "delete"
)
type keySync struct {
op string
k ds.Key
value interface{}
}
type valSync struct {
val interface{}
err error
done chan struct{}
}
// Datastore uses golang-lru for internal storage.
type datastore struct {
child ds.Datastore
reqmu sync.Mutex
req map[keySync]*valSync
}
// Wrap wraps a given datastore with a coalescing datastore.
// All simultaenous requests which have the same keys will
// yield the exact same result. Note that this shares
// memory. It is not possible to copy a generic interface{}
func Wrap(d ds.Datastore) ds.Datastore {
return &datastore{child: d, req: make(map[keySync]*valSync)}
}
// sync synchronizes requests for a given key.
func (d *datastore) sync(k keySync) (vs *valSync, found bool) {
d.reqmu.Lock()
vs, found = d.req[k]
if !found {
vs = &valSync{done: make(chan struct{})}
d.req[k] = vs
}
d.reqmu.Unlock()
// if we did find one, wait till it's done.
if found {
<-vs.done
}
return vs, found
}
// sync synchronizes requests for a given key.
func (d *datastore) syncDone(k keySync) {
d.reqmu.Lock()
vs, found := d.req[k]
if !found {
panic("attempt to syncDone non-existent request")
}
delete(d.req, k)
d.reqmu.Unlock()
// release all the waiters.
close(vs.done)
}
// Put stores the object `value` named by `key`.
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
ks := keySync{putKey, key, value}
vs, found := d.sync(ks)
if !found {
vs.err = d.child.Put(key, value)
d.syncDone(ks)
}
return err
}
// Get retrieves the object `value` named by `key`.
func (d *datastore) Get(key ds.Key) (value interface{}, err error) {
ks := keySync{getKey, key, nil}
vs, found := d.sync(ks)
if !found {
vs.val, vs.err = d.child.Get(key)
d.syncDone(ks)
}
return vs.val, vs.err
}
// Has returns whether the `key` is mapped to a `value`.
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
ks := keySync{hasKey, key, nil}
vs, found := d.sync(ks)
if !found {
vs.val, vs.err = d.child.Has(key)
d.syncDone(ks)
}
return vs.val.(bool), vs.err
}
// Delete removes the value for given `key`.
func (d *datastore) Delete(key ds.Key) (err error) {
ks := keySync{deleteKey, key, nil}
vs, found := d.sync(ks)
if !found {
vs.err = d.child.Delete(key)
d.syncDone(ks)
}
return vs.err
}
// Query returns a list of keys in the datastore
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
// query not coalesced yet.
return d.child.Query(q)
}
package coalesce
import (
"fmt"
"sync"
"testing"
"time"
ds "github.com/jbenet/go-datastore"
dscb "github.com/jbenet/go-datastore/callback"
dssync "github.com/jbenet/go-datastore/sync"
)
type mock struct {
sync.Mutex
inside int
outside int
ds ds.Datastore
}
func setup() *mock {
m := &mock{}
mp := ds.NewMapDatastore()
ts := dssync.MutexWrap(mp)
cb1 := dscb.Wrap(ts, func() {
m.Lock()
m.inside++
m.Unlock()
<-time.After(20 * time.Millisecond)
})
cd := Wrap(cb1)
cb2 := dscb.Wrap(cd, func() {
m.Lock()
m.outside++
m.Unlock()
})
m.ds = cb2
return m
}
func TestCoalesceSamePut(t *testing.T) {
m := setup()
done := make(chan struct{})
go func() {
m.ds.Put(ds.NewKey("foo"), "bar")
done <- struct{}{}
}()
go func() {
m.ds.Put(ds.NewKey("foo"), "bar")
done <- struct{}{}
}()
go func() {
m.ds.Put(ds.NewKey("foo"), "bar")
done <- struct{}{}
}()
<-done
<-done
<-done
if m.inside != 1 {
t.Error("incalls should be 1", m.inside)
}
if m.outside != 3 {
t.Error("outcalls should be 3", m.outside)
}
}
func TestCoalesceSamePutDiffPut(t *testing.T) {
m := setup()
done := make(chan struct{})
go func() {
m.ds.Put(ds.NewKey("foo"), "bar")
done <- struct{}{}
}()
go func() {
m.ds.Put(ds.NewKey("foo"), "bar")
done <- struct{}{}
}()
go func() {
m.ds.Put(ds.NewKey("foo"), "bar2")
done <- struct{}{}
}()
go func() {
m.ds.Put(ds.NewKey("foo"), "bar3")
done <- struct{}{}
}()
<-done
<-done
<-done
<-done
if m.inside != 3 {
t.Error("incalls should be 3", m.inside)
}
if m.outside != 4 {
t.Error("outcalls should be 4", m.outside)
}
}
func TestCoalesceSameGet(t *testing.T) {
m := setup()
done := make(chan struct{})
errs := make(chan error, 30)
m.ds.Put(ds.NewKey("foo1"), "bar")
m.ds.Put(ds.NewKey("foo2"), "baz")
for i := 0; i < 10; i++ {
go func() {
v, err := m.ds.Get(ds.NewKey("foo1"))
if err != nil {
errs <- err
}
if v != "bar" {
errs <- fmt.Errorf("v is not bar", v)
}
done <- struct{}{}
}()
}
for i := 0; i < 10; i++ {
go func() {
v, err := m.ds.Get(ds.NewKey("foo2"))
if err != nil {
errs <- err
}
if v != "baz" {
errs <- fmt.Errorf("v is not baz", v)
}
done <- struct{}{}
}()
}
for i := 0; i < 10; i++ {
go func() {
_, err := m.ds.Get(ds.NewKey("foo3"))
if err == nil {
errs <- fmt.Errorf("no error")
}
done <- struct{}{}
}()
}
for i := 0; i < 30; i++ {
<-done
}
if m.inside != 5 {
t.Error("incalls should be 3", m.inside)
}
if m.outside != 32 {
t.Error("outcalls should be 30", m.outside)
}
}
func TestCoalesceHas(t *testing.T) {
m := setup()
done := make(chan struct{})
errs := make(chan error, 30)
m.ds.Put(ds.NewKey("foo1"), "bar")
m.ds.Put(ds.NewKey("foo2"), "baz")
for i := 0; i < 10; i++ {
go func() {
v, err := m.ds.Has(ds.NewKey("foo1"))
if err != nil {
errs <- err
}
if !v {
errs <- fmt.Errorf("should have foo1")
}
done <- struct{}{}
}()
}
for i := 0; i < 10; i++ {
go func() {
v, err := m.ds.Has(ds.NewKey("foo2"))
if err != nil {
errs <- err
}
if !v {
errs <- fmt.Errorf("should have foo2")
}
done <- struct{}{}
}()
}
for i := 0; i < 10; i++ {
go func() {
v, err := m.ds.Has(ds.NewKey("foo3"))
if err != nil {
errs <- err
}
if v {
errs <- fmt.Errorf("should not have foo3")
}
done <- struct{}{}
}()
}
for i := 0; i < 30; i++ {
<-done
}
if m.inside != 5 {
t.Error("incalls should be 3", m.inside)
}
if m.outside != 32 {
t.Error("outcalls should be 30", m.outside)
}
}
func TestCoalesceDelete(t *testing.T) {
m := setup()
done := make(chan struct{})
errs := make(chan error, 30)
m.ds.Put(ds.NewKey("foo1"), "bar1")
m.ds.Put(ds.NewKey("foo2"), "bar2")
m.ds.Put(ds.NewKey("foo3"), "bar3")
for i := 0; i < 10; i++ {
go func() {
err := m.ds.Delete(ds.NewKey("foo1"))
if err != nil {
errs <- err
}
has, err := m.ds.Has(ds.NewKey("foo1"))
if err != nil {
errs <- err
}
if has {
t.Error("still have it after deleting")
}
done <- struct{}{}
}()
}
for i := 0; i < 10; i++ {
go func() {
err := m.ds.Delete(ds.NewKey("foo2"))
if err != nil {
errs <- err
}
has, err := m.ds.Has(ds.NewKey("foo2"))
if err != nil {
errs <- err
}
if has {
t.Error("still have it after deleting")
}
done <- struct{}{}
}()
}
for i := 0; i < 10; i++ {
go func() {
has, err := m.ds.Has(ds.NewKey("foo3"))
if err != nil {
errs <- err
}
if !has {
t.Error("should still have foo3")
}
done <- struct{}{}
}()
}
for i := 0; i < 10; i++ {
go func() {
has, err := m.ds.Has(ds.NewKey("foo4"))
if err != nil {
errs <- err
}
if has {
t.Error("should not have foo4")
}
done <- struct{}{}
}()
}
for i := 0; i < 40; i++ {
<-done
}
if m.inside != 9 {
t.Error("incalls should be 9", m.inside)
}
if m.outside != 63 {
t.Error("outcalls should be 63", m.outside)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment