bitswap.go 15.4 KB
Newer Older
1
// Package bitswap implements the IPFS exchange interface with the BitSwap
Brian Tiger Chow's avatar
Brian Tiger Chow committed
2
// bilateral exchange protocol.
3 4 5
package bitswap

import (
6
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
	"errors"
8
	"sync"
Jeromy's avatar
Jeromy committed
9 10
	"time"

11
	bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
12
	delay "github.com/ipfs/go-ipfs-delay"
13

Jeromy's avatar
Jeromy committed
14
	decision "github.com/ipfs/go-bitswap/decision"
15
	bsgetter "github.com/ipfs/go-bitswap/getter"
Jeromy's avatar
Jeromy committed
16
	bsmsg "github.com/ipfs/go-bitswap/message"
17
	bsmq "github.com/ipfs/go-bitswap/messagequeue"
Jeromy's avatar
Jeromy committed
18
	bsnet "github.com/ipfs/go-bitswap/network"
19
	notifications "github.com/ipfs/go-bitswap/notifications"
20
	bspm "github.com/ipfs/go-bitswap/peermanager"
21
	bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
22
	bssession "github.com/ipfs/go-bitswap/session"
23
	bssm "github.com/ipfs/go-bitswap/sessionmanager"
24
	bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
25
	bswm "github.com/ipfs/go-bitswap/wantmanager"
Jeromy's avatar
Jeromy committed
26 27 28 29 30 31 32 33
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
	logging "github.com/ipfs/go-log"
	metrics "github.com/ipfs/go-metrics-interface"
	process "github.com/jbenet/goprocess"
	procctx "github.com/jbenet/goprocess/context"
Raúl Kripalani's avatar
Raúl Kripalani committed
34
	peer "github.com/libp2p/go-libp2p-core/peer"
35 36
)

Jeromy's avatar
Jeromy committed
37
var log = logging.Logger("bitswap")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38

39 40
var _ exchange.SessionExchange = (*Bitswap)(nil)

Brian Tiger Chow's avatar
Brian Tiger Chow committed
41
const (
42
	// these requests take at _least_ two minutes at the moment.
43 44
	provideTimeout         = time.Minute * 3
	defaultProvSearchDelay = time.Second
Jeromy's avatar
Jeromy committed
45
)
46

Jeromy's avatar
Jeromy committed
47
var (
48 49 50 51
	// HasBlockBufferSize is the buffer size of the channel for new blocks
	// that need to be provided. They should get pulled over by the
	// provideCollector even before they are actually provided.
	// TODO: Does this need to be this large givent that?
52 53
	HasBlockBufferSize    = 256
	provideKeysBufferSize = 2048
Steven Allen's avatar
Steven Allen committed
54
	provideWorkerMax      = 6
55 56 57

	// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
	metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58
)
Jeromy's avatar
Jeromy committed
59

60 61 62 63 64 65 66 67 68 69 70
// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Bitswap)

// ProvideEnabled is an option for enabling/disabling provide announcements
func ProvideEnabled(enabled bool) Option {
	return func(bs *Bitswap) {
		bs.provideEnabled = enabled
	}
}

71 72 73 74 75 76 77 78 79 80 81 82 83 84
// ProviderSearchDelay overwrites the global provider search delay
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
	return func(bs *Bitswap) {
		bs.provSearchDelay = newProvSearchDelay
	}
}

// RebroadcastDelay overwrites the global provider rebroadcast delay
func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
	return func(bs *Bitswap) {
		bs.rebroadcastDelay = newRebroadcastDelay
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
85 86
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
87
// delegate. Runs until context is cancelled or bitswap.Close is called.
Łukasz Magiera's avatar
Łukasz Magiera committed
88
func New(parent context.Context, network bsnet.BitSwapNetwork,
89
	bstore blockstore.Blockstore, options ...Option) exchange.Interface {
90

91 92
	// important to use provided parent context (since it may include important
	// loggable data). It's probably not a good idea to allow bitswap to be
93
	// coupled to the concerns of the ipfs daemon in this way.
94 95 96 97
	//
	// FIXME(btc) Now that bitswap manages itself using a process, it probably
	// shouldn't accept a context anymore. Clients should probably use Close()
	// exclusively. We should probably find another way to share logging data
98
	ctx, cancelFunc := context.WithCancel(parent)
99
	ctx = metrics.CtxSubScope(ctx, "bitswap")
100
	dupHist := metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate"+
101
		" data blocks recived").Histogram(metricsBuckets)
102
	allHist := metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all"+
103
		" data blocks recived").Histogram(metricsBuckets)
