blockservice.go 8.52 KB
Newer Older
1 2 3
// Package blockservice implements a BlockService interface that provides
// a single GetBlock/AddBlock interface that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
4 5 6
package blockservice

import (
7
	"context"
8
	"errors"
9
	"io"
10
	"sync"
11

Jeromy's avatar
Jeromy committed
12 13 14 15 16
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
	logging "github.com/ipfs/go-log"
17 18
)

Jeromy's avatar
Jeromy committed
19
var log = logging.Logger("blockservice")
20

21
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
22

Steven Allen's avatar
Steven Allen committed
23 24
// BlockGetter is the common interface shared between blockservice sessions and
// the blockservice.
25 26
type BlockGetter interface {
	// GetBlock gets the requested block.
27
	GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error)
28 29 30 31 32 33 34 35

	// GetBlocks does a batch request for the given cids, returning blocks as
	// they are found, in no particular order.
	//
	// It may not be able to find all requested blocks (or the context may
	// be canceled). In that case, it will close the channel early. It is up
	// to the consumer to detect this situation and keep track which blocks
	// it has received and which it hasn't.
36
	GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block
37 38
}

39 40
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
41
// It uses an internal `datastore.Datastore` instance to store values.
42
type BlockService interface {
43 44 45
	io.Closer
	BlockGetter

Jeromy's avatar
Jeromy committed
46
	// Blockstore returns a reference to the underlying blockstore
47
	Blockstore() blockstore.Blockstore
Jeromy's avatar
Jeromy committed
48 49

	// Exchange returns a reference to the underlying exchange (usually bitswap)
50
	Exchange() exchange.Interface
Jeromy's avatar
Jeromy committed
51 52

	// AddBlock puts a given block to the underlying datastore
53
	AddBlock(o blocks.Block) error
Jeromy's avatar
Jeromy committed
54 55 56

	// AddBlocks adds a slice of blocks at the same time using batching
	// capabilities of the underlying datastore whenever possible.
57
	AddBlocks(bs []blocks.Block) error
Jeromy's avatar
Jeromy committed
58

59
	// DeleteBlock deletes the given block from the blockservice.
60
	DeleteBlock(o cid.Cid) error
61 62 63 64 65
}

type blockService struct {
	blockstore blockstore.Blockstore
	exchange   exchange.Interface
66 67 68
	// If checkFirst is true then first check that a block doesn't
	// already exist to avoid republishing the block on the exchange.
	checkFirst bool
69 70 71
}

// NewBlockService creates a BlockService with given datastore instance.
72
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
Jeromy's avatar
Jeromy committed
73
	if rem == nil {
Jeromy's avatar
Jeromy committed
74
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
75
	}
76

77 78 79
	return &blockService{
		blockstore: bs,
		exchange:   rem,
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
		checkFirst: true,
	}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
	if rem == nil {
		log.Warning("blockservice running in local (offline) mode.")
	}

	return &blockService{
		blockstore: bs,
		exchange:   rem,
		checkFirst: false,
95
	}
96 97
}

Steven Allen's avatar
Steven Allen committed
98 99 100
// Blockstore returns the blockstore behind this blockservice.
func (s *blockService) Blockstore() blockstore.Blockstore {
	return s.blockstore
101 102
}

Steven Allen's avatar
Steven Allen committed
103 104 105
// Exchange returns the exchange behind this blockservice.
func (s *blockService) Exchange() exchange.Interface {
	return s.exchange
106 107
}

108 109 110 111 112
// NewSession creates a new session that allows for
// controlled exchange of wantlists to decrease the bandwidth overhead.
// If the current exchange is a SessionExchange, a new exchange
// session will be created. Otherwise, the current exchange will be used
// directly.
113
func NewSession(ctx context.Context, bs BlockService) *Session {
114 115 116
	exch := bs.Exchange()
	if sessEx, ok := exch.(exchange.SessionExchange); ok {
		ses := sessEx.NewSession(ctx)
117
		return &Session{
118 119 120
			ses:    ses,
			sessEx: sessEx,
			bs:     bs.Blockstore(),
121 122 123
		}
	}
	return &Session{
124
		ses: exch,
125
		bs:  bs.Blockstore(),
126 127 128
	}
}

129
// AddBlock adds a particular block to the service, Putting it into the datastore.
130
// TODO pass a context into this if the remote.HasBlock is going to remain here.
131
func (s *blockService) AddBlock(o blocks.Block) error {
Jeromy's avatar
Jeromy committed
132
	c := o.Cid()
133
	if s.checkFirst {
134 135
		if has, err := s.blockstore.Has(c); has || err != nil {
			return err
136
		}
137 138
	}

139 140
	if err := s.blockstore.Put(o); err != nil {
		return err
Jeromy's avatar
Jeromy committed
141
	}
Jeromy's avatar
Jeromy committed
142

143 144
	log.Event(context.TODO(), "BlockService.BlockAdded", c)

145
	if err := s.exchange.HasBlock(o); err != nil {
146
		log.Errorf("HasBlock: %s", err.Error())
147
	}
Jeromy's avatar
Jeromy committed
148

149
	return nil
150 151
}

152
func (s *blockService) AddBlocks(bs []blocks.Block) error {
153
	var toput []blocks.Block
154
	if s.checkFirst {
155
		toput = make([]blocks.Block, 0, len(bs))
156 157 158
		for _, b := range bs {
			has, err := s.blockstore.Has(b.Cid())
			if err != nil {
159
				return err
160
			}
161 162
			if !has {
				toput = append(toput, b)
163
			}
164
		}
165
	} else {
Jeromy's avatar
Jeromy committed
166
		toput = bs
167 168
	}

169
	err := s.blockstore.PutMany(toput)
170
	if err != nil {
171
		return err
172 173
	}

Jeromy's avatar
Jeromy committed
174
	for _, o := range toput {
175
		log.Event(context.TODO(), "BlockService.BlockAdded", o.Cid())
176
		if err := s.exchange.HasBlock(o); err != nil {
177
			log.Errorf("HasBlock: %s", err.Error())
178 179
		}
	}
180
	return nil
181 182
}

