// Package bitswap implements the IPFS exchange interface with the BitSwap
// bilateral exchange protocol.
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11
	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
12
	delay "github.com/ipfs/go-ipfs-delay"
13

Jeromy's avatar
Jeromy committed
14
	decision "github.com/ipfs/go-bitswap/decision"
15
	bsgetter "github.com/ipfs/go-bitswap/getter"
Jeromy's avatar
Jeromy committed
16
	bsmsg "github.com/ipfs/go-bitswap/message"
17
	bsmq "github.com/ipfs/go-bitswap/messagequeue"
Jeromy's avatar
Jeromy committed
18
	bsnet "github.com/ipfs/go-bitswap/network"
19
	notifications "github.com/ipfs/go-bitswap/notifications"
20
	bspm "github.com/ipfs/go-bitswap/peermanager"
21
	bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
22
	bssession "github.com/ipfs/go-bitswap/session"
23
	bssm "github.com/ipfs/go-bitswap/sessionmanager"
24
	bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
25
	bswm "github.com/ipfs/go-bitswap/wantmanager"
Jeromy's avatar
Jeromy committed
26 27 28 29 30 31 32 33
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
	logging "github.com/ipfs/go-log"
	metrics "github.com/ipfs/go-metrics-interface"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
Raúl Kripalani's avatar
Raúl Kripalani committed
34
	peer "github.com/libp2p/go-libp2p-core/peer"
35 36
)

Jeromy's avatar
Jeromy committed
37
// log is the package-level logger, created under the name "bitswap".
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38

39 40
// Compile-time assertion that *Bitswap implements exchange.SessionExchange.
var _ exchange.SessionExchange = (*Bitswap)(nil)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
41
const (
	// these requests take at _least_ two minutes at the moment.
	provideTimeout = time.Minute * 3

	// defaultProvSearchDelay is the provider search delay a Bitswap instance
	// starts with; the ProviderSearchDelay option replaces it per instance.
	defaultProvSearchDelay = time.Second
)
46

Jeromy's avatar
Jeromy committed
47
var (
	// HasBlockBufferSize is the buffer size of the channel for new blocks
	// that need to be provided. They should get pulled over by the
	// provideCollector even before they are actually provided.
	// TODO: Does this need to be this large given that?
	HasBlockBufferSize = 256

	// provideKeysBufferSize is the buffer size of the provideKeys channel.
	provideKeysBufferSize = 2048

	// provideWorkerMax is presumably the upper bound on concurrent provide
	// workers; its consumer is not in this part of the file (TODO confirm).
	provideWorkerMax = 6

	// metricsBuckets are the histogram bucket boundaries (in bytes) shared by
	// the block-size histograms.
	// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
Jeromy's avatar
Jeromy committed
59

60 61 62 63 64 65 66 67 68 69 70
// Option is a functional option: a function that receives the Bitswap
// instance under construction and adjusts its configuration. New applies
// every supplied Option before the instance is started.
type Option func(bs *Bitswap)

// ProvideEnabled returns an Option that switches provide announcements on
// (enabled == true) or off (enabled == false) for the configured instance.
func ProvideEnabled(enabled bool) Option {
	setProvide := func(b *Bitswap) { b.provideEnabled = enabled }
	return setProvide
}

71 72 73 74 75 76 77 78 79 80 81 82 83 84
// ProviderSearchDelay returns an Option that replaces the instance's provider
// search delay (how long a session waits before looking for providers) with
// newProvSearchDelay.
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
	return Option(func(b *Bitswap) {
		b.provSearchDelay = newProvSearchDelay
	})
}

// RebroadcastDelay returns an Option that replaces the instance's rebroadcast
// delay (how often providing requests are rebroadcast) with
// newRebroadcastDelay.
func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
	return Option(func(b *Bitswap) {
		b.rebroadcastDelay = newRebroadcastDelay
	})
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
85 86
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
87
// delegate. Runs until context is cancelled or bitswap.Close is called.
Łukasz Magiera's avatar
Łukasz Magiera committed
88
func New(parent context.Context, network bsnet.BitSwapNetwork,
89
	bstore blockstore.Blockstore, options ...Option) exchange.Interface {
90

91 92
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
93
	// coupled to the concerns of the ipfs daemon in this way.
94 95 96 97
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
98
	ctx, cancelFunc := context.WithCancel(parent)
99
	ctx = metrics.CtxSubScope(ctx, "bitswap")
100
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
101
		" data blocks recived").Histogram(metricsBuckets)
102
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
103
		" data blocks recived").Histogram(metricsBuckets)
104

105 106 107
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)

108 109 110 111
	px := process.WithTeardown(func() error {
		return nil
	})

112 113
	peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
		return bsmq.New(ctx, p, network)
114 115
	}

116
	wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
117 118
	pqm := bspqm.New(ctx, network)

119
	sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
120
		notif notifications.PubSub,
121 122
		provSearchDelay time.Duration,
		rebroadcastDelay delay.D) bssm.Session {
123
		return bssession.New(ctx, id, wm, pm, srs, notif, provSearchDelay, rebroadcastDelay)
124 125
	}
	sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
