blockstore.go 5.73 KB
Newer Older
1 2 3 4 5 6
// package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for Getting and Putting block objects.
package blockstore

import (
	"errors"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"sync/atomic"
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	key "github.com/ipfs/go-ipfs/blocks/key"
12
	logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
13 14 15
	ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
	dsns "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/namespace"
	dsq "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/query"
16
	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
Jeromy's avatar
Jeromy committed
17
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("blockstore")
21

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22
// BlockPrefix namespaces blockstore datastores
23
var BlockPrefix = ds.NewKey("blocks")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25 26
var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
27

28 29
var ErrNotFound = errors.New("blockstore: block not found")

Jeromy's avatar
Jeromy committed
30
// Blockstore wraps a Datastore
31
type Blockstore interface {
32 33
	DeleteBlock(key.Key) error
	Has(key.Key) (bool, error)
34 35 36
	Get(key.Key) (blocks.Block, error)
	Put(blocks.Block) error
	PutMany([]blocks.Block) error
37

38
	AllKeysChan(ctx context.Context) (<-chan key.Key, error)
39 40
}

Jeromy's avatar
Jeromy committed
41 42 43
type GCBlockstore interface {
	Blockstore

44 45 46
	// GCLock locks the blockstore for garbage collection. No operations
	// that expect to finish with a pin should ocurr simultaneously.
	// Reading during GC is safe, and requires no lock.
47
	GCLock() Unlocker
48 49 50 51 52

	// PinLock locks the blockstore for sequences of puts expected to finish
	// with a pin (before GC). Multiple put->pin sequences can write through
	// at the same time, but no GC should not happen simulatenously.
	// Reading during Pinning is safe, and requires no lock.
53
	PinLock() Unlocker
Jeromy's avatar
Jeromy committed
54 55 56 57

	// GcRequested returns true if GCLock has been called and is waiting to
	// take the lock
	GCRequested() bool
Jeromy's avatar
Jeromy committed
58 59
}

Jeromy's avatar
Jeromy committed
60
func NewBlockstore(d ds.Batching) *blockstore {
Jeromy's avatar
Jeromy committed
61
	var dsb ds.Batching
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
	dd := dsns.Wrap(d, BlockPrefix)
Jeromy's avatar
Jeromy committed
63
	dsb = dd
64
	return &blockstore{
Jeromy's avatar
Jeromy committed
65
		datastore: dsb,
66 67 68 69
	}
}

type blockstore struct {
Jeromy's avatar
Jeromy committed
70
	datastore ds.Batching
Jeromy's avatar
Jeromy committed
71

Jeromy's avatar
Jeromy committed
72 73 74
	lk      sync.RWMutex
	gcreq   int32
	gcreqlk sync.Mutex
75 76 77 78 79 80

	rehash bool
}

func (bs *blockstore) RuntimeHashing(enabled bool) {
	bs.rehash = enabled
81 82
}

83
func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
84 85 86 87
	if k == "" {
		return nil, ErrNotFound
	}

88
	maybeData, err := bs.datastore.Get(k.DsKey())
89 90 91
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
92 93 94 95 96 97 98 99
	if err != nil {
		return nil, err
	}
	bdata, ok := maybeData.([]byte)
	if !ok {
		return nil, ValueTypeMismatch
	}

100 101 102 103 104 105 106 107 108 109
	if bs.rehash {
		rb := blocks.NewBlock(bdata)
		if rb.Key() != k {
			return nil, ErrHashMismatch
		} else {
			return rb, nil
		}
	} else {
		return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
	}
110 111
}

112
func (bs *blockstore) Put(block blocks.Block) error {
113
	k := block.Key().DsKey()
114 115

	// Has is cheaper than Put, so see if we already have it
116
	exists, err := bs.datastore.Has(k)
117
	if err == nil && exists {
118 119
		return nil // already stored.
	}
120
	return bs.datastore.Put(k, block.Data())
121
}
122

