// Package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for Getting and Putting block objects.
package blockstore

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
	dsns "github.com/ipfs/go-datastore/namespace"
	dsq "github.com/ipfs/go-datastore/query"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
	logging "github.com/ipfs/go-log"
	uatomic "go.uber.org/atomic"
)

Jeromy's avatar
Jeromy committed
21
var log = logging.Logger("blockstore")
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
// BlockPrefix namespaces blockstore datastores
24
var BlockPrefix = ds.NewKey("blocks")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

26 27
// ErrHashMismatch is an error returned when the hash of a block
// is different than expected.
28
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
29

30
// ErrNotFound is an error returned when a block is not found.
31 32
var ErrNotFound = errors.New("blockstore: block not found")

33 34
// Blockstore wraps a Datastore block-centered methods and provides a layer
// of abstraction which allows to add different caching strategies.
35
type Blockstore interface {
36 37 38
	DeleteBlock(cid.Cid) error
	Has(cid.Cid) (bool, error)
	Get(cid.Cid) (blocks.Block, error)
Jeromy's avatar
Jeromy committed
39

40
	// GetSize returns the CIDs mapped BlockSize
41
	GetSize(cid.Cid) (int, error)
42

Jeromy's avatar
Jeromy committed
43
	// Put puts a given block to the underlying datastore
44
	Put(blocks.Block) error
Jeromy's avatar
Jeromy committed
45 46 47

	// PutMany puts a slice of blocks at the same time using batching
	// capabilities of the underlying datastore whenever possible.
48
	PutMany([]blocks.Block) error
Jeromy's avatar
Jeromy committed
49

50 51 52
	// AllKeysChan returns a channel from which
	// the CIDs in the Blockstore can be read. It should respect
	// the given context, closing the channel if it becomes Done.
53
	AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
Jeromy's avatar
Jeromy committed
54

55 56 57
	// HashOnRead specifies if every read block should be
	// rehashed to make sure it matches its CID.
	HashOnRead(enabled bool)
58 59
}

60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
// Callers of View must not mutate or retain the byte slice, as it could be
// an mmapped memory region, or a pooled byte buffer.
//
// View is especially suitable for deserialising in place.
//
// The callback will only be called iff the query operation is successful (and
// the block is found); otherwise, the error will be propagated. Errors returned
// by the callback will be propagated as well.
type Viewer interface {
	View(cid cid.Cid, callback func([]byte) error) error
}

75 76
// GCLocker abstract functionality to lock a blockstore when performing
// garbage-collection operations.
77
type GCLocker interface {
78 79 80
	// GCLock locks the blockstore for garbage collection. No operations
	// that expect to finish with a pin should ocurr simultaneously.
	// Reading during GC is safe, and requires no lock.
81
	GCLock() Unlocker
82 83 84

	// PinLock locks the blockstore for sequences of puts expected to finish
	// with a pin (before GC). Multiple put->pin sequences can write through
85
	// at the same time, but no GC should happen simulatenously.
86
	// Reading during Pinning is safe, and requires no lock.
87
	PinLock() Unlocker
Jeromy's avatar
Jeromy committed
88 89 90 91

	// GcRequested returns true if GCLock has been called and is waiting to
	// take the lock
	GCRequested() bool
Jeromy's avatar
Jeromy committed
92 93
}

94 95
// GCBlockstore is a blockstore that can safely run garbage-collection
// operations.
96 97 98 99 100
type GCBlockstore interface {
	Blockstore
	GCLocker
}

101 102
// NewGCBlockstore returns a default implementation of GCBlockstore
// using the given Blockstore and GCLocker.
103 104 105 106 107 108 109 110 111
func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
	return gcBlockstore{bs, gcl}
}

// gcBlockstore is the GCBlockstore returned by NewGCBlockstore. It embeds a
// Blockstore and a GCLocker, so the methods of both are promoted unchanged.
type gcBlockstore struct {
	Blockstore
	GCLocker
}

112 113 114
// NewBlockstore returns a default Blockstore implementation
// using the provided datastore.Batching backend.
func NewBlockstore(d ds.Batching) Blockstore {
Jeromy's avatar
Jeromy committed
115
	var dsb ds.Batching
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
	dd := dsns.Wrap(d, BlockPrefix)
Jeromy's avatar
Jeromy committed
117
	dsb = dd
118
	return &blockstore{
Jeromy's avatar
Jeromy committed
119
		datastore: dsb,
postables's avatar
postables committed
120
		rehash:    uatomic.NewBool(false),
121 122 123 124
	}
}

type blockstore struct {
Jeromy's avatar
Jeromy committed
125
	datastore ds.Batching
Jeromy's avatar
Jeromy committed
126

postables's avatar
postables committed
127
	rehash *uatomic.Bool
128 129
}

130
func (bs *blockstore) HashOnRead(enabled bool) {
postables's avatar
postables committed
131
	bs.rehash.Store(enabled)
132 133
}