126
		return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
127
	}
128 129 130
	sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
		return bssrs.New(ctx)
	}
131
	notif := notifications.New()
132

133
	engine := decision.NewEngine(ctx, bstore, network.ConnectionManager()) // TODO close the engine with Close() method
134
	bs := &Bitswap{
135
		blockstore:       bstore,
136
		engine:           engine,
137 138 139 140 141 142
		network:          network,
		process:          px,
		newBlocks:        make(chan cid.Cid, HasBlockBufferSize),
		provideKeys:      make(chan cid.Cid, provideKeysBufferSize),
		wm:               wm,
		pqm:              pqm,
143 144
		sm:               bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory, notif),
		notif:            notif,
145 146 147 148 149 150 151
		counters:         new(counters),
		dupMetric:        dupHist,
		allMetric:        allHist,
		sentHistogram:    sentHistogram,
		provideEnabled:   true,
		provSearchDelay:  defaultProvSearchDelay,
		rebroadcastDelay: delay.Fixed(time.Minute),
152 153 154 155 156
	}

	// apply functional options before starting and running bitswap
	for _, option := range options {
		option(bs)
157
	}
158 159

	bs.wm.Startup()
160
	bs.pqm.Startup()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
161
	network.SetDelegate(bs)
162

163
	// Start up bitswaps async worker routines
164
	bs.startWorkers(ctx, px)
165
	engine.StartWorkers(ctx, px)
166 167 168 169 170 171

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
172
		notif.Shutdown()
173 174 175
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

176 177 178
	return bs
}

179 180
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
181
	// the wantlist tracks global wants for bitswap
182
	wm *bswm.WantManager
183

184 185 186
	// the provider query manager manages requests to find providers
	pqm *bspqm.ProviderQueryManager

187 188
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189

190 191
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
192 193 194 195 196

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

197 198 199
	// manages channels of outgoing blocks for sessions
	notif notifications.PubSub

200 201 202
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
203
	newBlocks chan cid.Cid
204
	// provideKeys directly feeds provide workers
205
	provideKeys chan cid.Cid
206

207 208 209
	process process.Process

	// Counters for various statistics
210 211
	counterLk sync.Mutex
	counters  *counters
212 213

	// Metrics interface metrics
214 215 216
	dupMetric     metrics.Histogram
	allMetric     metrics.Histogram
	sentHistogram metrics.Histogram
Jeromy's avatar
Jeromy committed
217

218 219
	// the sessionmanager manages tracking sessions
	sm *bssm.SessionManager
220 221 222

	// whether or not to make provide announcements
	provideEnabled bool
223 224 225 226 227 228

	// how long to wait before looking for providers in a session
	provSearchDelay time.Duration

	// how often to rebroadcast providing requests to find more optimized providers
	rebroadcastDelay delay.D
229 230
}

231 232 233 234 235 236 237 238 239 240
// counters holds the running statistics of a Bitswap instance. In this file
// all updates happen while Bitswap.counterLk is held.
type counters struct {
	// Blocks received, and the subset (count and bytes) that the local
	// blockstore already had when they arrived.
	blocksRecvd, dupBlocksRecvd, dupDataRecvd uint64

	// Send-side totals; they are not updated in this part of the file.
	blocksSent, dataSent uint64

	// Total bytes of block data received, duplicates included.
	dataRecvd uint64

	// Number of messages handed to ReceiveMessage.
	messagesRecvd uint64
}

241
// GetBlock attempts to retrieve a particular block from peers within the
242
// deadline enforced by the context.
243
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
244
	return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
245 246
}

247 248
// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
249 250
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
	var out []cid.Cid
251
	for _, e := range bs.engine.WantlistForPeer(p) {
252
		out = append(out, e.Cid)
253 254 255 256
	}
	return out
}

257 258
// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
259 260 261 262
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	return bs.engine.LedgerForPeer(p)
}

263 264 265 266 267 268 269
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
270
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
271
	session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
272
	return session.GetBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
273 274
}

Łukasz Magiera's avatar
Łukasz Magiera committed
275
// HasBlock announces the existence of a block to this bitswap service. The
276
// service will potentially notify its peers.
277
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
278
	return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk})
279 280 281 282 283 284
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
285
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
286 287 288 289 290
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
291

292 293 294 295 296 297 298
	wanted := blks

	// If blocks came from the network
	if from != "" {
		// Split blocks into wanted blocks vs duplicates
		wanted = make([]blocks.Block, 0, len(blks))
		for _, b := range blks {
299
			if bs.sm.IsWanted(b.Cid()) {
300 301 302 303 304 305 306 307 308
				wanted = append(wanted, b)
			} else {
				log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
			}
		}
	}

	// Put wanted blocks into blockstore
	err := bs.blockstore.PutMany(wanted)
309
	if err != nil {
310
		log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
311 312
		return err
	}
313

314 315 316 317 318
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
319

