blockservice.go 7.84 KB
Newer Older
1 2 3
// package blockservice implements a BlockService interface that provides
// a single GetBlock/AddBlock interface that seamlessly retrieves data either
// locally or from a remote peer through the exchange.
4 5 6
package blockservice

import (
7
	"context"
8
	"errors"
Jeromy's avatar
Jeromy committed
9
	"fmt"
10
	"io"
11

12
	exchange "github.com/ipfs/go-ipfs/exchange"
13
	bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
Jeromy's avatar
Jeromy committed
14

Steven Allen's avatar
Steven Allen committed
15
	logging "gx/ipfs/QmRb5jh8z2E8hMGN2tkvs1yHynUanqnZ3UeKwgN1i9P1F8/go-log"
16
	blockstore "gx/ipfs/QmTVDM4LCSUMFNQzbDLL9zQwp8usE6QHymFdh3h8vL9v6b/go-ipfs-blockstore"
Steven Allen's avatar
Steven Allen committed
17 18
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
19 20
)

Jeromy's avatar
Jeromy committed
21
var log = logging.Logger("blockservice")
22

23
var ErrNotFound = errors.New("blockservice: key not found")
Jeromy's avatar
Jeromy committed
24

Steven Allen's avatar
Steven Allen committed
25 26
// BlockGetter is the common interface shared between blockservice sessions and
// the blockservice.
27 28 29 30 31 32 33 34 35 36 37 38 39 40
type BlockGetter interface {
	// GetBlock gets the requested block.
	GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)

	// GetBlocks does a batch request for the given cids, returning blocks as
	// they are found, in no particular order.
	//
	// It may not be able to find all requested blocks (or the context may
	// be canceled). In that case, it will close the channel early. It is up
	// to the consumer to detect this situation and keep track which blocks
	// it has received and which it hasn't.
	GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
}

41 42
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
43
// It uses an internal `datastore.Datastore` instance to store values.
44
type BlockService interface {
45 46 47
	io.Closer
	BlockGetter

Jeromy's avatar
Jeromy committed
48
	// Blockstore returns a reference to the underlying blockstore
49
	Blockstore() blockstore.Blockstore
Jeromy's avatar
Jeromy committed
50 51

	// Exchange returns a reference to the underlying exchange (usually bitswap)
52
	Exchange() exchange.Interface
Jeromy's avatar
Jeromy committed
53 54

	// AddBlock puts a given block to the underlying datastore
55
	AddBlock(o blocks.Block) error
Jeromy's avatar
Jeromy committed
56 57 58

	// AddBlocks adds a slice of blocks at the same time using batching
	// capabilities of the underlying datastore whenever possible.
59
	AddBlocks(bs []blocks.Block) error
Jeromy's avatar
Jeromy committed
60

61 62
	// DeleteBlock deletes the given block from the blockservice.
	DeleteBlock(o *cid.Cid) error
63 64 65 66 67
}

type blockService struct {
	blockstore blockstore.Blockstore
	exchange   exchange.Interface
68 69 70
	// If checkFirst is true then first check that a block doesn't
	// already exist to avoid republishing the block on the exchange.
	checkFirst bool
71 72 73
}

// NewBlockService creates a BlockService with given datastore instance.
74
func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
Jeromy's avatar
Jeromy committed
75
	if rem == nil {
Jeromy's avatar
Jeromy committed
76
		log.Warning("blockservice running in local (offline) mode.")
Jeromy's avatar
Jeromy committed
77
	}
78

79 80 81
	return &blockService{
		blockstore: bs,
		exchange:   rem,
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
		checkFirst: true,
	}
}

// NewWriteThrough ceates a BlockService that guarantees writes will go
// through to the blockstore and are not skipped by cache checks.
func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
	if rem == nil {
		log.Warning("blockservice running in local (offline) mode.")
	}

	return &blockService{
		blockstore: bs,
		exchange:   rem,
		checkFirst: false,
97
	}
98 99
}

Steven Allen's avatar
Steven Allen committed
100 101 102
// Blockstore returns the blockstore behind this blockservice.
func (s *blockService) Blockstore() blockstore.Blockstore {
	return s.blockstore
103 104
}

Steven Allen's avatar
Steven Allen committed
105 106 107
// Exchange returns the exchange behind this blockservice.
func (s *blockService) Exchange() exchange.Interface {
	return s.exchange
108 109
}

110 111
// NewSession creates a bitswap session that allows for controlled exchange of
// wantlists to decrease the bandwidth overhead.
112 113 114
func NewSession(ctx context.Context, bs BlockService) *Session {
	exchange := bs.Exchange()
	if bswap, ok := exchange.(*bitswap.Bitswap); ok {
115 116 117
		ses := bswap.NewSession(ctx)
		return &Session{
			ses: ses,
118
			bs:  bs.Blockstore(),
119 120 121
		}
	}
	return &Session{
122 123
		ses: exchange,
		bs:  bs.Blockstore(),
124 125 126
	}
}

