blockstore.go 6.79 KB
Newer Older
1
// Package blockstore implements a thin wrapper over a datastore, giving a
2 3 4 5
// clean interface for Getting and Putting block objects.
package blockstore

import (
6
	"context"
7
	"errors"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"sync/atomic"
10

11
	dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"
12
	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
13

14
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
Jeromy's avatar
Jeromy committed
15
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
16 17 18
	ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
	dsns "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/namespace"
	dsq "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/query"
19 20
)

Jeromy's avatar
Jeromy committed
21
var log = logging.Logger("blockstore")
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
// BlockPrefix namespaces blockstore datastores
24
var BlockPrefix = ds.NewKey("blocks")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

26 27 28 29 30 31
// ErrValueTypeMismatch is an error returned when the item retrieved from
// the datatstore is not a block.
var ErrValueTypeMismatch = errors.New("the retrieved value is not a Block")

// ErrHashMismatch is an error returned when the hash of a block
// is different than expected.
32
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
33

34
// ErrNotFound is an error returned when a block is not found.
35 36
var ErrNotFound = errors.New("blockstore: block not found")

37 38
// Blockstore wraps a Datastore block-centered methods and provides a layer
// of abstraction which allows to add different caching strategies.
39
type Blockstore interface {
40 41 42
	DeleteBlock(*cid.Cid) error
	Has(*cid.Cid) (bool, error)
	Get(*cid.Cid) (blocks.Block, error)
43 44
	Put(blocks.Block) error
	PutMany([]blocks.Block) error
45 46 47
	// AllKeysChan returns a channel from which
	// the CIDs in the Blockstore can be read. It should respect
	// the given context, closing the channel if it becomes Done.
48
	AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
49 50 51
	// HashOnRead specifies if every read block should be
	// rehashed to make sure it matches its CID.
	HashOnRead(enabled bool)
52 53
}

54 55
// GCLocker abstract functionality to lock a blockstore when performing
// garbage-collection operations.
56
type GCLocker interface {
57 58 59
	// GCLock locks the blockstore for garbage collection. No operations
	// that expect to finish with a pin should ocurr simultaneously.
	// Reading during GC is safe, and requires no lock.
60
	GCLock() Unlocker
61 62 63 64 65

	// PinLock locks the blockstore for sequences of puts expected to finish
	// with a pin (before GC). Multiple put->pin sequences can write through
	// at the same time, but no GC should not happen simulatenously.
	// Reading during Pinning is safe, and requires no lock.
66
	PinLock() Unlocker
Jeromy's avatar
Jeromy committed
67 68 69 70

	// GcRequested returns true if GCLock has been called and is waiting to
	// take the lock
	GCRequested() bool
Jeromy's avatar
Jeromy committed
71 72
}

73 74
// GCBlockstore is a blockstore that can safely run garbage-collection
// operations.
75 76 77 78 79
type GCBlockstore interface {
	Blockstore
	GCLocker
}

80 81
// NewGCBlockstore returns a default implementation of GCBlockstore
// using the given Blockstore and GCLocker.
82 83 84 85 86 87 88 89 90
func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
	return gcBlockstore{bs, gcl}
}

type gcBlockstore struct {
	Blockstore
	GCLocker
}

91 92 93
// NewBlockstore returns a default Blockstore implementation
// using the provided datastore.Batching backend.
func NewBlockstore(d ds.Batching) Blockstore {
Jeromy's avatar
Jeromy committed
94
	var dsb ds.Batching
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95
	dd := dsns.Wrap(d, BlockPrefix)
Jeromy's avatar
Jeromy committed
96
	dsb = dd
97
	return &blockstore{
Jeromy's avatar
Jeromy committed
98
		datastore: dsb,
99 100 101 102
	}
}

type blockstore struct {
Jeromy's avatar
Jeromy committed
103
	datastore ds.Batching
Jeromy's avatar
Jeromy committed
104

105 106 107
	rehash bool
}

108
func (bs *blockstore) HashOnRead(enabled bool) {
109
	bs.rehash = enabled
110 111
}

112 113 114
func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
	if k == nil {
		log.Error("nil cid in blockstore")
jbenet's avatar
jbenet committed
115 116 117
		return nil, ErrNotFound
	}

118
	maybeData, err := bs.datastore.Get(dshelp.CidToDsKey(k))