320 321 322 323 324 325 326 327 328 329 330 331 332 333
	allKs := make([]cid.Cid, 0, len(blks))
	for _, b := range blks {
		allKs = append(allKs, b.Cid())
	}

	wantedKs := allKs
	if len(blks) != len(wanted) {
		wantedKs = make([]cid.Cid, 0, len(wanted))
		for _, b := range wanted {
			wantedKs = append(wantedKs, b.Cid())
		}
	}

	// Send all block keys (including duplicates) to any sessions that want them.
334
	// (The duplicates are needed by sessions for accounting purposes)
335
	bs.sm.ReceiveFrom(from, allKs)
336

337 338
	// Send wanted block keys to decision engine
	bs.engine.AddBlocks(wantedKs)
339

340 341 342 343 344 345 346
	// Publish the block to any Bitswap clients that had requested blocks.
	// (the sessions use this pubsub mechanism to inform clients of received
	// blocks)
	for _, b := range wanted {
		bs.notif.Publish(b)
	}

347
	// If the reprovider is enabled, send wanted blocks to reprovider
348
	if bs.provideEnabled {
349
		for _, k := range wantedKs {
350
			select {
351
			case bs.newBlocks <- k:
352 353 354 355
				// send block off to be reprovided
			case <-bs.process.Closing():
				return bs.process.Close()
			}
356
		}
357
	}
358

359 360 361 362 363 364
	if from != "" {
		for _, b := range wanted {
			log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
		}
	}

365
	return nil
366 367
}

368 369
// ReceiveMessage is called by the network interface when a new message is
// received.
370
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Steven Allen's avatar
Steven Allen committed
371 372 373
	bs.counterLk.Lock()
	bs.counters.messagesRecvd++
	bs.counterLk.Unlock()
Jeromy's avatar
Jeromy committed
374

Jeromy's avatar
Jeromy committed
375 376
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
377
	bs.engine.MessageReceived(ctx, p, incoming)
Jeromy's avatar
Jeromy committed
378 379
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
380

381 382 383
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
384 385 386
		return
	}

387 388 389
	bs.updateReceiveCounters(iblocks)
	for _, b := range iblocks {
		log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
390
	}
391

392
	// Process blocks
393
	err := bs.receiveBlocksFrom(ctx, p, iblocks)
394
	if err != nil {
395
		log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
396
		return
397
	}
398 399 400 401 402 403 404
}

func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
	// Check which blocks are in the datastore
	// (Note: any errors from the blockstore are simply logged out in
	// blockstoreHas())
	blocksHas := bs.blockstoreHas(blocks)
405

406 407 408
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()

409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
	// Do some accounting for each block
	for i, b := range blocks {
		has := blocksHas[i]

		blkLen := len(b.RawData())
		bs.allMetric.Observe(float64(blkLen))
		if has {
			bs.dupMetric.Observe(float64(blkLen))
		}

		c := bs.counters

		c.blocksRecvd++
		c.dataRecvd += uint64(blkLen)
		if has {
			c.dupBlocksRecvd++
			c.dupDataRecvd += uint64(blkLen)
		}
427 428 429
	}
}

430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
// blockstoreHas reports, for each block in blks, whether the local blockstore
// already holds it. The result is index-aligned with blks. Each lookup runs
// in its own goroutine and the call returns once all of them have finished.
// A lookup that fails is logged and reported as false.
func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
	present := make([]bool, len(blks))

	var pending sync.WaitGroup
	pending.Add(len(blks))
	for idx := range blks {
		go func(slot int, blk blocks.Block) {
			defer pending.Done()

			found, err := bs.blockstore.Has(blk.Cid())
			if err != nil {
				log.Infof("blockstore.Has error: %s", err)
				present[slot] = false
				return
			}
			// Each goroutine writes only its own slot.
			present[slot] = found
		}(idx, blks[idx])
	}
	pending.Wait()

	return present
}

453 454
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
455
func (bs *Bitswap) PeerConnected(p peer.ID) {
456
	bs.wm.Connected(p)
457
	bs.engine.PeerConnected(p)
458 459
}

460 461
// PeerDisconnected is called by the network interface when a peer
// closes a connection
462
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
463
	bs.wm.Disconnected(p)
464
	bs.engine.PeerDisconnected(p)
465 466
}

467 468
// ReceiveError is called by the network interface when an error happens
// at the network layer. Currently just logs error.
469
func (bs *Bitswap) ReceiveError(err error) {
470
	log.Infof("Bitswap ReceiveError: %s", err)
471 472
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
473 474
}

475
// Close is called to shutdown Bitswap
476
func (bs *Bitswap) Close() error {
477
	return bs.process.Close()
478
}
479

480
// GetWantlist returns the current local wantlist.
481
func (bs *Bitswap) GetWantlist() []cid.Cid {
482
	entries := bs.wm.CurrentWants()
483
	out := make([]cid.Cid, 0, len(entries))
484
	for _, e := range entries {
485
		out = append(out, e.Cid)
486 487 488
	}
	return out
}
489

490
// IsOnline is needed to match go-ipfs-exchange-interface
491 492 493
func (bs *Bitswap) IsOnline() bool {
	return true
}
494

495 496 497 498 499 500
// NewSession generates a new Bitswap session. You should use this, rather
// that calling Bitswap.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have it's own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
501
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
502
	return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
503
}