blockstore.go 5.73 KB
Newer Older
1 2 3 4 5 6
// package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for Getting and Putting block objects.
package blockstore

import (
	"errors"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"sync/atomic"
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	key "github.com/ipfs/go-ipfs/blocks/key"
Jeromy's avatar
Jeromy committed
12 13 14 15
	ds "gx/ipfs/QmNgqJarToRiq2GBaPJhkmW4B5BxS5B74E1rkGvv2JoaTp/go-datastore"
	dsns "gx/ipfs/QmNgqJarToRiq2GBaPJhkmW4B5BxS5B74E1rkGvv2JoaTp/go-datastore/namespace"
	dsq "gx/ipfs/QmNgqJarToRiq2GBaPJhkmW4B5BxS5B74E1rkGvv2JoaTp/go-datastore/query"
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
16
	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
Jeromy's avatar
Jeromy committed
17
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("blockstore")
21

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22
// BlockPrefix namespaces blockstore datastores
23
var BlockPrefix = ds.NewKey("blocks")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25 26
var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
27

28 29
var ErrNotFound = errors.New("blockstore: block not found")

Jeromy's avatar
Jeromy committed
30
// Blockstore wraps a Datastore
31
type Blockstore interface {
32 33
	DeleteBlock(key.Key) error
	Has(key.Key) (bool, error)
34 35 36
	Get(key.Key) (blocks.Block, error)
	Put(blocks.Block) error
	PutMany([]blocks.Block) error
37

38
	AllKeysChan(ctx context.Context) (<-chan key.Key, error)
39 40
}

Jeromy's avatar
Jeromy committed
41 42 43
type GCBlockstore interface {
	Blockstore

44 45 46
	// GCLock locks the blockstore for garbage collection. No operations
	// that expect to finish with a pin should ocurr simultaneously.
	// Reading during GC is safe, and requires no lock.
47
	GCLock() Unlocker
48 49 50 51 52

	// PinLock locks the blockstore for sequences of puts expected to finish
	// with a pin (before GC). Multiple put->pin sequences can write through
	// at the same time, but no GC should not happen simulatenously.
	// Reading during Pinning is safe, and requires no lock.
53
	PinLock() Unlocker
Jeromy's avatar
Jeromy committed
54 55 56 57

	// GcRequested returns true if GCLock has been called and is waiting to
	// take the lock
	GCRequested() bool
Jeromy's avatar
Jeromy committed
58 59
}

Jeromy's avatar
Jeromy committed
60
func NewBlockstore(d ds.Batching) *blockstore {
Jeromy's avatar
Jeromy committed
61
	var dsb ds.Batching
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
	dd := dsns.Wrap(d, BlockPrefix)
Jeromy's avatar
Jeromy committed
63
	dsb = dd
64
	return &blockstore{
Jeromy's avatar
Jeromy committed
65
		datastore: dsb,
66 67 68 69
	}
}

type blockstore struct {
Jeromy's avatar
Jeromy committed
70
	datastore ds.Batching
Jeromy's avatar
Jeromy committed
71

Jeromy's avatar
Jeromy committed
72 73 74
	lk      sync.RWMutex
	gcreq   int32
	gcreqlk sync.Mutex
75 76 77 78

	rehash bool
}

79
func (bs *blockstore) HashOnRead(enabled bool) {
80
	bs.rehash = enabled
81 82
}

83
func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
84 85 86 87
	if k == "" {
		return nil, ErrNotFound
	}

88
	maybeData, err := bs.datastore.Get(k.DsKey())
89 90 91
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
92 93 94 95 96 97 98 99
	if err != nil {
		return nil, err
	}
	bdata, ok := maybeData.([]byte)
	if !ok {
		return nil, ValueTypeMismatch
	}

100 101 102 103 104 105 106 107 108 109
	if bs.rehash {
		rb := blocks.NewBlock(bdata)
		if rb.Key() != k {
			return nil, ErrHashMismatch
		} else {
			return rb, nil
		}
	} else {
		return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
	}
110 111
}