119 120 121
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
122 123 124 125 126
	if err != nil {
		return nil, err
	}
	bdata, ok := maybeData.([]byte)
	if !ok {
127
		return nil, ErrValueTypeMismatch
128 129
	}

130
	if bs.rehash {
131 132 133 134 135 136
		rbcid, err := k.Prefix().Sum(bdata)
		if err != nil {
			return nil, err
		}

		if !rbcid.Equals(k) {
137 138
			return nil, ErrHashMismatch
		}
139 140

		return blocks.NewBlockWithCid(bdata, rbcid)
141
	}
142
	return blocks.NewBlockWithCid(bdata, k)
143 144
}

145
func (bs *blockstore) Put(block blocks.Block) error {
146
	k := dshelp.CidToDsKey(block.Cid())
147 148

	// Has is cheaper than Put, so see if we already have it
149
	exists, err := bs.datastore.Has(k)
150
	if err == nil && exists {
151 152
		return nil // already stored.
	}
Jeromy's avatar
Jeromy committed
153
	return bs.datastore.Put(k, block.RawData())
154
}
155

156
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
157 158 159 160 161
	t, err := bs.datastore.Batch()
	if err != nil {
		return err
	}
	for _, b := range blocks {
162
		k := dshelp.CidToDsKey(b.Cid())
163 164 165 166 167
		exists, err := bs.datastore.Has(k)
		if err == nil && exists {
			continue
		}

Jeromy's avatar
Jeromy committed
168
		err = t.Put(k, b.RawData())
169 170 171 172 173 174 175
		if err != nil {
			return err
		}
	}
	return t.Commit()
}

176
func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
177
	return bs.datastore.Has(dshelp.CidToDsKey(k))
178 179
}

180 181
func (bs *blockstore) DeleteBlock(k *cid.Cid) error {
	err := bs.datastore.Delete(dshelp.CidToDsKey(k))
182 183 184 185
	if err == ds.ErrNotFound {
		return ErrNotFound
	}
	return err
186
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187

188
// AllKeysChan runs a query for keys from the blockstore.
189 190
// this is very simplistic, in the future, take dsq.Query as a param?
//
191
// AllKeysChan respects context.
192
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194

	// KeysOnly, because that would be _a lot_ of data.
195
	q := dsq.Query{KeysOnly: true}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196 197 198 199 200
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

201
	output := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
202 203
	go func() {
		defer func() {
204
			res.Close() // ensure exit (signals early exit, too)
205 206 207 208
			close(output)
		}()

		for {
209
			e, ok := res.NextSync()
210 211 212
			if !ok {
				return
			}
213
			if e.Error != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
214
				log.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
215
				return
216 217 218 219 220
			}

			// need to convert to key.Key using key.KeyFromDsKey.
			k, err := dshelp.DsKeyToCid(ds.RawKey(e.Key))
			if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
221
				log.Warningf("error parsing key from DsKey: %s", err)
222 223
				continue
			}
224 225 226 227 228 229 230 231 232 233

			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
}
Jeromy's avatar
Jeromy committed
235

236 237 238
// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
239 240 241 242
	return &gclocker{}
}

type gclocker struct {
243 244
	lk    sync.RWMutex
	gcreq int32
245 246
}

247 248
// Unlocker represents an object which can Unlock
// something.
249 250 251 252 253 254 255 256 257 258 259 260 261
type Unlocker interface {
	Unlock()
}

type unlocker struct {
	unlock func()
}

func (u *unlocker) Unlock() {
	u.unlock()
	u.unlock = nil // ensure its not called twice
}

262
func (bs *gclocker) GCLock() Unlocker {
Jeromy's avatar
Jeromy committed
263
	atomic.AddInt32(&bs.gcreq, 1)
Jeromy's avatar
Jeromy committed
264
	bs.lk.Lock()
Jeromy's avatar
Jeromy committed
265
	atomic.AddInt32(&bs.gcreq, -1)
266
	return &unlocker{bs.lk.Unlock}
Jeromy's avatar
Jeromy committed
267 268
}

269
func (bs *gclocker) PinLock() Unlocker {
Jeromy's avatar
Jeromy committed
270
	bs.lk.RLock()
271
	return &unlocker{bs.lk.RUnlock}
Jeromy's avatar
Jeromy committed
272
}
Jeromy's avatar
Jeromy committed
273

274
func (bs *gclocker) GCRequested() bool {
Jeromy's avatar
Jeromy committed
275 276
	return atomic.LoadInt32(&bs.gcreq) > 0
}