134 135 136
func (bs *blockstore) Get(k cid.Cid) (blocks.Block, error) {
	if !k.Defined() {
		log.Error("undefined cid in blockstore")
jbenet's avatar
jbenet committed
137 138
		return nil, ErrNotFound
	}
139
	bdata, err := bs.datastore.Get(dshelp.MultihashToDsKey(k.Hash()))
140 141 142
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
143 144 145
	if err != nil {
		return nil, err
	}
postables's avatar
postables committed
146
	if bs.rehash.Load() {
147 148 149 150 151 152
		rbcid, err := k.Prefix().Sum(bdata)
		if err != nil {
			return nil, err
		}

		if !rbcid.Equals(k) {
153 154
			return nil, ErrHashMismatch
		}
155 156

		return blocks.NewBlockWithCid(bdata, rbcid)
157
	}
158
	return blocks.NewBlockWithCid(bdata, k)
159 160
}

161
func (bs *blockstore) Put(block blocks.Block) error {
162
	k := dshelp.MultihashToDsKey(block.Cid().Hash())
163 164

	// Has is cheaper than Put, so see if we already have it
165
	exists, err := bs.datastore.Has(k)
166
	if err == nil && exists {
167 168
		return nil // already stored.
	}
Jeromy's avatar
Jeromy committed
169
	return bs.datastore.Put(k, block.RawData())
170
}
171

172
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
173 174 175 176 177
	t, err := bs.datastore.Batch()
	if err != nil {
		return err
	}
	for _, b := range blocks {
178
		k := dshelp.MultihashToDsKey(b.Cid().Hash())
179 180 181 182 183
		exists, err := bs.datastore.Has(k)
		if err == nil && exists {
			continue
		}

Jeromy's avatar
Jeromy committed
184
		err = t.Put(k, b.RawData())
185 186 187 188 189 190 191
		if err != nil {
			return err
		}
	}
	return t.Commit()
}

192
func (bs *blockstore) Has(k cid.Cid) (bool, error) {
193
	return bs.datastore.Has(dshelp.MultihashToDsKey(k.Hash()))
194 195
}

196
func (bs *blockstore) GetSize(k cid.Cid) (int, error) {
197
	size, err := bs.datastore.GetSize(dshelp.MultihashToDsKey(k.Hash()))
198 199 200
	if err == ds.ErrNotFound {
		return -1, ErrNotFound
	}
Steven Allen's avatar
Steven Allen committed
201
	return size, err
202 203
}

204
func (bs *blockstore) DeleteBlock(k cid.Cid) error {
205
	return bs.datastore.Delete(dshelp.MultihashToDsKey(k.Hash()))
206
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207

208
// AllKeysChan runs a query for keys from the blockstore.
209 210
// this is very simplistic, in the future, take dsq.Query as a param?
//
211
// AllKeysChan respects context.
212
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213 214

	// KeysOnly, because that would be _a lot_ of data.
215
	q := dsq.Query{KeysOnly: true}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216 217 218 219 220
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

221
	output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
222 223
	go func() {
		defer func() {
224
			res.Close() // ensure exit (signals early exit, too)
225 226 227 228
			close(output)
		}()

		for {
229
			e, ok := res.NextSync()
230 231 232
			if !ok {
				return
			}
233
			if e.Error != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
234
				log.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
235
				return
236 237 238
			}

			// need to convert to key.Key using key.KeyFromDsKey.
239
			bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key))
240
			if err != nil {
Alex Trottier's avatar
Alex Trottier committed
241
				log.Warningf("error parsing key from binary: %s", err)
242 243
				continue
			}
244
			k := cid.NewCidV1(cid.Raw, bk)
245 246 247 248 249 250 251 252 253
			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
254
}
Jeromy's avatar
Jeromy committed
255

256 257 258
// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
259 260 261 262
	return &gclocker{}
}

// gclocker implements GC/pin locking on top of a sync.RWMutex: GCLock takes
// the write lock and PinLock takes a read lock.
type gclocker struct {
	lk sync.RWMutex
	// gcreq counts GCLock calls that have started but not yet obtained lk.
	// It is only accessed through sync/atomic.
	gcreq int32
}

// Unlocker represents an object which can Unlock
// something.
type Unlocker interface {
	Unlock()
}

// unlocker adapts a plain release function to the Unlocker interface.
type unlocker struct {
	unlock func()
}

// Unlock invokes the wrapped release function and then clears it, so the
// underlying lock cannot be released twice through the same unlocker.
// Calling Unlock a second time panics (call of a nil function).
func (u *unlocker) Unlock() {
	u.unlock()
	u.unlock = nil // ensure it is not called twice
}

// GCLock blocks until the write lock is held and returns an Unlocker that
// releases it. While it is waiting, GCRequested reports true.
func (l *gclocker) GCLock() Unlocker {
	atomic.AddInt32(&l.gcreq, 1)
	l.lk.Lock()
	atomic.AddInt32(&l.gcreq, -1)
	return &unlocker{unlock: l.lk.Unlock}
}

// PinLock takes a read lock and returns an Unlocker that releases it.
func (l *gclocker) PinLock() Unlocker {
	l.lk.RLock()
	return &unlocker{unlock: l.lk.RUnlock}
}

// GCRequested reports whether at least one GCLock call is currently waiting
// to take the lock.
func (l *gclocker) GCRequested() bool {
	return atomic.LoadInt32(&l.gcreq) > 0
}