blockstore.go 5.7 KB
Newer Older
1 2 3 4 5
// package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for Getting and Putting block objects.
package blockstore

import (
6
	"context"
7
	"errors"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"sync/atomic"
10

11
	blocks "github.com/ipfs/go-ipfs/blocks"
12 13
	dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"

Jeromy's avatar
Jeromy committed
14 15 16
	ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
	dsns "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/namespace"
	dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
Jeromy's avatar
Jeromy committed
17
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
18
	cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
19 20
)

Jeromy's avatar
Jeromy committed
21
var log = logging.Logger("blockstore")
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
// BlockPrefix namespaces blockstore datastores
24
var BlockPrefix = ds.NewKey("blocks")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

26 27
var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
28

29 30
var ErrNotFound = errors.New("blockstore: block not found")

Jeromy's avatar
Jeromy committed
31
// Blockstore wraps a Datastore
32
type Blockstore interface {
33 34 35
	DeleteBlock(*cid.Cid) error
	Has(*cid.Cid) (bool, error)
	Get(*cid.Cid) (blocks.Block, error)
36 37
	Put(blocks.Block) error
	PutMany([]blocks.Block) error
38

39
	AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
40 41
}

42
type GCLocker interface {
43 44 45
	// GCLock locks the blockstore for garbage collection. No operations
	// that expect to finish with a pin should ocurr simultaneously.
	// Reading during GC is safe, and requires no lock.
46
	GCLock() Unlocker
47 48 49 50 51

	// PinLock locks the blockstore for sequences of puts expected to finish
	// with a pin (before GC). Multiple put->pin sequences can write through
	// at the same time, but no GC should not happen simulatenously.
	// Reading during Pinning is safe, and requires no lock.
52
	PinLock() Unlocker
Jeromy's avatar
Jeromy committed
53 54 55 56

	// GcRequested returns true if GCLock has been called and is waiting to
	// take the lock
	GCRequested() bool
Jeromy's avatar
Jeromy committed
57 58
}

59 60 61 62 63 64 65 66 67 68 69 70 71 72
type GCBlockstore interface {
	Blockstore
	GCLocker
}

func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
	return gcBlockstore{bs, gcl}
}

type gcBlockstore struct {
	Blockstore
	GCLocker
}

Jeromy's avatar
Jeromy committed
73
func NewBlockstore(d ds.Batching) *blockstore {
Jeromy's avatar
Jeromy committed
74
	var dsb ds.Batching
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75
	dd := dsns.Wrap(d, BlockPrefix)
Jeromy's avatar
Jeromy committed
76
	dsb = dd
77
	return &blockstore{
Jeromy's avatar
Jeromy committed
78
		datastore: dsb,
79 80 81 82
	}
}

type blockstore struct {
Jeromy's avatar
Jeromy committed
83
	datastore ds.Batching
Jeromy's avatar
Jeromy committed
84

Jeromy's avatar
Jeromy committed
85 86 87
	lk      sync.RWMutex
	gcreq   int32
	gcreqlk sync.Mutex
88 89 90 91

	rehash bool
}

92
func (bs *blockstore) HashOnRead(enabled bool) {
93
	bs.rehash = enabled
94 95
}

96 97 98
func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
	if k == nil {
		log.Error("nil cid in blockstore")
jbenet's avatar
jbenet committed
99 100 101
		return nil, ErrNotFound
	}

102
	maybeData, err := bs.datastore.Get(dshelp.CidToDsKey(k))
103 104 105
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
106 107 108 109 110 111 112 113
	if err != nil {
		return nil, err
	}
	bdata, ok := maybeData.([]byte)
	if !ok {
		return nil, ValueTypeMismatch
	}

114
	if bs.rehash {
115 116 117 118 119 120
		rbcid, err := k.Prefix().Sum(bdata)
		if err != nil {
			return nil, err
		}

		if !rbcid.Equals(k) {
121 122
			return nil, ErrHashMismatch
		}
123 124

		return blocks.NewBlockWithCid(bdata, rbcid)
125
	} else {
126
		return blocks.NewBlockWithCid(bdata, k)
127
	}
128 129
}

