sync.go 2.96 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package sync

import (
Jeromy's avatar
Jeromy committed
4
	"io"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6
	"sync"

Jeromy's avatar
Jeromy committed
7 8
	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10 11 12 13 14 15 16 17 18 19 20
)

// MutexDatastore contains a child datastire and a mutex.
// used for coarse sync
type MutexDatastore struct {
	sync.RWMutex

	child ds.Datastore
}

// MutexWrap constructs a datastore with a coarse lock around
// the entire datastore, for every single operation
21
func MutexWrap(d ds.Datastore) *MutexDatastore {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23 24 25 26 27 28 29 30 31 32 33
	return &MutexDatastore{child: d}
}

// Children implements Shim
func (d *MutexDatastore) Children() []ds.Datastore {
	return []ds.Datastore{d.child}
}

// IsThreadSafe implements ThreadSafeDatastore
func (d *MutexDatastore) IsThreadSafe() {}

// Put implements Datastore.Put
34
func (d *MutexDatastore) Put(key ds.Key, value []byte) (err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35 36 37 38 39 40
	d.Lock()
	defer d.Unlock()
	return d.child.Put(key, value)
}

// Get implements Datastore.Get
41
func (d *MutexDatastore) Get(key ds.Key) (value []byte, err error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
	d.RLock()
	defer d.RUnlock()
	return d.child.Get(key)
}

// Has implements Datastore.Has
func (d *MutexDatastore) Has(key ds.Key) (exists bool, err error) {
	d.RLock()
	defer d.RUnlock()
	return d.child.Has(key)
}

// Delete implements Datastore.Delete
func (d *MutexDatastore) Delete(key ds.Key) (err error) {
	d.Lock()
	defer d.Unlock()
	return d.child.Delete(key)
}

// KeyList implements Datastore.KeyList
62
func (d *MutexDatastore) Query(q dsq.Query) (dsq.Results, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63 64
	d.RLock()
	defer d.RUnlock()
65
	return d.child.Query(q)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66
}
Jeromy's avatar
Jeromy committed
67 68 69 70

func (d *MutexDatastore) Batch() (ds.Batch, error) {
	d.RLock()
	defer d.RUnlock()
Jeromy's avatar
Jeromy committed
71
	bds, ok := d.child.(ds.Batching)
Jeromy's avatar
Jeromy committed
72 73 74 75 76 77 78 79 80 81
	if !ok {
		return nil, ds.ErrBatchUnsupported
	}

	b, err := bds.Batch()
	if err != nil {
		return nil, err
	}
	return &syncBatch{
		batch: b,
Jeromy's avatar
Jeromy committed
82
		mds:   d,
Jeromy's avatar
Jeromy committed
83 84 85
	}, nil
}

Jeromy's avatar
Jeromy committed
86 87 88 89 90 91 92 93 94
func (d *MutexDatastore) Close() error {
	d.RWMutex.Lock()
	defer d.RWMutex.Unlock()
	if c, ok := d.child.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

95 96 97 98 99 100 101
// DiskUsage implements the PersistentDatastore interface.
func (d *MutexDatastore) DiskUsage() (uint64, error) {
	d.RLock()
	defer d.RUnlock()
	return ds.DiskUsage(d.child)
}

Jeromy's avatar
Jeromy committed
102 103
type syncBatch struct {
	batch ds.Batch
Jeromy's avatar
Jeromy committed
104
	mds   *MutexDatastore
Jeromy's avatar
Jeromy committed
105 106
}

107
func (b *syncBatch) Put(key ds.Key, val []byte) error {
Jeromy's avatar
Jeromy committed
108 109
	b.mds.Lock()
	defer b.mds.Unlock()
Jeromy's avatar
Jeromy committed
110 111 112 113
	return b.batch.Put(key, val)
}

func (b *syncBatch) Delete(key ds.Key) error {
Jeromy's avatar
Jeromy committed
114 115
	b.mds.Lock()
	defer b.mds.Unlock()
Jeromy's avatar
Jeromy committed
116 117 118 119
	return b.batch.Delete(key)
}

func (b *syncBatch) Commit() error {
Jeromy's avatar
Jeromy committed
120 121
	b.mds.Lock()
	defer b.mds.Unlock()
Jeromy's avatar
Jeromy committed
122 123
	return b.batch.Commit()
}
124 125 126

func (d *MutexDatastore) Check() error {
	if c, ok := d.child.(ds.CheckedDatastore); ok {
127 128
		d.RWMutex.Lock()
		defer d.RWMutex.Unlock()
129 130 131 132 133 134 135
		return c.Check()
	}
	return nil
}

func (d *MutexDatastore) Scrub() error {
	if c, ok := d.child.(ds.ScrubbedDatastore); ok {
136 137
		d.RWMutex.Lock()
		defer d.RWMutex.Unlock()
138 139 140 141 142 143 144
		return c.Scrub()
	}
	return nil
}

func (d *MutexDatastore) CollectGarbage() error {
	if c, ok := d.child.(ds.GCDatastore); ok {
145 146
		d.RWMutex.Lock()
		defer d.RWMutex.Unlock()
147 148 149 150
		return c.CollectGarbage()
	}
	return nil
}