package session

import (
	"context"

	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

const (
	// Maximum number of changes to accept before blocking
	changesBufferSize = 128
	// If the session receives this many DONT_HAVEs in a row from a peer,
	// it prunes the peer from the session
	peerDontHaveLimit = 16
)

// BlockPresence indicates whether a peer has a block.
// Note that the order is important: we decide which peer to send a want to
// based on whether we know the peer has the block. For example, we're more
// likely to send a want to a peer that has the block than to a peer that
// doesn't have it, so BPHave > BPDontHave.
type BlockPresence int

const (
	BPDontHave BlockPresence = iota
	BPUnknown
	BPHave
)
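
// An aside for illustration (not part of the original file): because the
// constants are ordered, peer-selection logic can compare presences with the
// usual operators, as calculateBestPeer does at the bottom of this file:
//
//	// hypothetical snippet comparing two peers p1 and p2 for a want
//	if wi.blockPresence[p1] > wi.blockPresence[p2] {
//		// p1 is the better candidate: BPHave > BPUnknown > BPDontHave
//	}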

// update encapsulates a message received by the session
type update struct {
	// Which peer sent the update
	from peer.ID
	// cids of blocks received
	ks []cid.Cid
	// HAVE message
	haves []cid.Cid
	// DONT_HAVE message
	dontHaves []cid.Cid
}

// peerAvailability indicates a peer's connection state
type peerAvailability struct {
	target    peer.ID
	available bool
}

// change can be new wants, a new message received by the session,
// or a change in the connection status of a peer
type change struct {
	// new wants requested
	add []cid.Cid
	// new message received by session (blocks / HAVEs / DONT_HAVEs)
	update update
	// peer has connected / disconnected
	availability peerAvailability
}

type onSendFn func(to peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
type onPeersExhaustedFn func([]cid.Cid)

//
// sessionWantSender is responsible for sending want-haves and want-blocks to
// peers. For each want, it sends a single optimistic want-block request to
// one peer and want-have requests to all other peers in the session.
// To choose the best peer for the optimistic want-block, it maintains a
// record of how each peer has responded to each want (HAVE / DONT_HAVE /
// Unknown) and consults the peer response tracker, which records which peers
// sent us blocks.
//
type sessionWantSender struct {
	// When the context is cancelled, sessionWantSender shuts down
	ctx context.Context
	// The session ID
	sessionID uint64
	// A channel that collects incoming changes (events)
	changes chan change
	// Information about each want indexed by CID
	wants map[cid.Cid]*wantInfo
	// Keeps track of how many consecutive DONT_HAVEs a peer has sent
	peerConsecutiveDontHaves map[peer.ID]int
	// Tracks which peers we have sent want-blocks to
	swbt *sentWantBlocksTracker
	// Tracks the number of blocks each peer sent us
	peerRspTrkr *peerResponseTracker
	// Sends wants to peers
	pm PeerManager
	// Keeps track of peers in the session
	spm SessionPeerManager
	// Keeps track of which peer has / doesn't have a block
	bpm *bsbpm.BlockPresenceManager
	// Called when wants are sent
	onSend onSendFn
	// Called when all peers explicitly don't have a block
	onPeersExhausted onPeersExhaustedFn
}

func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, spm SessionPeerManager,
	bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

	sws := sessionWantSender{
		ctx:                      ctx,
		sessionID:                sid,
		changes:                  make(chan change, changesBufferSize),
		wants:                    make(map[cid.Cid]*wantInfo),
		peerConsecutiveDontHaves: make(map[peer.ID]int),
		swbt:                     newSentWantBlocksTracker(),
		peerRspTrkr:              newPeerResponseTracker(),

		pm:               pm,
		spm:              spm,
		bpm:              bpm,
		onSend:           onSend,
		onPeersExhausted: onPeersExhausted,
	}

	return sws
}
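
// A minimal usage sketch (illustrative only; in practice the Session that
// owns the sessionWantSender does this wiring):
//
//	sws := newSessionWantSender(ctx, id, pm, spm, bpm, onSend, onPeersExhausted)
//	go sws.Run()                        // process changes until ctx is cancelled
//	sws.Add(ks)                         // enqueue new wants
//	sws.Update(p, ks, haves, dontHaves) // feed in a received message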

func (sws *sessionWantSender) ID() uint64 {
	return sws.sessionID
}

// Add is called when new wants are added to the session
func (sws *sessionWantSender) Add(ks []cid.Cid) {
	if len(ks) == 0 {
		return
	}
	sws.addChange(change{add: ks})
}

// Update is called when the session receives a message with incoming blocks
// or HAVE / DONT_HAVE
func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
	hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0
	if !hasUpdate {
		return
	}

	sws.addChange(change{
		update: update{from, ks, haves, dontHaves},
	})
}

// SignalAvailability is called by the PeerManager to signal that a peer has
// connected / disconnected
func (sws *sessionWantSender) SignalAvailability(p peer.ID, isAvailable bool) {
	availability := peerAvailability{p, isAvailable}
	sws.addChange(change{availability: availability})
}

// Run is the main loop for processing incoming changes
func (sws *sessionWantSender) Run() {
	for {
		select {
		case ch := <-sws.changes:
			sws.onChange([]change{ch})
		case <-sws.ctx.Done():
			sws.shutdown()
			return
		}
	}
}

// addChange adds a new change to the queue
func (sws *sessionWantSender) addChange(c change) {
	select {
	case sws.changes <- c:
	case <-sws.ctx.Done():
	}
}

// shutdown unregisters the session with the PeerManager
func (sws *sessionWantSender) shutdown() {
	sws.pm.UnregisterSession(sws.sessionID)
}

// collectChanges collects all the changes that have occurred since the last
// invocation of onChange
func (sws *sessionWantSender) collectChanges(changes []change) []change {
	for len(changes) < changesBufferSize {
		select {
		case next := <-sws.changes:
			changes = append(changes, next)
		default:
			return changes
		}
	}
	return changes
}

// onChange processes the next set of changes
func (sws *sessionWantSender) onChange(changes []change) {
	// Several changes may have been recorded since the last time we checked,
	// so pop all outstanding changes from the channel
	changes = sws.collectChanges(changes)

	// Apply each change
	availability := make(map[peer.ID]bool, len(changes))
	var updates []update
	for _, chng := range changes {
		// Initialize info for new wants
		for _, c := range chng.add {
			sws.trackWant(c)
		}

		// Consolidate updates and changes to availability
		if chng.update.from != "" {
			// If the update includes blocks or haves, treat it as signaling that
			// the peer is available
			if len(chng.update.ks) > 0 || len(chng.update.haves) > 0 {
				availability[chng.update.from] = true
			}

			updates = append(updates, chng.update)
		}
		if chng.availability.target != "" {
			availability[chng.availability.target] = chng.availability.available
		}
	}

	// Update peer availability
	newlyAvailable, newlyUnavailable := sws.processAvailability(availability)

	// Update wants
	dontHaves := sws.processUpdates(updates)

	// Check if there are any wants for which all peers have indicated they
	// don't have the corresponding block
	sws.checkForExhaustedWants(dontHaves, newlyUnavailable)

	// If there are some connected peers, send any pending wants
	if sws.spm.HasPeers() {
		sws.sendNextWants(newlyAvailable)
	}
}

// processAvailability updates the want queue with any changes in
// peer availability
// It returns the peers that have become
// - newly available
// - newly unavailable
func (sws *sessionWantSender) processAvailability(availability map[peer.ID]bool) (avail []peer.ID, unavail []peer.ID) {
	var newlyAvailable []peer.ID
	var newlyUnavailable []peer.ID
	for p, isNowAvailable := range availability {
		stateChange := false
		if isNowAvailable {
			isNewPeer := sws.spm.AddPeer(p)
			if isNewPeer {
				stateChange = true
				newlyAvailable = append(newlyAvailable, p)
			}
		} else {
			wasAvailable := sws.spm.RemovePeer(p)
			if wasAvailable {
				stateChange = true
				newlyUnavailable = append(newlyUnavailable, p)
			}
		}

		// If the state has changed
		if stateChange {
			sws.updateWantsPeerAvailability(p, isNowAvailable)
			// Reset the count of consecutive DONT_HAVEs received from the
			// peer
			delete(sws.peerConsecutiveDontHaves, p)
		}
	}

	return newlyAvailable, newlyUnavailable
}

// trackWant creates a new entry in the map of CID -> want info
func (sws *sessionWantSender) trackWant(c cid.Cid) {
	if _, ok := sws.wants[c]; ok {
		return
	}

	// Create the want info
	wi := newWantInfo(sws.peerRspTrkr)
	sws.wants[c] = wi

	// For each available peer, register any information we know about
	// whether the peer has the block
	for _, p := range sws.spm.Peers() {
		sws.updateWantBlockPresence(c, p)
	}
}

// processUpdates processes incoming blocks and HAVE / DONT_HAVEs.
// It returns all DONT_HAVEs.
func (sws *sessionWantSender) processUpdates(updates []update) []cid.Cid {
	// Process the keys of received blocks
	blkCids := cid.NewSet()
	for _, upd := range updates {
		for _, c := range upd.ks {
			blkCids.Add(c)

			// Remove the want
			removed := sws.removeWant(c)
			if removed != nil {
				// Inform the peer tracker that this peer was the first to send
				// us the block
				sws.peerRspTrkr.receivedBlockFrom(upd.from)
			}
			delete(sws.peerConsecutiveDontHaves, upd.from)
		}
	}

	// Process received DONT_HAVEs
	dontHaves := cid.NewSet()
	prunePeers := make(map[peer.ID]struct{})
	for _, upd := range updates {
		for _, c := range upd.dontHaves {
			// Track the number of consecutive DONT_HAVEs received from each peer
			if sws.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
				prunePeers[upd.from] = struct{}{}
			} else {
				sws.peerConsecutiveDontHaves[upd.from]++
			}

			// If we already received a block for the want, there's no need to
			// update block presence etc
			if blkCids.Has(c) {
				continue
			}

			dontHaves.Add(c)

			// Update the block presence for the peer
			sws.updateWantBlockPresence(c, upd.from)

			// Check if the DONT_HAVE is in response to a want-block
			// (could also be in response to want-have)
			if sws.swbt.haveSentWantBlockTo(upd.from, c) {
				// If we were waiting for a response from this peer, clear
				// sentTo so that we can send the want to another peer
				if sentTo, ok := sws.getWantSentTo(c); ok && sentTo == upd.from {
					sws.setWantSentTo(c, "")
				}
			}
		}
	}

	// Process received HAVEs
	for _, upd := range updates {
		for _, c := range upd.haves {
			// If we haven't already received a block for the want
			if !blkCids.Has(c) {
				// Update the block presence for the peer
				sws.updateWantBlockPresence(c, upd.from)
			}

			// Clear the consecutive DONT_HAVE count for the peer
			delete(sws.peerConsecutiveDontHaves, upd.from)
			delete(prunePeers, upd.from)
		}
	}

	// If any peers have sent us too many consecutive DONT_HAVEs, remove them
	// from the session
	for p := range prunePeers {
		// Before removing the peer from the session, check if the peer
		// sent us a HAVE for a block that we want
		for c := range sws.wants {
			if sws.bpm.PeerHasBlock(p, c) {
				delete(prunePeers, p)
				break
			}
		}
	}
	if len(prunePeers) > 0 {
		go func() {
			for p := range prunePeers {
				// Peer doesn't have anything we want, so remove it
				log.Infof("peer %s sent too many dont haves, removing from session %d", p, sws.ID())
				sws.SignalAvailability(p, false)
			}
		}()
	}

	return dontHaves.Keys()
}

// checkForExhaustedWants checks if there are any wants for which all peers
// have sent a DONT_HAVE. We call these "exhausted" wants.
func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyUnavailable []peer.ID) {
	// If there are no new DONT_HAVEs, and no peers became unavailable, then
	// we don't need to check for exhausted wants
	if len(dontHaves) == 0 && len(newlyUnavailable) == 0 {
		return
	}

	// We need to check each want for which we just received a DONT_HAVE
	wants := dontHaves

	// If a peer just became unavailable, then we need to check all wants
	// (because it may be the last peer who hadn't sent a DONT_HAVE for a CID)
	if len(newlyUnavailable) > 0 {
		// Collect all pending wants
		wants = make([]cid.Cid, 0, len(sws.wants))
		for c := range sws.wants {
			wants = append(wants, c)
		}

		// If the last available peer in the session has become unavailable
		// then we need to broadcast all pending wants
		if !sws.spm.HasPeers() {
			sws.processExhaustedWants(wants)
			return
		}
	}

	// If all available peers for a cid sent a DONT_HAVE, signal to the session
	// that we've exhausted available peers
	if len(wants) > 0 {
		exhausted := sws.bpm.AllPeersDoNotHaveBlock(sws.spm.Peers(), wants)
		sws.processExhaustedWants(exhausted)
	}
}

// processExhaustedWants filters the list so that only those wants that haven't
// already been marked as exhausted are passed to onPeersExhausted()
func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) {
	newlyExhausted := sws.newlyExhausted(exhausted)
	if len(newlyExhausted) > 0 {
		sws.onPeersExhausted(newlyExhausted)
	}
}

// convenience structs for passing around want-blocks and want-haves for a peer
type wantSets struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
}

type allWants map[peer.ID]*wantSets

func (aw allWants) forPeer(p peer.ID) *wantSets {
	if _, ok := aw[p]; !ok {
		aw[p] = &wantSets{
			wantBlocks: cid.NewSet(),
			wantHaves:  cid.NewSet(),
		}
	}
	return aw[p]
}
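
// For example, sendNextWants below builds its batch with this lazy-init
// accessor; the entry for a peer is created on first use (illustrative only):
//
//	toSend := make(allWants)
//	toSend.forPeer(p).wantBlocks.Add(c) // creates the wantSets for p if needed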

// sendNextWants sends wants to peers according to the latest information
// about which peers have / dont have blocks
func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) {
	toSend := make(allWants)

	for c, wi := range sws.wants {
		// Ensure we send want-haves to any newly available peers
		for _, p := range newlyAvailable {
			toSend.forPeer(p).wantHaves.Add(c)
		}

		// We've already sent a want-block to a peer and haven't yet received
		// a response
		if wi.sentTo != "" {
			continue
		}

		// All the peers have indicated that they don't have the block
		// corresponding to this want, so we must wait to discover more peers
		if wi.bestPeer == "" {
			// TODO: work this out in real time instead of using bestP?
			continue
		}

		// Record that we are sending a want-block for this want to the peer
		sws.setWantSentTo(c, wi.bestPeer)

		// Send a want-block to the chosen peer
		toSend.forPeer(wi.bestPeer).wantBlocks.Add(c)

		// Send a want-have to each other peer
		for _, op := range sws.spm.Peers() {
			if op != wi.bestPeer {
				toSend.forPeer(op).wantHaves.Add(c)
			}
		}
	}

	// Send any wants we've collected
	sws.sendWants(toSend)
}

// sendWants sends want-haves and want-blocks to the appropriate peers
func (sws *sessionWantSender) sendWants(sends allWants) {
	// For each peer we're sending a request to
	for p, snd := range sends {
		// Piggyback some other want-haves onto the request to the peer
		for _, c := range sws.getPiggybackWantHaves(p, snd.wantBlocks) {
			snd.wantHaves.Add(c)
		}

		// Send the wants to the peer.
		// Note that the PeerManager ensures that we don't send duplicate
		// want-haves / want-blocks to a peer, and that want-blocks take
		// precedence over want-haves.
		wblks := snd.wantBlocks.Keys()
		whaves := snd.wantHaves.Keys()
		sws.pm.SendWants(sws.ctx, p, wblks, whaves)

		// Inform the session that we've sent the wants
		sws.onSend(p, wblks, whaves)

		// Record which peers we sent want-blocks to
		sws.swbt.addSentWantBlocksTo(p, wblks)
	}
}

// getPiggybackWantHaves gets the want-haves that should be piggybacked onto
// a request that we are making to send want-blocks to a peer
func (sws *sessionWantSender) getPiggybackWantHaves(p peer.ID, wantBlocks *cid.Set) []cid.Cid {
	var whs []cid.Cid
	for c := range sws.wants {
		// Don't send want-have if we're already sending a want-block
		// (or have previously)
		if !wantBlocks.Has(c) && !sws.swbt.haveSentWantBlockTo(p, c) {
			whs = append(whs, c)
		}
	}
	return whs
}

// newlyExhausted filters the list of keys for wants that have not already
// been marked as exhausted (all peers indicated they don't have the block)
func (sws *sessionWantSender) newlyExhausted(ks []cid.Cid) []cid.Cid {
	var res []cid.Cid
	for _, c := range ks {
		if wi, ok := sws.wants[c]; ok {
			if !wi.exhausted {
				res = append(res, c)
				wi.exhausted = true
			}
		}
	}
	return res
}

// removeWant is called when the corresponding block is received
func (sws *sessionWantSender) removeWant(c cid.Cid) *wantInfo {
	if wi, ok := sws.wants[c]; ok {
		delete(sws.wants, c)
		return wi
	}
	return nil
}

// updateWantsPeerAvailability is called when the availability changes for a
// peer. It updates all the wants accordingly.
func (sws *sessionWantSender) updateWantsPeerAvailability(p peer.ID, isNowAvailable bool) {
	for c, wi := range sws.wants {
		if isNowAvailable {
			sws.updateWantBlockPresence(c, p)
		} else {
			wi.removePeer(p)
		}
	}
}

// updateWantBlockPresence is called when a HAVE / DONT_HAVE is received for the given
// want / peer
func (sws *sessionWantSender) updateWantBlockPresence(c cid.Cid, p peer.ID) {
	wi, ok := sws.wants[c]
	if !ok {
		return
	}

	// If the peer sent us a HAVE or DONT_HAVE for the cid, adjust the
	// block presence for the peer / cid combination
	if sws.bpm.PeerHasBlock(p, c) {
		wi.setPeerBlockPresence(p, BPHave)
	} else if sws.bpm.PeerDoesNotHaveBlock(p, c) {
		wi.setPeerBlockPresence(p, BPDontHave)
	} else {
		wi.setPeerBlockPresence(p, BPUnknown)
	}
}

// Which peer was the want sent to
func (sws *sessionWantSender) getWantSentTo(c cid.Cid) (peer.ID, bool) {
	if wi, ok := sws.wants[c]; ok {
		return wi.sentTo, true
	}
	return "", false
}

// Record which peer the want was sent to
func (sws *sessionWantSender) setWantSentTo(c cid.Cid, p peer.ID) {
	if wi, ok := sws.wants[c]; ok {
		wi.sentTo = p
	}
}

// wantInfo keeps track of the information for a want
type wantInfo struct {
	// Tracks HAVE / DONT_HAVE sent to us for the want by each peer
	blockPresence map[peer.ID]BlockPresence
	// The peer that we've sent a want-block to (cleared when we get a response)
	sentTo peer.ID
	// The "best" peer to send the want to next
	bestPeer peer.ID
	// Keeps track of how many hits / misses each peer has sent us for wants
	// in the session
	peerRspTrkr *peerResponseTracker
	// true if all known peers have sent a DONT_HAVE for this want
	exhausted bool
}

func newWantInfo(prt *peerResponseTracker) *wantInfo {
	return &wantInfo{
		blockPresence: make(map[peer.ID]BlockPresence),
		peerRspTrkr:   prt,
		exhausted:     false,
	}
}

// setPeerBlockPresence sets the block presence for the given peer
func (wi *wantInfo) setPeerBlockPresence(p peer.ID, bp BlockPresence) {
	wi.blockPresence[p] = bp
	wi.calculateBestPeer()

	// If a peer informed us that it has a block then make sure the want is no
	// longer flagged as exhausted (exhausted means no peers have the block)
	if bp == BPHave {
		wi.exhausted = false
	}
}

// removePeer deletes the given peer from the want info
func (wi *wantInfo) removePeer(p peer.ID) {
	// If we were waiting to hear back from the peer that is being removed,
	// clear the sentTo field so we no longer wait
	if p == wi.sentTo {
		wi.sentTo = ""
	}
	delete(wi.blockPresence, p)
	wi.calculateBestPeer()
}

// calculateBestPeer finds the best peer to send the want to next
func (wi *wantInfo) calculateBestPeer() {
	// Recalculate the best peer
	bestBP := BPDontHave
	bestPeer := peer.ID("")

	// Find the peer with the best block presence, recording how many peers
	// share the block presence
	countWithBest := 0
	for p, bp := range wi.blockPresence {
		if bp > bestBP {
			bestBP = bp
			bestPeer = p
			countWithBest = 1
		} else if bp == bestBP {
			countWithBest++
		}
	}
	wi.bestPeer = bestPeer

	// If no peer has a block presence better than DONT_HAVE, bail out
	if bestPeer == "" {
		return
	}

	// If there was only one peer with the best block presence, we're done
	if countWithBest <= 1 {
		return
	}

	// There were multiple peers with the best block presence, so choose one of
	// them to be the best
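	// Illustration with hypothetical peers: if p1 and p2 both sent HAVE and
	// p3 sent DONT_HAVE, then bestBP == BPHave and countWithBest == 2, so the
	// peer response tracker picks between p1 and p2 below.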
	var peersWithBest []peer.ID
	for p, bp := range wi.blockPresence {
		if bp == bestBP {
			peersWithBest = append(peersWithBest, p)
		}
	}
	wi.bestPeer = wi.peerRspTrkr.choose(peersWithBest)
}