blockstore.go 5.55 KB
Newer Older
1 2 3 4 5 6
// Package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for Getting and Putting block objects.
package blockstore

import (
	"errors"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"sync/atomic"
9

10
	blocks "github.com/ipfs/go-ipfs/blocks"
11
	key "github.com/ipfs/go-ipfs/blocks/key"
12
	logging "gx/ipfs/QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR/go-log"
13
	mh "gx/ipfs/QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku/go-multihash"
Jakub Sztandera's avatar
Jakub Sztandera committed
14 15 16
	ds "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore"
	dsns "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/namespace"
	dsq "gx/ipfs/QmZ6A6P6AMo8SR3jXAwzTuSU6B9R2Y4eqW2yW9VvfUayDN/go-datastore/query"
Jeromy's avatar
Jeromy committed
17
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
18 19
)

Jeromy's avatar
Jeromy committed
20
var log = logging.Logger("blockstore")
21

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22
// BlockPrefix namespaces blockstore datastores
23
var BlockPrefix = ds.NewKey("blocks")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25 26
var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
27

28 29
var ErrNotFound = errors.New("blockstore: block not found")

Jeromy's avatar
Jeromy committed
30
// Blockstore wraps a Datastore
31
type Blockstore interface {
32 33
	DeleteBlock(key.Key) error
	Has(key.Key) (bool, error)
34 35 36
	Get(key.Key) (blocks.Block, error)
	Put(blocks.Block) error
	PutMany([]blocks.Block) error
37

38
	AllKeysChan(ctx context.Context) (<-chan key.Key, error)
39 40
}

Jeromy's avatar
Jeromy committed
41 42 43
type GCBlockstore interface {
	Blockstore

44 45 46
	// GCLock locks the blockstore for garbage collection. No operations
	// that expect to finish with a pin should ocurr simultaneously.
	// Reading during GC is safe, and requires no lock.
47
	GCLock() Unlocker
48 49 50 51 52

	// PinLock locks the blockstore for sequences of puts expected to finish
	// with a pin (before GC). Multiple put->pin sequences can write through
	// at the same time, but no GC should not happen simulatenously.
	// Reading during Pinning is safe, and requires no lock.
53
	PinLock() Unlocker
Jeromy's avatar
Jeromy committed
54 55 56 57

	// GcRequested returns true if GCLock has been called and is waiting to
	// take the lock
	GCRequested() bool
Jeromy's avatar
Jeromy committed
58 59
}

Jeromy's avatar
Jeromy committed
60
func NewBlockstore(d ds.Batching) *blockstore {
Jeromy's avatar
Jeromy committed
61
	var dsb ds.Batching
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
	dd := dsns.Wrap(d, BlockPrefix)
Jeromy's avatar
Jeromy committed
63
	dsb = dd
64
	return &blockstore{
Jeromy's avatar
Jeromy committed
65
		datastore: dsb,
66 67 68 69
	}
}

type blockstore struct {
Jeromy's avatar
Jeromy committed
70
	datastore ds.Batching
Jeromy's avatar
Jeromy committed
71

Jeromy's avatar
Jeromy committed
72 73 74
	lk      sync.RWMutex
	gcreq   int32
	gcreqlk sync.Mutex
75 76 77 78 79 80

	rehash bool
}

func (bs *blockstore) RuntimeHashing(enabled bool) {
	bs.rehash = enabled
81 82
}

83
func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
jbenet's avatar
jbenet committed
84 85 86 87
	if k == "" {
		return nil, ErrNotFound
	}

88
	maybeData, err := bs.datastore.Get(k.DsKey())
89 90 91
	if err == ds.ErrNotFound {
		return nil, ErrNotFound
	}
92 93 94 95 96 97 98 99
	if err != nil {
		return nil, err
	}
	bdata, ok := maybeData.([]byte)
	if !ok {
		return nil, ValueTypeMismatch
	}

100 101 102 103 104 105 106 107 108 109
	if bs.rehash {
		rb := blocks.NewBlock(bdata)
		if rb.Key() != k {
			return nil, ErrHashMismatch
		} else {
			return rb, nil
		}
	} else {
		return blocks.NewBlockWithHash(bdata, mh.Multihash(k))
	}
110 111
}

112
func (bs *blockstore) Put(block blocks.Block) error {
113
	k := block.Key().DsKey()
114 115

	// Has is cheaper than Put, so see if we already have it
116
	exists, err := bs.datastore.Has(k)
117
	if err == nil && exists {
118 119
		return nil // already stored.
	}
120
	return bs.datastore.Put(k, block.Data())
121
}
122