104

105 106 107
	sentHistogram := metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by"+
		" this bitswap").Histogram(metricsBuckets)

108 109 110 111
	px := process.WithTeardown(func() error {
		return nil
	})

112 113
	peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
		return bsmq.New(ctx, p, network)
114 115
	}

116
	wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
117 118
	pqm := bspqm.New(ctx, network)

119
	sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
120
		notif notifications.PubSub,
121 122
		provSearchDelay time.Duration,
		rebroadcastDelay delay.D) bssm.Session {
123
		return bssession.New(ctx, id, wm, pm, srs, notif, provSearchDelay, rebroadcastDelay)
124 125
	}
	sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
126
		return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
127
	}
128 129 130
	sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
		return bssrs.New(ctx)
	}
131
	notif := notifications.New()
132

133
	bs := &Bitswap{
134 135 136 137 138 139 140 141
		blockstore:       bstore,
		engine:           decision.NewEngine(ctx, bstore, network.ConnectionManager()), // TODO close the engine with Close() method
		network:          network,
		process:          px,
		newBlocks:        make(chan cid.Cid, HasBlockBufferSize),
		provideKeys:      make(chan cid.Cid, provideKeysBufferSize),
		wm:               wm,
		pqm:              pqm,
142 143
		sm:               bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory, notif),
		notif:            notif,
144 145 146 147 148 149 150
		counters:         new(counters),
		dupMetric:        dupHist,
		allMetric:        allHist,
		sentHistogram:    sentHistogram,
		provideEnabled:   true,
		provSearchDelay:  defaultProvSearchDelay,
		rebroadcastDelay: delay.Fixed(time.Minute),
151 152 153 154 155
	}

	// apply functional options before starting and running bitswap
	for _, option := range options {
		option(bs)
156
	}
157 158

	bs.wm.Startup()
159
	bs.pqm.Startup()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
160
	network.SetDelegate(bs)
161

162
	// Start up bitswaps async worker routines
163
	bs.startWorkers(ctx, px)
164 165 166 167 168 169

	// bind the context and process.
	// do it over here to avoid closing before all setup is done.
	go func() {
		<-px.Closing() // process closes first
		cancelFunc()
170
		notif.Shutdown()
171 172 173
	}()
	procctx.CloseAfterContext(px, ctx) // parent cancelled first

174 175 176
	return bs
}

177 178
// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
179
	// the wantlist tracks global wants for bitswap
180
	wm *bswm.WantManager
181

182 183 184
	// the provider query manager manages requests to find providers
	pqm *bspqm.ProviderQueryManager

185 186
	// the engine is the bit of logic that decides who to send which blocks to
	engine *decision.Engine
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187

188 189
	// network delivers messages on behalf of the session
	network bsnet.BitSwapNetwork
190 191 192 193 194

	// blockstore is the local database
	// NB: ensure threadsafety
	blockstore blockstore.Blockstore

195 196 197
	// manages channels of outgoing blocks for sessions
	notif notifications.PubSub

198 199 200
	// newBlocks is a channel for newly added blocks to be provided to the
	// network.  blocks pushed down this channel get buffered and fed to the
	// provideKeys channel later on to avoid too much network activity
201
	newBlocks chan cid.Cid
202
	// provideKeys directly feeds provide workers
203
	provideKeys chan cid.Cid
204

205 206 207
	process process.Process

	// Counters for various statistics
208 209
	counterLk sync.Mutex
	counters  *counters
210 211

	// Metrics interface metrics
212 213 214
	dupMetric     metrics.Histogram
	allMetric     metrics.Histogram
	sentHistogram metrics.Histogram
Jeromy's avatar
Jeromy committed
215

216 217
	// the sessionmanager manages tracking sessions
	sm *bssm.SessionManager
218 219 220

	// whether or not to make provide announcements
	provideEnabled bool