127
// AddBlock adds a particular block to the service, Putting it into the datastore.
128
// TODO pass a context into this if the remote.HasBlock is going to remain here.
129
func (s *blockService) AddBlock(o blocks.Block) error {
Jeromy's avatar
Jeromy committed
130
	c := o.Cid()
131
	if s.checkFirst {
132 133
		if has, err := s.blockstore.Has(c); has || err != nil {
			return err
134
		}
135 136
	}

137 138
	if err := s.blockstore.Put(o); err != nil {
		return err
Jeromy's avatar
Jeromy committed
139
	}
Jeromy's avatar
Jeromy committed
140

141
	if err := s.exchange.HasBlock(o); err != nil {
142
		// TODO(#4623): really an error?
143
		return errors.New("blockservice is closed")
144
	}
Jeromy's avatar
Jeromy committed
145

146
	return nil
147 148
}

149
func (s *blockService) AddBlocks(bs []blocks.Block) error {
150
	var toput []blocks.Block
151
	if s.checkFirst {
152
		toput = make([]blocks.Block, 0, len(bs))
153 154 155
		for _, b := range bs {
			has, err := s.blockstore.Has(b.Cid())
			if err != nil {
156
				return err
157
			}
158 159
			if !has {
				toput = append(toput, b)
160
			}
161
		}
162
	} else {
Jeromy's avatar
Jeromy committed
163
		toput = bs
164 165
	}

166
	err := s.blockstore.PutMany(toput)
167
	if err != nil {
168
		return err
169 170
	}

Jeromy's avatar
Jeromy committed
171
	for _, o := range toput {
172
		if err := s.exchange.HasBlock(o); err != nil {
173
			// TODO(#4623): Should this really *return*?
174
			return fmt.Errorf("blockservice is closed (%s)", err)
175 176
		}
	}
177
	return nil
178 179
}

180 181
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
182
func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
Jeromy's avatar
Jeromy committed
183
	log.Debugf("BlockService GetBlock: '%s'", c)
jbenet's avatar
jbenet committed
184

185 186 187 188 189 190 191 192 193 194
	var f exchange.Fetcher
	if s.exchange != nil {
		f = s.exchange
	}

	return getBlock(ctx, c, s.blockstore, f)
}

func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
	block, err := bs.Get(c)
Jeromy's avatar
Jeromy committed
195
	if err == nil {
196
		return block, nil
Jeromy's avatar
Jeromy committed
197 198
	}

199
	if err == blockstore.ErrNotFound && f != nil {
200 201
		// TODO be careful checking ErrNotFound. If the underlying
		// implementation changes, this will break.
202
		log.Debug("Blockservice: Searching bitswap")
203
		blk, err := f.GetBlock(ctx, c)
Jeromy's avatar
Jeromy committed
204
		if err != nil {
205 206 207
			if err == blockstore.ErrNotFound {
				return nil, ErrNotFound
			}
Jeromy's avatar
Jeromy committed
208 209 210
			return nil, err
		}
		return blk, nil
Jeromy's avatar
Jeromy committed
211 212
	}

213
	log.Debug("Blockservice GetBlock: Not found")
Jeromy's avatar
Jeromy committed
214
	if err == blockstore.ErrNotFound {
215
		return nil, ErrNotFound
216
	}
Jeromy's avatar
Jeromy committed
217 218

	return nil, err
219
}
Jeromy's avatar
Jeromy committed
220

221 222 223
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
224
func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
225 226 227 228
	return getBlocks(ctx, ks, s.blockstore, s.exchange)
}

func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block {
229
	out := make(chan blocks.Block)
230
	go func() {
231
		defer close(out)
232
		var misses []*cid.Cid
Jeromy's avatar
Jeromy committed
233
		for _, c := range ks {
234
			hit, err := bs.Get(c)
235
			if err != nil {
236
				misses = append(misses, c)
237
				continue
238
			}
239
			log.Debug("Blockservice: Got data in datastore")
240 241 242 243 244
			select {
			case out <- hit:
			case <-ctx.Done():
				return
			}
245
		}
Jeromy's avatar
Jeromy committed
246

247 248 249 250
		if len(misses) == 0 {
			return
		}

251
		rblocks, err := f.GetBlocks(ctx, misses)
Jeromy's avatar
Jeromy committed
252
		if err != nil {
253
			log.Debugf("Error with GetBlocks: %s", err)
Jeromy's avatar
Jeromy committed
254 255
			return
		}
256

257 258 259 260 261 262
		for b := range rblocks {
			select {
			case out <- b:
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
263
		}
264 265
	}()
	return out
Jeromy's avatar
Jeromy committed
266 267
}

Jeromy's avatar
Jeromy committed
268
// DeleteBlock deletes a block in the blockservice from the datastore
269 270
func (s *blockService) DeleteBlock(c *cid.Cid) error {
	return s.blockstore.DeleteBlock(c)
Jeromy's avatar
Jeromy committed
271
}
272

273
func (s *blockService) Close() error {
274
	log.Debug("blockservice is shutting down...")
275
	return s.exchange.Close()
276
}
277

Jeromy's avatar
Jeromy committed
278
// Session is a helper type to provide higher level access to bitswap sessions
279 280 281 282 283
type Session struct {
	bs  blockstore.Blockstore
	ses exchange.Fetcher
}

Jeromy's avatar
Jeromy committed
284
// GetBlock gets a block in the context of a request session
285 286 287 288
func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
	return getBlock(ctx, c, s.bs, s.ses)
}

Jeromy's avatar
Jeromy committed
289
// GetBlocks gets blocks in the context of a request session
290 291 292
func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
	return getBlocks(ctx, ks, s.bs, s.ses)
}
Steven Allen's avatar
Steven Allen committed
293 294

var _ BlockGetter = (*Session)(nil)