123
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
124 125 126 127 128 129 130 131 132 133 134
	t, err := bs.datastore.Batch()
	if err != nil {
		return err
	}
	for _, b := range blocks {
		k := b.Key().DsKey()
		exists, err := bs.datastore.Has(k)
		if err == nil && exists {
			continue
		}

135
		err = t.Put(k, b.Data())
136 137 138 139 140 141 142
		if err != nil {
			return err
		}
	}
	return t.Commit()
}

143
func (bs *blockstore) Has(k key.Key) (bool, error) {
144 145 146
	return bs.datastore.Has(k.DsKey())
}

147
func (s *blockstore) DeleteBlock(k key.Key) error {
148 149
	return s.datastore.Delete(k.DsKey())
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150

151
// AllKeysChan runs a query for keys from the blockstore.
152 153
// this is very simplistic, in the future, take dsq.Query as a param?
//
154
// AllKeysChan respects context
155
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157

	// KeysOnly, because that would be _a lot_ of data.
158
	q := dsq.Query{KeysOnly: true}
159 160
	// datastore/namespace does *NOT* fix up Query.Prefix
	q.Prefix = BlockPrefix.String()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162 163 164 165
	res, err := bs.datastore.Query(q)
	if err != nil {
		return nil, err
	}

166
	// this function is here to compartmentalize
167
	get := func() (k key.Key, ok bool) {
168 169 170 171 172 173 174 175 176 177 178 179
		select {
		case <-ctx.Done():
			return k, false
		case e, more := <-res.Next():
			if !more {
				return k, false
			}
			if e.Error != nil {
				log.Debug("blockstore.AllKeysChan got err:", e.Error)
				return k, false
			}

180 181
			// need to convert to key.Key using key.KeyFromDsKey.
			k = key.KeyFromDsKey(ds.NewKey(e.Key))
182
			log.Debug("blockstore: query got key", k)
183 184 185 186 187 188 189

			// key must be a multihash. else ignore it.
			_, err := mh.Cast([]byte(k))
			if err != nil {
				return "", true
			}

190 191
			return k, true
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
	}
193

194
	output := make(chan key.Key)
195 196 197 198 199 200 201 202 203 204 205
	go func() {
		defer func() {
			res.Process().Close() // ensure exit (signals early exit, too)
			close(output)
		}()

		for {
			k, ok := get()
			if !ok {
				return
			}
206 207 208
			if k == "" {
				continue
			}
209 210 211 212 213 214 215 216 217 218

			select {
			case <-ctx.Done():
				return
			case output <- k:
			}
		}
	}()

	return output, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
}
Jeromy's avatar
Jeromy committed
220

221 222 223 224 225 226 227 228 229 230 231 232 233 234
// Unlocker is the handle returned by GCLock and PinLock; calling Unlock
// releases the lock that was taken.
type Unlocker interface {
	Unlock()
}

// unlocker is the Unlocker implementation handed out by GCLock and PinLock.
type unlocker struct {
	// unlock is the function that releases the held lock.
	unlock func()
}

// Unlock invokes the stored release function and then clears it. Because the
// function is set to nil afterwards, a second call to Unlock panics on the
// nil function value instead of running the release function again.
func (u *unlocker) Unlock() {
	u.unlock()
	u.unlock = nil // ensure it's not called twice
}

func (bs *blockstore) GCLock() Unlocker {
Jeromy's avatar
Jeromy committed
235
	atomic.AddInt32(&bs.gcreq, 1)
Jeromy's avatar
Jeromy committed
236
	bs.lk.Lock()
Jeromy's avatar
Jeromy committed
237
	atomic.AddInt32(&bs.gcreq, -1)
238
	return &unlocker{bs.lk.Unlock}
Jeromy's avatar
Jeromy committed
239 240
}

241
func (bs *blockstore) PinLock() Unlocker {
Jeromy's avatar
Jeromy committed
242
	bs.lk.RLock()
243
	return &unlocker{bs.lk.RUnlock}
Jeromy's avatar
Jeromy committed
244
}
Jeromy's avatar
Jeromy committed
245 246 247 248

// GCRequested reports whether at least one GCLock call is currently waiting
// to acquire the lock.
func (bs *blockstore) GCRequested() bool {
	waiting := atomic.LoadInt32(&bs.gcreq)
	return waiting > 0
}