221 222 223 224 225 226

	// how long to wait before looking for providers in a session
	provSearchDelay time.Duration

	// how often to rebroadcast providing requests to find more optimized providers
	rebroadcastDelay delay.D
227 228
}

229 230 231 232 233 234 235 236 237 238
type counters struct {
	blocksRecvd    uint64
	dupBlocksRecvd uint64
	dupDataRecvd   uint64
	blocksSent     uint64
	dataSent       uint64
	dataRecvd      uint64
	messagesRecvd  uint64
}

239
// GetBlock attempts to retrieve a particular block from peers within the
240
// deadline enforced by the context.
241
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
242
	return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
243 244
}

245 246
// WantlistForPeer returns the currently understood list of blocks requested by a
// given peer.
247 248
func (bs *Bitswap) WantlistForPeer(p peer.ID) []cid.Cid {
	var out []cid.Cid
249
	for _, e := range bs.engine.WantlistForPeer(p) {
250
		out = append(out, e.Cid)
251 252 253 254
	}
	return out
}

255 256
// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
257 258 259 260
func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
	return bs.engine.LedgerForPeer(p)
}

261 262 263 264 265 266 267
// GetBlocks returns a channel where the caller may receive blocks that
// correspond to the provided |keys|. Returns an error if BitSwap is unable to
// begin this request within the deadline enforced by the context.
//
// NB: Your request remains open until the context expires. To conserve
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
268
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
269
	session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
270
	return session.GetBlocks(ctx, keys)
Jeromy's avatar
Jeromy committed
271 272
}

Łukasz Magiera's avatar
Łukasz Magiera committed
273
// HasBlock announces the existence of a block to this bitswap service. The
274
// service will potentially notify its peers.
275
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
276
	return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk})
277 278 279 280 281 282
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
283
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
284 285 286 287 288
	select {
	case <-bs.process.Closing():
		return errors.New("bitswap is closed")
	default:
	}
289

290 291 292 293 294 295 296
	wanted := blks

	// If blocks came from the network
	if from != "" {
		// Split blocks into wanted blocks vs duplicates
		wanted = make([]blocks.Block, 0, len(blks))
		for _, b := range blks {
297
			if bs.sm.IsWanted(b.Cid()) {
298 299 300 301 302 303 304 305 306
				wanted = append(wanted, b)
			} else {
				log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
			}
		}
	}

	// Put wanted blocks into blockstore
	err := bs.blockstore.PutMany(wanted)
307
	if err != nil {
308
		log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
309 310
		return err
	}
311

312 313 314 315 316
	// NOTE: There exists the possiblity for a race condition here.  If a user
	// creates a node, then adds it to the dagservice while another goroutine
	// is waiting on a GetBlock for that object, they will receive a reference
	// to the same node. We should address this soon, but i'm not going to do
	// it now as it requires more thought and isnt causing immediate problems.
Jeromy's avatar
Jeromy committed
317

318 319 320 321 322 323 324 325 326 327 328 329 330 331
	allKs := make([]cid.Cid, 0, len(blks))
	for _, b := range blks {
		allKs = append(allKs, b.Cid())
	}

	wantedKs := allKs
	if len(blks) != len(wanted) {
		wantedKs = make([]cid.Cid, 0, len(wanted))
		for _, b := range wanted {
			wantedKs = append(wantedKs, b.Cid())
		}
	}

	// Send all block keys (including duplicates) to any sessions that want them.
332
	// (The duplicates are needed by sessions for accounting purposes)
333
	bs.sm.ReceiveFrom(from, allKs)
334

335 336
	// Send wanted block keys to decision engine
	bs.engine.AddBlocks(wantedKs)
337

338 339 340 341 342 343 344
	// Publish the block to any Bitswap clients that had requested blocks.
	// (the sessions use this pubsub mechanism to inform clients of received
	// blocks)
	for _, b := range wanted {
		bs.notif.Publish(b)
	}

345
	// If the reprovider is enabled, send wanted blocks to reprovider
346
	if bs.provideEnabled {
347
		for _, k := range wantedKs {
348
			select {
349
			case bs.newBlocks <- k:
350 351 352 353
				// send block off to be reprovided
			case <-bs.process.Closing():
				return bs.process.Close()
			}
354
		}
355
	}
