coalesce.go 2.83 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package coalesce

import (
Jeromy's avatar
Jeromy committed
4
	"io"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
	"sync"

	ds "github.com/jbenet/go-datastore"
	dsq "github.com/jbenet/go-datastore/query"
)

// parent keys
var (
	putKey    = "put"
	getKey    = "get"
	hasKey    = "has"
	deleteKey = "delete"
)

type keySync struct {
	op    string
	k     ds.Key
	value interface{}
}

type valSync struct {
	val  interface{}
	err  error
	done chan struct{}
}

// Datastore uses golang-lru for internal storage.
type datastore struct {
	child ds.Datastore

	reqmu sync.Mutex
	req   map[keySync]*valSync
}

// Wrap wraps a given datastore with a coalescing datastore.
// All simultaenous requests which have the same keys will
// yield the exact same result. Note that this shares
// memory. It is not possible to copy a generic interface{}
func Wrap(d ds.Datastore) ds.Datastore {
	return &datastore{child: d, req: make(map[keySync]*valSync)}
}

// sync synchronizes requests for a given key.
func (d *datastore) sync(k keySync) (vs *valSync, found bool) {
	d.reqmu.Lock()
	vs, found = d.req[k]
	if !found {
		vs = &valSync{done: make(chan struct{})}
		d.req[k] = vs
	}
	d.reqmu.Unlock()

	// if we did find one, wait till it's done.
	if found {
		<-vs.done
	}
	return vs, found
}

// sync synchronizes requests for a given key.
func (d *datastore) syncDone(k keySync) {

	d.reqmu.Lock()
	vs, found := d.req[k]
	if !found {
		panic("attempt to syncDone non-existent request")
	}
	delete(d.req, k)
	d.reqmu.Unlock()

	// release all the waiters.
	close(vs.done)
}

// Put stores the object `value` named by `key`.
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
	ks := keySync{putKey, key, value}
	vs, found := d.sync(ks)
	if !found {
		vs.err = d.child.Put(key, value)
		d.syncDone(ks)
	}
	return err
}

// Get retrieves the object `value` named by `key`.
func (d *datastore) Get(key ds.Key) (value interface{}, err error) {
	ks := keySync{getKey, key, nil}
	vs, found := d.sync(ks)
	if !found {
		vs.val, vs.err = d.child.Get(key)
		d.syncDone(ks)
	}
	return vs.val, vs.err
}

// Has returns whether the `key` is mapped to a `value`.
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
	ks := keySync{hasKey, key, nil}
	vs, found := d.sync(ks)
	if !found {
		vs.val, vs.err = d.child.Has(key)
		d.syncDone(ks)
	}
	return vs.val.(bool), vs.err
}

// Delete removes the value for given `key`.
func (d *datastore) Delete(key ds.Key) (err error) {
	ks := keySync{deleteKey, key, nil}
	vs, found := d.sync(ks)
	if !found {
		vs.err = d.child.Delete(key)
		d.syncDone(ks)
	}
	return vs.err
}

// Query returns a list of keys in the datastore
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
	// query not coalesced yet.
	return d.child.Query(q)
}
Jeromy's avatar
Jeromy committed
128 129 130 131 132 133 134 135 136 137 138 139 140

func (d *datastore) Close() error {
	d.reqmu.Lock()
	defer d.reqmu.Unlock()

	for _, s := range d.req {
		<-s.done
	}
	if c, ok := d.child.(io.Closer); ok {
		return c.Close()
	}
	return nil
}