123
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
124 125 126 127 128 129 130 131 132 133 134
	t, err := bs.datastore.Batch()
	if err != nil {
		return err
	}
	for _, b := range blocks {
		k := b.Key().DsKey()
		exists, err := bs.datastore.Has(k)
		if err == nil && exists {
			continue
		}

135
		err = t.Put(k, b.Data())
136 137 138 139 140 141 142
		if err != nil {
			return err
		}
	}
	return t.Commit()
}

143
func (bs *blockstore) Has(k key.Key) (bool, error) {
144 145 146
	return bs.datastore.Has(k.DsKey())
}

147
func (s *blockstore) DeleteBlock(k key.Key) error {
148 149
	return s.datastore.Delete(k.DsKey())
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150

151
// AllKeysChan runs a query for keys from the blockstore.
152 153
// this is very simplistic, in the future, take dsq.Query as a param?
//
154
// AllKeysChan respects context
155
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157

	// KeysOnly, because that would be _a lot_ of data.
158
	q := dsq.Query{KeysOnly: true}
159 160
	// datastore/namespace does *NOT* fix up Query.Prefix
	q.Prefix = BlockPrefix.String()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162 163 164 165
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

166
	// this function is here to compartmentalize
167
	get := func() (key.Key, bool) {
168 169
		select {
		case <-ctx.Done():
170
			return "", false
171 172
		case e, more := <-res.Next():
			if !more {
173
				return "", false
174 175 176
			}
			if e.Error != nil {
				log.Debug("blockstore.AllKeysChan got err:", e.Error)
177
				return "", false
178 179
			}

180
			// need to convert to key.Key using key.KeyFromDsKey.
181 182 183 184 185
			k, err := key.KeyFromDsKey(ds.NewKey(e.Key))
			if err != nil {
				log.Warningf("error parsing key from DsKey: ", err)
				return "", true
			}
186
			log.Debug("blockstore: query got key", k)
187 188

			// key must be a multihash. else ignore it.
189
			_, err = mh.Cast([]byte(k))
190
			if err != nil {
191
				log.Warningf("key from datastore was not a multihash: ", err)
192 193 194
				return "", true
			}

195 196
			return k, true
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197
	}
198

199
	output := make(chan key.Key, dsq.KeysOnlyBufSize)
200 201 202 203 204 205 206 207 208 209 210
	go func() {
		defer func() {
			res.Process().Close() // ensure exit (signals early exit, too)
			close(output)
		}()

		for {
			k, ok := get()
			if !ok {
				return
			}
211 212 213
			if k == "" {
				continue
			}
214 215 216 217 218 219 220 221 222 223

			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224
}
Jeromy's avatar
Jeromy committed
225

226 227 228 229 230 231 232 233 234 235 236 237 238 239
type Unlocker interface {
	Unlock()
}

type unlocker struct {
	unlock func()
}

func (u *unlocker) Unlock() {
	u.unlock()
	u.unlock = nil // ensure its not called twice
}

func (bs *blockstore) GCLock() Unlocker {
Jeromy's avatar
Jeromy committed
240
	atomic.AddInt32(&bs.gcreq, 1)
Jeromy's avatar
Jeromy committed
241
	bs.lk.Lock()
Jeromy's avatar
Jeromy committed
242
	atomic.AddInt32(&bs.gcreq, -1)
243
	return &unlocker{bs.lk.Unlock}
Jeromy's avatar
Jeromy committed
244 245
}

246
func (bs *blockstore) PinLock() Unlocker {
Jeromy's avatar
Jeromy committed
247
	bs.lk.RLock()
248
	return &unlocker{bs.lk.RUnlock}
Jeromy's avatar
Jeromy committed
249
}
Jeromy's avatar
Jeromy committed
250 251 252 253

func (bs *blockstore) GCRequested() bool {
	return atomic.LoadInt32(&bs.gcreq) > 0
}