130
func (bs *blockstore) Put(block blocks.Block) error {
131
	k := dshelp.CidToDsKey(block.Cid())
132 133

	// Has is cheaper than Put, so see if we already have it
134
	exists, err := bs.datastore.Has(k)
135
	if err == nil && exists {
136 137
		return nil // already stored.
	}
Jeromy's avatar
Jeromy committed
138
	return bs.datastore.Put(k, block.RawData())
139
}
140

141
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
142 143 144 145 146
	t, err := bs.datastore.Batch()
	if err != nil {
		return err
	}
	for _, b := range blocks {
147
		k := dshelp.CidToDsKey(b.Cid())
148 149 150 151 152
		exists, err := bs.datastore.Has(k)
		if err == nil && exists {
			continue
		}

Jeromy's avatar
Jeromy committed
153
		err = t.Put(k, b.RawData())
154 155 156 157 158 159 160
		if err != nil {
			return err
		}
	}
	return t.Commit()
}

161
func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
162
	return bs.datastore.Has(dshelp.CidToDsKey(k))
163 164
}

165
func (s *blockstore) DeleteBlock(k *cid.Cid) error {
166 167 168 169 170
	err := s.datastore.Delete(dshelp.CidToDsKey(k))
	if err == ds.ErrNotFound {
		return ErrNotFound
	}
	return err
171
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172

173
// AllKeysChan runs a query for keys from the blockstore.
174 175
// this is very simplistic, in the future, take dsq.Query as a param?
//
176
// AllKeysChan respects context
177
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178 179

	// KeysOnly, because that would be _a lot_ of data.
180
	q := dsq.Query{KeysOnly: true}
181 182
	// datastore/namespace does *NOT* fix up Query.Prefix
	q.Prefix = BlockPrefix.String()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183 184 185 186 187
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

188
	output := make(chan *cid.Cid, dsq.KeysOnlyBufSize)
189 190
	go func() {
		defer func() {
191
			res.Close() // ensure exit (signals early exit, too)
192 193 194 195
			close(output)
		}()

		for {
196
			e, ok := res.NextSync()
197 198 199
			if !ok {
				return
			}
200
			if e.Error != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
201
				log.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
202
				return
203 204 205 206 207
			}

			// need to convert to key.Key using key.KeyFromDsKey.
			k, err := dshelp.DsKeyToCid(ds.RawKey(e.Key))
			if err != nil {
Jakub Sztandera's avatar
Jakub Sztandera committed
208
				log.Warningf("error parsing key from DsKey: %s", err)
209 210
				continue
			}
211 212 213 214 215 216 217 218 219 220

			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221
}
Jeromy's avatar
Jeromy committed
222

223 224 225 226 227 228 229 230 231 232
func NewGCLocker() *gclocker {
	return &gclocker{}
}

type gclocker struct {
	lk      sync.RWMutex
	gcreq   int32
	gcreqlk sync.Mutex
}

233 234 235 236 237 238 239 240 241 242 243 244 245
type Unlocker interface {
	Unlock()
}

type unlocker struct {
	unlock func()
}

func (u *unlocker) Unlock() {
	u.unlock()
	u.unlock = nil // ensure its not called twice
}

246
func (bs *gclocker) GCLock() Unlocker {
Jeromy's avatar
Jeromy committed
247
	atomic.AddInt32(&bs.gcreq, 1)
Jeromy's avatar
Jeromy committed
248
	bs.lk.Lock()
Jeromy's avatar
Jeromy committed
249
	atomic.AddInt32(&bs.gcreq, -1)
250
	return &unlocker{bs.lk.Unlock}
Jeromy's avatar
Jeromy committed
251 252
}

253
func (bs *gclocker) PinLock() Unlocker {
Jeromy's avatar
Jeromy committed
254
	bs.lk.RLock()
255
	return &unlocker{bs.lk.RUnlock}
Jeromy's avatar
Jeromy committed
256
}
Jeromy's avatar
Jeromy committed
257

258
func (bs *gclocker) GCRequested() bool {
Jeromy's avatar
Jeromy committed
259 260
	return atomic.LoadInt32(&bs.gcreq) > 0
}