coalesce.go 2.96 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package coalesce

import (
Jeromy's avatar
Jeromy committed
4
	"io"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"sync"

Jeromy's avatar
Jeromy committed
7 8
	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
)

// Operation names. Each request records one of these in keySync.op, so
// in-flight requests are only coalesced with other requests for the same
// operation. They never change, hence constants rather than variables.
const (
	putKey    = "put"
	getKey    = "get"
	hasKey    = "has"
	deleteKey = "delete"
)

// keySync identifies one in-flight request. It is the key type of
// datastore.req, so two requests are coalesced only when all three
// fields compare equal.
//
//   - op is the operation name (putKey, getKey, hasKey or deleteKey).
//   - k is the datastore key the operation targets.
//   - value is the value being written by Put; Get, Has and Delete
//     leave it nil.
type keySync struct {
	op    string
	k     ds.Key
	value interface{}
}

// valSync carries the outcome of one in-flight request so that it can be
// shared with every caller waiting on the same keySync.
//
//   - val is the result value (written by Get and Has).
//   - err is the error returned by the child datastore call.
//   - done is closed by syncDone once val and err have been written;
//     waiters block on it in sync before reading the result.
type valSync struct {
	val  interface{}
	err  error
	done chan struct{}
}

// datastore wraps a child datastore and coalesces identical concurrent
// requests into a single call on the child.
//
//   - child is the wrapped datastore that actually serves requests.
//   - reqmu guards req.
//   - req maps each in-flight request to the valSync its callers share;
//     entries are added by sync and removed by syncDone.
type datastore struct {
	child ds.Datastore

	reqmu sync.Mutex
	req   map[keySync]*valSync
}

// Wrap wraps a given datastore with a coalescing datastore.
// All simultaneous requests which have the same operation and key are
// served by one call to d and yield the exact same result. Note that
// the result is shared between callers, not copied: it is not possible
// to copy a generic interface{} value.
func Wrap(d ds.Datastore) ds.Datastore {
	pending := make(map[keySync]*valSync)
	return &datastore{
		child: d,
		req:   pending,
	}
}

// sync registers or joins the in-flight request identified by k.
//
// If no request for k is pending, a fresh valSync is stored under k and
// returned with found == false; the caller must then perform the
// operation, store the result in the valSync and call syncDone(k).
//
// If a request for k is already pending, sync blocks until that request
// is finished and returns its valSync with found == true.
func (d *datastore) sync(k keySync) (vs *valSync, found bool) {
	d.reqmu.Lock()
	pending, ok := d.req[k]
	if !ok {
		// first caller for this key: register and return immediately.
		pending = &valSync{done: make(chan struct{})}
		d.req[k] = pending
		d.reqmu.Unlock()
		return pending, false
	}
	d.reqmu.Unlock()

	// someone else is already running this request; wait for its result.
	<-pending.done
	return pending, true
}

// syncDone marks the in-flight request for k as finished: it removes the
// entry from the pending map and closes its done channel, which releases
// every caller blocked in sync on the same key.
//
// It is meant to be called by the caller for which sync reported
// found == false, after that caller has stored its result in the valSync.
// It panics if no request is registered for k.
func (d *datastore) syncDone(k keySync) {
	d.reqmu.Lock()
	vs, found := d.req[k]
	if found {
		delete(d.req, k)
	}
	d.reqmu.Unlock()

	if !found {
		// Panic only after dropping the lock, so that a recovered panic
		// cannot leave reqmu held and block every later request.
		panic("attempt to syncDone non-existent request")
	}

	// release all the waiters.
	close(vs.done)
}

// Put stores the object `value` named by `key`.
//
// Concurrent Puts with an equal key and value are coalesced: one caller
// writes to the child datastore and every caller returns the error from
// that single write.
//
// NOTE(review): value is part of the map key used to track in-flight
// requests, and Go panics at run time when a map key holds a value of a
// non-comparable dynamic type (slice, map, func). Confirm that callers
// only pass comparable values.
func (d *datastore) Put(key ds.Key, value interface{}) (err error) {
	ks := keySync{putKey, key, value}
	vs, found := d.sync(ks)
	if !found {
		vs.err = d.child.Put(key, value)
		d.syncDone(ks)
	}
	// Return the recorded error; the named result err is never assigned,
	// so returning it would always report success.
	return vs.err
}

// Get retrieves the object `value` named by `key`.
// Concurrent Gets for the same key share a single call to the child
// datastore and all return its value and error.
func (d *datastore) Get(key ds.Key) (value interface{}, err error) {
	req := keySync{op: getKey, k: key}
	res, joined := d.sync(req)
	if joined {
		// another caller already fetched it; reuse that result.
		return res.val, res.err
	}

	res.val, res.err = d.child.Get(key)
	d.syncDone(req)
	return res.val, res.err
}

// Has returns whether the `key` is mapped to a `value`.
// Concurrent Has calls for the same key share a single call to the
// child datastore and all return its answer and error.
func (d *datastore) Has(key ds.Key) (exists bool, err error) {
	req := keySync{op: hasKey, k: key}
	res, joined := d.sync(req)
	if !joined {
		// we own this request: ask the child and publish the answer.
		res.val, res.err = d.child.Has(key)
		d.syncDone(req)
	}

	// val was stored from a bool returned by the child's Has.
	found := res.val.(bool)
	return found, res.err
}

// Delete removes the value for given `key`.
// Concurrent Deletes for the same key share a single call to the child
// datastore and all return its error.
func (d *datastore) Delete(key ds.Key) (err error) {
	req := keySync{op: deleteKey, k: key}
	res, joined := d.sync(req)
	if joined {
		// another caller already performed the delete; reuse its outcome.
		return res.err
	}

	res.err = d.child.Delete(key)
	d.syncDone(req)
	return res.err
}

// Query runs q against the child datastore and returns its results.
// Queries are passed straight through; they are not coalesced yet.
func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
	results, err := d.child.Query(q)
	return results, err
}
Jeromy's avatar
Jeromy committed
128 129 130 131 132 133 134 135 136 137 138 139 140

// Close waits for the requests that are in flight when it is called and
// then closes the child datastore if it implements io.Closer. It returns
// the child's Close error, or nil when the child is not an io.Closer.
//
// Requests that start after Close has taken its snapshot are not waited
// for.
func (d *datastore) Close() error {
	// Snapshot the pending requests under the lock, then wait without it.
	// syncDone needs reqmu before it can close a done channel, so waiting
	// while holding reqmu would deadlock against any in-flight request.
	d.reqmu.Lock()
	pending := make([]*valSync, 0, len(d.req))
	for _, s := range d.req {
		pending = append(pending, s)
	}
	d.reqmu.Unlock()

	for _, s := range pending {
		<-s.done
	}

	if c, ok := d.child.(io.Closer); ok {
		return c.Close()
	}
	return nil
}
141 142 143 144 145

// DiskUsage implements the PersistentDatastore interface.
// It reports whatever ds.DiskUsage returns for the wrapped child.
func (d *datastore) DiskUsage() (uint64, error) {
	child := d.child
	return ds.DiskUsage(child)
}