// Package decision implements the decision engine for the bitswap service.
package decision

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	bsmsg "github.com/ipfs/go-bitswap/message"
	pb "github.com/ipfs/go-bitswap/message/pb"
	wl "github.com/ipfs/go-bitswap/wantlist"
	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	bstore "github.com/ipfs/go-ipfs-blockstore"
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	"github.com/ipfs/go-peertaskqueue/peertask"
	process "github.com/jbenet/goprocess"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
//    request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
//   appropriate
// * when handling `cancel`, if we recently received a wanted block from a
//   peer, include a partial wantlist that contains a few other high priority
//   blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).

var log = logging.Logger("engine")

const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
	// targetMessageSize is the ideal size of the batched payload. We try to
	// pop this much data off the request queue, but it may be a little more
	// or less depending on what's in the queue.
	targetMessageSize = 16 * 1024
	// tagFormat is the tag given to peers associated with an engine
	tagFormat = "bs-engine-%s-%s"

	// queuedTagWeight is the default weight for peers that have work queued
	// on their behalf.
	queuedTagWeight = 10

	// the alpha for the EWMA used to track short term usefulness
	shortTermAlpha = 0.5

	// the alpha for the EWMA used to track long term usefulness
	longTermAlpha = 0.05
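	// NOTE: the ewma helper used by scoreWorker is assumed to compute the
	// usual exponentially weighted moving average,
	//
	//	new = alpha*sample + (1-alpha)*old
	//
	// so the short-term score (alpha 0.5) reacts quickly to recent activity
	// while the long-term score (alpha 0.05) changes only slowly.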

	// how frequently the engine should sample usefulness. Peers that
	// interact every shortTerm time period are considered "active".
	shortTerm = 10 * time.Second

	// long term ratio defines what "long term" means in terms of the
	// shortTerm duration. Peers that interact at least once every
	// longTermRatio sampling periods are considered useful over the long term.
	longTermRatio = 10

	// long/short term scores for tagging peers
	longTermScore  = 10 // this is a high tag but it grows _very_ slowly.
	shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock = 1024

	// Number of concurrent workers that pull tasks off the request queue
	taskWorkerCount = 8

	// Number of concurrent workers that process requests to the blockstore
	blockstoreWorkerCount = 128
)

// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers
type PeerTagger interface {
	TagPeer(peer.ID, string, int)
	UntagPeer(p peer.ID, tag string)
}

// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	bsm *blockstoreManager

	peerTagger PeerTagger

	tagQueued, tagUseful string

	lock sync.RWMutex // protects the fields immediately below

	// ledgerMap lists Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	ticker *time.Ticker

	taskWorkerLock  sync.Mutex
	taskWorkerCount int

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock int

	// how frequently the engine should sample peer usefulness
	peerSampleInterval time.Duration
	// used by the tests to detect when a sample is taken
	sampleCh chan struct{}

	sendDontHaves bool

	self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm, nil)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
	maxReplaceSize int, peerSampleInterval time.Duration, sampleCh chan struct{}) *Engine {

	e := &Engine{
		ledgerMap:                       make(map[peer.ID]*ledger),
		bsm:                             newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
		peerTagger:                      peerTagger,
		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal:                      make(chan struct{}, 1),
		ticker:                          time.NewTicker(time.Millisecond * 100),
		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
		peerSampleInterval:              peerSampleInterval,
		sampleCh:                        sampleCh,
		taskWorkerCount:                 taskWorkerCount,
		sendDontHaves:                   true,
		self:                            self,
	}
	e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
	e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
	e.peerRequestQueue = peertaskqueue.New(
		peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
		peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
		peertaskqueue.TaskMerger(newTaskMerger()),
		peertaskqueue.IgnoreFreezing(true))
	return e
}

// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// Older versions of Bitswap did not respond, so this allows us to simulate
// those older versions for testing.
func (e *Engine) SetSendDontHaves(send bool) {
	e.sendDontHaves = send
}

// Start up workers to handle requests from other nodes for the data on this node
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
	// Start up blockstore manager
	e.bsm.start(px)
	px.Go(e.scoreWorker)

	for i := 0; i < e.taskWorkerCount; i++ {
		px.Go(func(px process.Process) {
			e.taskWorker(ctx)
		})
	}
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and highly weights
// new observations. Long-term usefulness is sampled less frequently and
// gives more weight to long-term trends.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (e *Engine) scoreWorker(px process.Process) {
	ticker := time.NewTicker(e.peerSampleInterval)
	defer ticker.Stop()

	type update struct {
		peer  peer.ID
		score int
	}
	var (
		lastShortUpdate, lastLongUpdate time.Time
		updates                         []update
	)

	for i := 0; ; i = (i + 1) % longTermRatio {
		var now time.Time
		select {
		case now = <-ticker.C:
		case <-px.Closing():
			return
		}

		// The long term update ticks every `longTermRatio` short
		// intervals.
		updateLong := i == 0

		e.lock.Lock()
		for _, ledger := range e.ledgerMap {
			ledger.lk.Lock()

			// Update the short-term score.
			if ledger.lastExchange.After(lastShortUpdate) {
				ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
			} else {
				ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
			}

			// Update the long-term score.
			if updateLong {
				if ledger.lastExchange.After(lastLongUpdate) {
					ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
				} else {
					ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
				}
			}

			// Calculate the new score.
			//
			// The accounting score adjustment prefers peers _we_
			// need over peers that need us. This doesn't help with
			// leeching.
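			//
			// Assuming Accounting.Score() falls in [0, 1], the multiplier
			// below ranges from 0.75 to 1.25 -- the ±25% adjustment
			// described in the scoreWorker comment above.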
			score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))

			// Avoid updating the connection manager unless there's a change. This can be expensive.
			if ledger.score != score {
				// put these in a list so we can perform the updates outside the _global_ lock.
				updates = append(updates, update{ledger.Partner, score})
				ledger.score = score
			}
			ledger.lk.Unlock()
		}
		e.lock.Unlock()

		// record the times.
		lastShortUpdate = now
		if updateLong {
			lastLongUpdate = now
		}

		// apply the updates
		for _, update := range updates {
			if update.score == 0 {
				e.peerTagger.UntagPeer(update.peer, e.tagUseful)
			} else {
				e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
			}
		}
		// Keep the memory. It's not much and it saves us from having to allocate.
		updates = updates[:0]

		// Used by the tests
		if e.sampleCh != nil {
			e.sampleCh <- struct{}{}
		}
	}
}

func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}

func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tagQueued)
}

// WantlistForPeer returns the list of keys that the given peer has asked for
func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
	partner := e.findOrCreate(p)

	partner.lk.Lock()
	entries := partner.wantList.Entries()
	partner.lk.Unlock()

	wl.SortEntries(entries)

	return entries
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	ledger := e.findOrCreate(p)

	ledger.lk.Lock()
	defer ledger.lk.Unlock()

	return &Receipt{
		Peer:      ledger.Partner.String(),
		Value:     ledger.Accounting.Value(),
		Sent:      ledger.Accounting.BytesSent,
		Recv:      ledger.Accounting.BytesRecv,
		Exchanged: ledger.ExchangeCount(),
	}
}

// Each taskWorker pulls items off the request queue up to the maximum size
// and adds them to an envelope that is passed off to the bitswap workers,
// which send the message to the network.
func (e *Engine) taskWorker(ctx context.Context) {
	defer e.taskWorkerExit()
	for {
		oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
		select {
		case <-ctx.Done():
			return
		case e.outbox <- oneTimeUse:
		}
		// receiver is ready for an outgoing envelope. let's prepare one. first,
		// we must acquire a task from the PQ...
		envelope, err := e.nextEnvelope(ctx)
		if err != nil {
			close(oneTimeUse)
			return // ctx cancelled
		}
		oneTimeUse <- envelope // buffered. won't block
		close(oneTimeUse)
	}
}

// taskWorkerExit handles cleanup of task workers
func (e *Engine) taskWorkerExit() {
	e.taskWorkerLock.Lock()
	defer e.taskWorkerLock.Unlock()

	e.taskWorkerCount--
	if e.taskWorkerCount == 0 {
		close(e.outbox)
	}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		// Pop some tasks off the request queue
		p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
		for len(nextTasks) == 0 {
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			case <-e.ticker.C:
				// When a task is cancelled, the queue may be "frozen" for a
				// period of time. We periodically "thaw" the queue to make
				// sure it doesn't get stuck in a frozen state.
				e.peerRequestQueue.ThawRound()
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			}
		}

		// Create a new message
		msg := bsmsg.New(true)

		log.Debugw("Bitswap process tasks", "local", e.self, "taskCount", len(nextTasks))

		// Amount of data in the request queue still waiting to be popped
		msg.SetPendingBytes(int32(pendingBytes))

		// Split out want-blocks, want-haves and DONT_HAVEs
		blockCids := make([]cid.Cid, 0, len(nextTasks))
		blockTasks := make(map[cid.Cid]*taskData, len(nextTasks))
		for _, t := range nextTasks {
			c := t.Topic.(cid.Cid)
			td := t.Data.(*taskData)
			if td.HaveBlock {
				if td.IsWantBlock {
					blockCids = append(blockCids, c)
					blockTasks[c] = td
				} else {
					// Add HAVES to the message
					msg.AddHave(c)
				}
			} else {
				// Add DONT_HAVEs to the message
				msg.AddDontHave(c)
			}
		}

		// Fetch blocks from datastore
		blks, err := e.bsm.getBlocks(ctx, blockCids)
		if err != nil {
			// we're dropping the envelope but that's not an issue in practice.
			return nil, err
		}

		for c, t := range blockTasks {
			blk := blks[c]
			// If the block was not found (it has been removed)
			if blk == nil {
				// If the client requested DONT_HAVE, add DONT_HAVE to the message
				if t.SendDontHave {
					msg.AddDontHave(c)
				}
			} else {
				// Add the block to the message
				// log.Debugf("  make evlp %s->%s block: %s (%d bytes)", e.self, p, c, len(blk.RawData()))
				msg.AddBlock(blk)
			}
		}

		// If there's nothing in the message, bail out
		if msg.Empty() {
			e.peerRequestQueue.TasksDone(p, nextTasks...)
			continue
		}

		log.Debugw("Bitswap engine -> msg", "local", e.self, "to", p, "blockCount", len(msg.Blocks()), "presenceCount", len(msg.BlockPresences()), "size", msg.Size())
		return &Envelope{
			Peer:    p,
			Message: msg,
			Sent: func() {
				// Once the message has been sent, signal the request queue so
				// it can be cleared from the queue
				e.peerRequestQueue.TasksDone(p, nextTasks...)

				// Signal the worker to check for more work
				e.signalNewWork()
			},
		}, nil
	}
}

// Outbox returns a channel of one-time use Envelope channels.
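//
// A rough, illustrative sketch of how a consumer is expected to drain it
// (sendToNetwork is a hypothetical helper, not part of this package):
//
//	for envChan := range engine.Outbox() {
//		envelope, ok := <-envChan
//		if !ok {
//			continue // the worker dropped this envelope (e.g. shutdown)
//		}
//		sendToNetwork(envelope.Peer, envelope.Message)
//		envelope.Sent()
//	}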
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}

// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.RLock()
	defer e.lock.RUnlock()

	response := make([]peer.ID, 0, len(e.ledgerMap))

	for _, ledger := range e.ledgerMap {
		response = append(response, ledger.Partner)
	}
	return response
}

// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (these entries are later popped off by the taskWorkers)
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
	entries := m.Wantlist()

	if len(entries) > 0 {
		log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
		for _, et := range entries {
			if !et.Cancel {
				if et.WantType == pb.Message_Wantlist_Have {
					log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
				} else {
					log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
				}
			}
		}
	}

	if m.Empty() {
		log.Infof("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	// Get block sizes
	wants, cancels := e.splitWantsCancels(entries)
	wantKs := cid.NewSet()
	for _, entry := range wants {
		wantKs.Add(entry.Cid)
	}
	blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
	if err != nil {
		log.Info("aborting message processing", err)
		return
	}

	// Get the ledger for the peer
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Record how many bytes were received in the ledger
	blks := m.Blocks()
	for _, block := range blks {
		log.Debugw("Bitswap engine <- block", "local", e.self, "from", p, "cid", block.Cid(), "size", len(block.RawData()))
		l.ReceivedBytes(len(block.RawData()))
	}

	// If the peer sent a full wantlist, replace the ledger's wantlist
	if m.Full() {
		l.wantList = wl.New()
	}

	var activeEntries []peertask.Task

	// Remove cancelled blocks from the queue
	for _, entry := range cancels {
		log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid)
		if l.CancelWant(entry.Cid) {
			e.peerRequestQueue.Remove(entry.Cid, p)
		}
	}

	// For each want-have / want-block
	for _, entry := range wants {
		c := entry.Cid
		blockSize, found := blockSizes[entry.Cid]

		// Add each want-have / want-block to the ledger
		l.Wants(c, entry.Priority, entry.WantType)

		// If the block was not found
		if !found {
			log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)

			// Only add the task to the queue if the requester wants a DONT_HAVE
			if e.sendDontHaves && entry.SendDontHave {
				newWorkExists = true
				isWantBlock := false
				if entry.WantType == pb.Message_Wantlist_Block {
					isWantBlock = true
				}

				activeEntries = append(activeEntries, peertask.Task{
					Topic:    c,
					Priority: int(entry.Priority),
					Work:     bsmsg.BlockPresenceSize(c),
					Data: &taskData{
						BlockSize:    0,
						HaveBlock:    false,
						IsWantBlock:  isWantBlock,
						SendDontHave: entry.SendDontHave,
					},
				})
			}
		} else {
			// The block was found, add it to the queue
			newWorkExists = true

			isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

			log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock)

			// entrySize is the amount of space the entry takes up in the
			// message we send to the recipient. If we're sending a block, the
			// entrySize is the size of the block. Otherwise it's the size of
			// a block presence entry.
			entrySize := blockSize
			if !isWantBlock {
				entrySize = bsmsg.BlockPresenceSize(c)
			}
			activeEntries = append(activeEntries, peertask.Task{
				Topic:    c,
				Priority: int(entry.Priority),
				Work:     entrySize,
				Data: &taskData{
					BlockSize:    blockSize,
					HaveBlock:    true,
					IsWantBlock:  isWantBlock,
					SendDontHave: entry.SendDontHave,
				},
			})
		}
	}

	// Push entries onto the request queue
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushTasks(p, activeEntries...)
	}
}

// Split the want-have / want-block entries from the cancel entries
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
	wants := make([]bsmsg.Entry, 0, len(es))
	cancels := make([]bsmsg.Entry, 0, len(es))
	for _, et := range es {
		if et.Cancel {
			cancels = append(cancels, et)
		} else {
			wants = append(wants, et)
		}
	}
	return wants, cancels
}

// ReceiveFrom is called when new blocks are received and added to the block
// store, meaning there may be peers who want those blocks, so we should send
// the blocks to them.
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) {
	if len(blks) == 0 {
		return
	}

	// Get the size of each block
	blockSizes := make(map[cid.Cid]int, len(blks))
	for _, blk := range blks {
		blockSizes[blk.Cid()] = len(blk.RawData())
	}

	// Check each peer to see if it wants one of the blocks we received
	work := false
	e.lock.RLock()
	for _, l := range e.ledgerMap {
		l.lk.RLock()

		for _, b := range blks {
			k := b.Cid()

			if entry, ok := l.WantListContains(k); ok {
				work = true

				blockSize := blockSizes[k]
				isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

				entrySize := blockSize
				if !isWantBlock {
					entrySize = bsmsg.BlockPresenceSize(k)
				}

				e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
					Topic:    entry.Cid,
					Priority: int(entry.Priority),
					Work:     entrySize,
					Data: &taskData{
						BlockSize:    blockSize,
						HaveBlock:    true,
						IsWantBlock:  isWantBlock,
						SendDontHave: false,
					},
				})
			}
		}
		l.lk.RUnlock()
	}
	e.lock.RUnlock()

	if work {
		e.signalNewWork()
	}
}

// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to record
// changes.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Remove sent blocks from the want list for the peer
	for _, block := range m.Blocks() {
		l.SentBytes(len(block.RawData()))
		l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
	}

	// Remove sent block presences from the want list for the peer
	for _, bp := range m.BlockPresences() {
		// Don't record sent data. We reserve that for data blocks.
		if bp.Type == pb.Message_Have {
			l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
		}
	}
}

// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	_, ok := e.ledgerMap[p]
	if !ok {
		e.ledgerMap[p] = newLedger(p)
	}
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	delete(e.ledgerMap, p)
}

// If the want is a want-have, and it's below a certain size, send the full
// block (instead of sending a HAVE)
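// For example, with the default maxBlockSizeReplaceHasWithBlock of 1024
// bytes, a want-have for a 600-byte block is answered with the block
// itself, while a want-have for a 100 KiB block is answered with a HAVE.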
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
	isWantBlock := wantType == pb.Message_Wantlist_Block
	return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesSent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	// NB not threadsafe
	return e.findOrCreate(p).Accounting.BytesRecv
}

// findOrCreate lazily instantiates a ledger for the given peer
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	// Take a read lock (as it's less expensive) to check if we have a ledger
	// for the peer
	e.lock.RLock()
	l, ok := e.ledgerMap[p]
	e.lock.RUnlock()
	if ok {
		return l
	}

	// There's no ledger, so take a write lock, then check again and create the
	// ledger if necessary
	e.lock.Lock()
	defer e.lock.Unlock()
	l, ok = e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}

func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}