183 184
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
185
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
186
	log.Debugf("BlockService GetBlock: '%s'", c)
jbenet's avatar
jbenet committed
187

188
	var f func() exchange.Fetcher
189
	if s.exchange != nil {
190
		f = s.getExchange
191 192
	}

Jakub Sztandera's avatar
Jakub Sztandera committed
193
	return getBlock(ctx, c, s.blockstore, f) // hash security
194 195
}

196 197 198 199 200
// getExchange returns the service's exchange viewed as an exchange.Fetcher.
// It has the func() exchange.Fetcher shape that getBlock and getBlocks take,
// so they can defer obtaining the fetcher until it is needed.
func (s *blockService) getExchange() exchange.Fetcher {
	var fetcher exchange.Fetcher = s.exchange
	return fetcher
}

func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) (blocks.Block, error) {
201
	block, err := bs.Get(c)
Jeromy's avatar
Jeromy committed
202
	if err == nil {
203
		return block, nil
Jeromy's avatar
Jeromy committed
204 205
	}

206 207 208
	if err == blockstore.ErrNotFound && fget != nil {
		f := fget() // Don't load the exchange until we have to

209 210
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
211
		log.Debug("Blockservice: Searching bitswap")
212
		blk, err := f.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
213
		if err != nil {
214 215 216
			if err == blockstore.ErrNotFound {
				return nil, ErrNotFound
			}
Jeromy's avatar
Jeromy committed
217 218
			return nil, err
		}
219
		log.Event(ctx, "BlockService.BlockFetched", c)
Jeromy's avatar
Jeromy committed
220
		return blk, nil
Jeromy's avatar
Jeromy committed
221 222
	}

223
	log.Debug("Blockservice GetBlock: Not found")
Jeromy's avatar
Jeromy committed
224
	if err == blockstore.ErrNotFound {
225
		return nil, ErrNotFound
226
	}
Jeromy's avatar
Jeromy committed
227 228

	return nil, err
229
}
Jeromy's avatar
Jeromy committed
230

231 232 233
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
234
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
235
	return getBlocks(ctx, ks, s.blockstore, s.getExchange) // hash security
236 237
}

238
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, fget func() exchange.Fetcher) <-chan blocks.Block {
239
	out := make(chan blocks.Block)
240

241
	go func() {
242
		defer close(out)
243

244
		var misses []cid.Cid
Jeromy's avatar
Jeromy committed
245
		for _, c := range ks {
246
			hit, err := bs.Get(c)
247
			if err != nil {
248
				misses = append(misses, c)
249
				continue
250
			}
251 252 253 254 255
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
256
		}
Jeromy's avatar
Jeromy committed
257

258 259 260 261
		if len(misses) == 0 {
			return
		}

262
		f := fget() // don't load exchange unless we have to
263
		rblocks, err := f.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
264
		if err != nil {
265
			log.Debugf("Error with GetBlocks: %s", err)
Jeromy's avatar
Jeromy committed
266 267
			return
		}
268

269
		for b := range rblocks {
270
			log.Event(ctx, "BlockService.BlockFetched", b.Cid())
271 272 273 274 275
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
276
		}
277 278
	}()
	return out
Jeromy's avatar
Jeromy committed
279 280
}

Jeromy's avatar
Jeromy committed
281
// DeleteBlock deletes a block in the blockservice from the datastore
282
func (s *blockService) DeleteBlock(c cid.Cid) error {
283 284 285 286 287
	err := s.blockstore.DeleteBlock(c)
	if err == nil {
		log.Event(context.TODO(), "BlockService.BlockDeleted", c)
	}
	return err
Jeromy's avatar
Jeromy committed
288
}
289

290
// Close shuts the blockservice down by closing its exchange and returning the
// exchange's error. When the service has no exchange (offline mode) there is
// nothing to close and nil is returned.
func (s *blockService) Close() error {
	log.Debug("blockservice is shutting down...")
	// The constructors accept a nil exchange; calling Close on the nil
	// interface would panic.
	if s.exchange == nil {
		return nil
	}
	return s.exchange.Close()
}
294

Jeromy's avatar
Jeromy committed
295
// Session is a helper type to provide higher level access to bitswap sessions
296
type Session struct {
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
	bs      blockstore.Blockstore
	ses     exchange.Fetcher
	sessEx  exchange.SessionExchange
	sessCtx context.Context
	lk      sync.Mutex
}

func (s *Session) getSession() exchange.Fetcher {
	s.lk.Lock()
	defer s.lk.Unlock()
	if s.ses == nil {
		s.ses = s.sessEx.NewSession(s.sessCtx)
	}

	return s.ses
312 313
}

Jeromy's avatar
Jeromy committed
314
// GetBlock gets a block in the context of a request session
315
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
316
	return getBlock(ctx, c, s.bs, s.getSession) // hash security
317 318
}

Jeromy's avatar
Jeromy committed
319
// GetBlocks gets blocks in the context of a request session
320
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
321
	return getBlocks(ctx, ks, s.bs, s.getSession) // hash security
322
}
Steven Allen's avatar
Steven Allen committed
323 324

// Compile-time assertion that *Session satisfies BlockGetter.
var _ BlockGetter = (*Session)(nil)