356

357 358 359 360 361 362
	if from != "" {
		for _, b := range wanted {
			log.Event(ctx, "Bitswap.GetBlockRequest.End", b.Cid())
		}
	}

363
	return nil
364 365
}

366 367
// ReceiveMessage is called by the network interface when a new message is
// received.
368
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
Steven Allen's avatar
Steven Allen committed
369 370 371
	bs.counterLk.Lock()
	bs.counters.messagesRecvd++
	bs.counterLk.Unlock()
Jeromy's avatar
Jeromy committed
372

Jeromy's avatar
Jeromy committed
373 374
	// This call records changes to wantlists, blocks received,
	// and number of bytes transfered.
375
	bs.engine.MessageReceived(p, incoming)
Jeromy's avatar
Jeromy committed
376 377
	// TODO: this is bad, and could be easily abused.
	// Should only track *useful* messages in ledger
378

379 380 381
	iblocks := incoming.Blocks()

	if len(iblocks) == 0 {
382 383 384
		return
	}

385 386 387
	bs.updateReceiveCounters(iblocks)
	for _, b := range iblocks {
		log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
388
	}
389

390
	// Process blocks
391
	err := bs.receiveBlocksFrom(ctx, p, iblocks)
392
	if err != nil {
393
		log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
394
		return
395
	}
396 397 398 399 400 401 402
}

func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
	// Check which blocks are in the datastore
	// (Note: any errors from the blockstore are simply logged out in
	// blockstoreHas())
	blocksHas := bs.blockstoreHas(blocks)
403

404 405 406
	bs.counterLk.Lock()
	defer bs.counterLk.Unlock()

407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
	// Do some accounting for each block
	for i, b := range blocks {
		has := blocksHas[i]

		blkLen := len(b.RawData())
		bs.allMetric.Observe(float64(blkLen))
		if has {
			bs.dupMetric.Observe(float64(blkLen))
		}

		c := bs.counters

		c.blocksRecvd++
		c.dataRecvd += uint64(blkLen)
		if has {
			c.dupBlocksRecvd++
			c.dupDataRecvd += uint64(blkLen)
		}
425 426 427
	}
}

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
	res := make([]bool, len(blks))

	wg := sync.WaitGroup{}
	for i, block := range blks {
		wg.Add(1)
		go func(i int, b blocks.Block) {
			defer wg.Done()

			has, err := bs.blockstore.Has(b.Cid())
			if err != nil {
				log.Infof("blockstore.Has error: %s", err)
				has = false
			}

			res[i] = has
		}(i, block)
	}
	wg.Wait()

	return res
}

451 452
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
453
func (bs *Bitswap) PeerConnected(p peer.ID) {
454
	bs.wm.Connected(p)
455
	bs.engine.PeerConnected(p)
456 457
}

458 459
// PeerDisconnected is called by the network interface when a peer
// closes a connection
460
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
461
	bs.wm.Disconnected(p)
462
	bs.engine.PeerDisconnected(p)
463 464
}

465 466
// ReceiveError is called by the network interface when an error happens
// at the network layer. Currently just logs error.
467
func (bs *Bitswap) ReceiveError(err error) {
468
	log.Infof("Bitswap ReceiveError: %s", err)
469 470
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
471 472
}

473
// Close is called to shutdown Bitswap
474
func (bs *Bitswap) Close() error {
475
	return bs.process.Close()
476
}
477

478
// GetWantlist returns the current local wantlist.
479
func (bs *Bitswap) GetWantlist() []cid.Cid {
480
	entries := bs.wm.CurrentWants()
481
	out := make([]cid.Cid, 0, len(entries))
482
	for _, e := range entries {
483
		out = append(out, e.Cid)
484 485 486
	}
	return out
}
487

488
// IsOnline is needed to match go-ipfs-exchange-interface
489 490 491
func (bs *Bitswap) IsOnline() bool {
	return true
}
492

493 494 495 496 497 498
// NewSession generates a new Bitswap session. You should use this, rather
// that calling Bitswap.GetBlocks, any time you intend to do several related
// block requests in a row. The session returned will have it's own GetBlocks
// method, but the session will use the fact that the requests are related to
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
499
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
500
	return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
501
}