112
func (bs *blockstore) Put(block blocks.Block) error {
113
	k := block.Key().DsKey()
114 115

	// Has is cheaper than Put, so see if we already have it
116
	exists, err := bs.datastore.Has(k)
117
	if err == nil && exists {
118 119
		return nil // already stored.
	}
Jeromy's avatar
Jeromy committed
120
	return bs.datastore.Put(k, block.RawData())
121
}
122

123
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
124 125 126 127 128 129 130 131 132 133 134
	t, err := bs.datastore.Batch()
	if err != nil {
		return err
	}
	for _, b := range blocks {
		k := b.Key().DsKey()
		exists, err := bs.datastore.Has(k)
		if err == nil && exists {
			continue
		}

Jeromy's avatar
Jeromy committed
135
		err = t.Put(k, b.RawData())
136 137 138 139 140 141 142
		if err != nil {
			return err
		}
	}
	return t.Commit()
}

143
func (bs *blockstore) Has(k key.Key) (bool, error) {
144 145 146
	return bs.datastore.Has(k.DsKey())
}

147
func (s *blockstore) DeleteBlock(k key.Key) error {
148 149
	return s.datastore.Delete(k.DsKey())
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150

151
// AllKeysChan runs a query for keys from the blockstore.
152 153
// this is very simplistic, in the future, take dsq.Query as a param?
//
154
// AllKeysChan respects context
155
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157

	// KeysOnly, because that would be _a lot_ of data.
158
	q := dsq.Query{KeysOnly: true}
159 160
	// datastore/namespace does *NOT* fix up Query.Prefix
	q.Prefix = BlockPrefix.String()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162 163 164 165
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

166
	// this function is here to compartmentalize
167
	get := func() (key.Key, bool) {
168 169
		select {
		case <-ctx.Done():
170
			return "", false
171 172
		case e, more := <-res.Next():
			if !more {
173
				return "", false
174 175 176
			}
			if e.Error != nil {
				log.Debug("blockstore.AllKeysChan got err:", e.Error)
177
				return "", false
178 179
			}

180
			// need to convert to key.Key using key.KeyFromDsKey.
181 182 183 184 185
			k, err := key.KeyFromDsKey(ds.NewKey(e.Key))
			if err != nil {
				log.Warningf("error parsing key from DsKey: ", err)
				return "", true
			}
186
			log.Debug("blockstore: query got key", k)
187 188

			// key must be a multihash. else ignore it.
189
			_, err = mh.Cast([]byte(k))
190
			if err != nil {
191
				log.Warningf("key from datastore was not a multihash: ", err)
192 193 194
				return "", true
			}

195 196
			return k, true
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197
	}
198

199
	output := make(chan key.Key, dsq.KeysOnlyBufSize)
200 201 202 203 204 205 206 207 208 209 210
	go func() {
		defer func() {
			res.Process().Close() // ensure exit (signals early exit, too)
			close(output)
		}()

		for {
			k, ok := get()
			if !ok {
				return
			}
211 212 213
			if k == "" {
				continue
			}
214 215 216 217 218 219 220 221 222 223

			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224
}
Jeromy's avatar
Jeromy committed
225

226 227 228 229 230 231 232 233 234 235 236 237 238 239
type Unlocker interface {
	Unlock()
}

type unlocker struct {
	unlock func()
}

func (u *unlocker) Unlock() {
	u.unlock()
	u.unlock = nil // ensure its not called twice
}

func (bs *blockstore) GCLock() Unlocker {
Jeromy's avatar
Jeromy committed
240
	atomic.AddInt32(&bs.gcreq, 1)
Jeromy's avatar
Jeromy committed
241
	bs.lk.Lock()
Jeromy's avatar
Jeromy committed
242
	atomic.AddInt32(&bs.gcreq, -1)
243
	return &unlocker{bs.lk.Unlock}
Jeromy's avatar
Jeromy committed
244 245
}

246
func (bs *blockstore) PinLock() Unlocker {
Jeromy's avatar
Jeromy committed
247
	bs.lk.RLock()
248
	return &unlocker{bs.lk.RUnlock}
Jeromy's avatar
Jeromy committed
249
}
Jeromy's avatar
Jeromy committed
250 251 252 253

func (bs *blockstore) GCRequested() bool {
	return atomic.LoadInt32(&bs.gcreq) > 0
}