package messagequeue

import (
	"context"
	"math"
	"sync"
	"time"

	bsmsg "github.com/ipfs/go-bitswap/message"
	pb "github.com/ipfs/go-bitswap/message/pb"
	bsnet "github.com/ipfs/go-bitswap/network"
	bswl "github.com/ipfs/go-bitswap/wantlist"
	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
	peer "github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
	"go.uber.org/zap"
)

var log = logging.Logger("bitswap")
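// sflog is the non-sugared form of the logger; logOutgoingMessage uses it to
// cheaply check whether debug logging is enabled before building per-entry
// log fields.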
var sflog = log.Desugar()

const (
	defaultRebroadcastInterval = 30 * time.Second
	// maxRetries is the number of times to attempt to send a message before
	// giving up
	maxRetries  = 3
	sendTimeout = 30 * time.Second
	// maxMessageSize is the maximum message size in bytes
	maxMessageSize = 1024 * 1024 * 2
	// sendErrorBackoff is the time to wait before retrying to connect after
	// an error when trying to send a message
	sendErrorBackoff = 100 * time.Millisecond
	// maxPriority is the max priority as defined by the bitswap protocol
	maxPriority = math.MaxInt32
	// sendMessageDebounce is the debounce duration when calling sendMessage()
	sendMessageDebounce = time.Millisecond
	// when we reach sendMessageCutoff wants/cancels, we'll send the message immediately.
	sendMessageCutoff = 256
	// when we debounce for more than sendMessageMaxDelay, we'll send the
	// message immediately.
	sendMessageMaxDelay = 20 * time.Millisecond
	// The maximum amount of time in which to accept a response as being valid
	// for latency calculation (as opposed to discarding it as an outlier)
	maxValidLatency = 30 * time.Second
)
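
// Taken together, the debounce constants above mean that each new batch of
// wants or cancels restarts a sendMessageDebounce timer, but the queue sends
// immediately once more than sendMessageCutoff entries are pending or once
// sendMessageMaxDelay has elapsed since the first queued update (see runQueue).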

// MessageNetwork is any network that can connect peers and generate a message
// sender.
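// In production this is satisfied by the bitswap network layer; tests can
// substitute a lightweight fake.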
type MessageNetwork interface {
	ConnectTo(context.Context, peer.ID) error
	NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error)
	Latency(peer.ID) time.Duration
	Ping(context.Context, peer.ID) ping.Result
	Self() peer.ID
}

// MessageQueue implements a queue of want messages to send to a peer.
type MessageQueue struct {
	ctx          context.Context
	shutdown     func()
	p            peer.ID
	network      MessageNetwork
	dhTimeoutMgr DontHaveTimeoutManager

	// The maximum size of a message in bytes. Any overflow is put into the
	// next message
	maxMessageSize int

	// The amount of time to wait when there's an error sending to a peer
	// before retrying
	sendErrorBackoff time.Duration

	// The maximum amount of time in which to accept a response as being valid
	// for latency calculation
	maxValidLatency time.Duration

	// Signals that there are outgoing wants / cancels ready to be processed
	outgoingWork chan time.Time

	// Channel of CIDs of blocks / HAVEs / DONT_HAVEs received from the peer
	responses chan []cid.Cid

	// Take lock whenever any of these variables are modified
	wllock    sync.Mutex
	bcstWants recallWantlist
	peerWants recallWantlist
	cancels   *cid.Set
	priority  int32

	// Don't touch any of these variables outside of the run loop
	sender                bsnet.MessageSender
	rebroadcastIntervalLk sync.RWMutex
	rebroadcastInterval   time.Duration
	rebroadcastTimer      *time.Timer
	// For performance reasons we just clear out the fields of the message
	// instead of creating a new one every time.
	msg bsmsg.BitSwapMessage
}

// recallWantlist keeps a list of pending wants and a list of sent wants
type recallWantlist struct {
	// The list of wants that have not yet been sent
	pending *bswl.Wantlist
	// The list of wants that have been sent
	sent *bswl.Wantlist
	// The time at which each want was sent
	sentAt map[cid.Cid]time.Time
}

func newRecallWantList() recallWantlist {
	return recallWantlist{
		pending: bswl.New(),
		sent:    bswl.New(),
		sentAt:  make(map[cid.Cid]time.Time),
	}
}

// Add want to the pending list
func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlist_WantType) {
	r.pending.Add(c, priority, wtype)
}

// Remove wants from both the pending list and the list of sent wants
func (r *recallWantlist) Remove(c cid.Cid) {
	r.pending.Remove(c)
	r.sent.Remove(c)
	delete(r.sentAt, c)
}

// Remove wants by type from both the pending list and the list of sent wants
func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) {
	r.pending.RemoveType(c, wtype)
	r.sent.RemoveType(c, wtype)
	if _, ok := r.sent.Contains(c); !ok {
		delete(r.sentAt, c)
	}
}

// MarkSent moves the want from the pending to the sent list
//
// Returns true if the want was marked as sent. Returns false if the want wasn't
// pending.
func (r *recallWantlist) MarkSent(e bswl.Entry) bool {
	if !r.pending.RemoveType(e.Cid, e.WantType) {
		return false
	}
	r.sent.Add(e.Cid, e.Priority, e.WantType)
	return true
}

// SentAt records the time at which a want was sent
func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) {
	// The want may have been cancelled in the interim
	if _, ok := r.sent.Contains(c); ok {
		if _, ok := r.sentAt[c]; !ok {
			r.sentAt[c] = at
		}
	}
}

// ClearSentAt clears out the record of the time a want was sent.
// We clear the sent at time when we receive a response for a key as we
// only need the first response for latency measurement.
func (r *recallWantlist) ClearSentAt(c cid.Cid) {
	delete(r.sentAt, c)
}

type peerConn struct {
	p       peer.ID
	network MessageNetwork
}

func newPeerConnection(p peer.ID, network MessageNetwork) *peerConn {
	return &peerConn{p, network}
}

func (pc *peerConn) Ping(ctx context.Context) ping.Result {
	return pc.network.Ping(ctx, pc.p)
}

func (pc *peerConn) Latency() time.Duration {
	return pc.network.Latency(pc.p)
}

// Fires when a timeout occurs waiting for a response from a peer running an
// older version of Bitswap that doesn't support DONT_HAVE messages.
type OnDontHaveTimeout func(peer.ID, []cid.Cid)

// DontHaveTimeoutManager pings a peer to estimate latency so it can set a reasonable
// upper bound on when to consider a DONT_HAVE request as timed out (when connected to
// a peer that doesn't support DONT_HAVE messages)
type DontHaveTimeoutManager interface {
	// Start the manager (idempotent)
	Start()
	// Shutdown the manager (Shutdown is final, manager cannot be restarted)
	Shutdown()
	// AddPending adds the wants as pending a response. If they are not
	// cancelled before the timeout, the OnDontHaveTimeout method will be called.
	AddPending([]cid.Cid)
	// CancelPending removes the wants
	CancelPending([]cid.Cid)
	// UpdateMessageLatency informs the manager of a new latency measurement
	UpdateMessageLatency(time.Duration)
}
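
// A no-op implementation is enough to satisfy the interface when DONT_HAVE
// simulation isn't wanted (an illustrative sketch; the real implementation is
// constructed by newDontHaveTimeoutMgr in New below):
//
//	type noopTimeoutMgr struct{}
//
//	func (noopTimeoutMgr) Start()                             {}
//	func (noopTimeoutMgr) Shutdown()                          {}
//	func (noopTimeoutMgr) AddPending([]cid.Cid)               {}
//	func (noopTimeoutMgr) CancelPending([]cid.Cid)            {}
//	func (noopTimeoutMgr) UpdateMessageLatency(time.Duration) {}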

// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue {
	onTimeout := func(ks []cid.Cid) {
		log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p)
		onDontHaveTimeout(p, ks)
	}
	dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout)
	return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr)
}
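
// Typical wiring (an illustrative sketch; ctx, p, network and cids stand in
// for real values):
//
//	mq := New(ctx, p, network, func(p peer.ID, ks []cid.Cid) {
//		// Treat wants that timed out as DONT_HAVEs.
//	})
//	mq.Startup()
//	mq.AddBroadcastWantHaves(cids)
//	defer mq.Shutdown()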

// This constructor is used by the tests
func newMessageQueue(
	ctx context.Context,
	p peer.ID,
	network MessageNetwork,
	maxMsgSize int,
	sendErrorBackoff time.Duration,
	maxValidLatency time.Duration,
	dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {

	ctx, cancel := context.WithCancel(ctx)
	return &MessageQueue{
		ctx:                 ctx,
		shutdown:            cancel,
		p:                   p,
		network:             network,
		dhTimeoutMgr:        dhTimeoutMgr,
		maxMessageSize:      maxMsgSize,
		bcstWants:           newRecallWantList(),
		peerWants:           newRecallWantList(),
		cancels:             cid.NewSet(),
		outgoingWork:        make(chan time.Time, 1),
		responses:           make(chan []cid.Cid, 8),
		rebroadcastInterval: defaultRebroadcastInterval,
		sendErrorBackoff:    sendErrorBackoff,
		maxValidLatency:     maxValidLatency,
		priority:            maxPriority,
		// For performance reasons we just clear out the fields of the message
		// after using it, instead of creating a new one every time.
		msg: bsmsg.New(false),
	}
}

// Add want-haves that are part of a broadcast to all connected peers
func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid) {
	if len(wantHaves) == 0 {
		return
	}

	mq.wllock.Lock()
	defer mq.wllock.Unlock()

	for _, c := range wantHaves {
		mq.bcstWants.Add(c, mq.priority, pb.Message_Wantlist_Have)
		mq.priority--

		// We're adding a want-have for the cid, so clear any pending cancel
		// for the cid
		mq.cancels.Remove(c)
	}

	// Schedule a message send
	mq.signalWorkReady()
}

// Add want-haves and want-blocks for the peer for this message queue.
func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	if len(wantBlocks) == 0 && len(wantHaves) == 0 {
		return
	}

	mq.wllock.Lock()
	defer mq.wllock.Unlock()

	for _, c := range wantHaves {
		mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Have)
		mq.priority--

		// We're adding a want-have for the cid, so clear any pending cancel
		// for the cid
		mq.cancels.Remove(c)
	}
	for _, c := range wantBlocks {
		mq.peerWants.Add(c, mq.priority, pb.Message_Wantlist_Block)
		mq.priority--

		// We're adding a want-block for the cid, so clear any pending cancel
		// for the cid
		mq.cancels.Remove(c)
	}

	// Schedule a message send
	mq.signalWorkReady()
}

// Add cancel messages for the given keys.
func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
	if len(cancelKs) == 0 {
		return
	}

	// Cancel any outstanding DONT_HAVE timers
	mq.dhTimeoutMgr.CancelPending(cancelKs)

	mq.wllock.Lock()

	workReady := false

	// Remove keys from broadcast and peer wants, and add to cancels
	for _, c := range cancelKs {
		// Check if a want for the key was sent
		_, wasSentBcst := mq.bcstWants.sent.Contains(c)
		_, wasSentPeer := mq.peerWants.sent.Contains(c)

		// Remove the want from tracking wantlists
		mq.bcstWants.Remove(c)
		mq.peerWants.Remove(c)

		// Only send a cancel if a want was sent
		if wasSentBcst || wasSentPeer {
			mq.cancels.Add(c)
			workReady = true
		}
	}

	mq.wllock.Unlock()

	// Unlock first to be nice to the scheduler.

	// Schedule a message send
	if workReady {
		mq.signalWorkReady()
	}
}

// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (mq *MessageQueue) ResponseReceived(ks []cid.Cid) {
	if len(ks) == 0 {
		return
	}

	// These messages are just used to approximate latency, so if we get so
	// many responses that they get backed up, just ignore the overflow.
	select {
	case mq.responses <- ks:
	default:
	}
}

// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist
func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
	mq.rebroadcastIntervalLk.Lock()
	mq.rebroadcastInterval = delay
	if mq.rebroadcastTimer != nil {
		mq.rebroadcastTimer.Reset(delay)
	}
	mq.rebroadcastIntervalLk.Unlock()
}

// Startup starts the processing of messages and rebroadcasting.
func (mq *MessageQueue) Startup() {
	mq.rebroadcastIntervalLk.RLock()
	mq.rebroadcastTimer = time.NewTimer(mq.rebroadcastInterval)
	mq.rebroadcastIntervalLk.RUnlock()
	go mq.runQueue()
}

// Shutdown stops the processing of messages for a message queue.
func (mq *MessageQueue) Shutdown() {
	mq.shutdown()
}

func (mq *MessageQueue) onShutdown() {
	// Shut down the DONT_HAVE timeout manager
	mq.dhTimeoutMgr.Shutdown()

	// Reset the streamMessageSender
	if mq.sender != nil {
		_ = mq.sender.Reset()
	}
}

func (mq *MessageQueue) runQueue() {
	defer mq.onShutdown()

	// Create a timer for debouncing scheduled work.
	scheduleWork := time.NewTimer(0)
	if !scheduleWork.Stop() {
		// Need to drain the timer if Stop() returns false
		// See: https://golang.org/pkg/time/#Timer.Stop
		<-scheduleWork.C
	}

	var workScheduled time.Time
	for mq.ctx.Err() == nil {
		select {
		case <-mq.rebroadcastTimer.C:
			mq.rebroadcastWantlist()

		case when := <-mq.outgoingWork:
			// If we have work scheduled, cancel the timer. If we
			// don't, record when the work was scheduled.
			// We send the time on the channel so we accurately
			// track delay.
			if workScheduled.IsZero() {
				workScheduled = when
			} else if !scheduleWork.Stop() {
				// Need to drain the timer if Stop() returns false
				<-scheduleWork.C
			}

			// If we have too many updates and/or we've waited too
			// long, send immediately.
			if mq.pendingWorkCount() > sendMessageCutoff ||
				time.Since(workScheduled) >= sendMessageMaxDelay {
				mq.sendIfReady()
				workScheduled = time.Time{}
			} else {
				// Otherwise, extend the timer.
				scheduleWork.Reset(sendMessageDebounce)
			}

		case <-scheduleWork.C:
			// We have work scheduled and haven't seen any updates
			// in sendMessageDebounce. Send immediately.
			workScheduled = time.Time{}
			mq.sendIfReady()

		case res := <-mq.responses:
			// We received a response from the peer, calculate latency
			mq.handleResponse(res)

		case <-mq.ctx.Done():
			return
		}
	}
}

// Periodically resend the list of wants to the peer
func (mq *MessageQueue) rebroadcastWantlist() {
	mq.rebroadcastIntervalLk.RLock()
	mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
	mq.rebroadcastIntervalLk.RUnlock()

	// If some wants were transferred from the rebroadcast list
	if mq.transferRebroadcastWants() {
		// Send them out
		mq.sendMessage()
	}
}

// Transfer wants from the rebroadcast lists into the pending lists.
func (mq *MessageQueue) transferRebroadcastWants() bool {
	mq.wllock.Lock()
	defer mq.wllock.Unlock()

	// Check if there are any wants to rebroadcast
	if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 {
		return false
	}

	// Copy sent wants into pending wants lists
	mq.bcstWants.pending.Absorb(mq.bcstWants.sent)
	mq.peerWants.pending.Absorb(mq.peerWants.sent)

	return true
}

func (mq *MessageQueue) signalWorkReady() {
	select {
	case mq.outgoingWork <- time.Now():
	default:
	}
}

func (mq *MessageQueue) sendIfReady() {
	if mq.hasPendingWork() {
		mq.sendMessage()
	}
}

func (mq *MessageQueue) sendMessage() {
	sender, err := mq.initializeSender()
	if err != nil {
		// If we fail to initialize the sender, the networking layer will
		// emit a Disconnect event and the MessageQueue will get cleaned up
		log.Infof("Could not open message sender to peer %s: %s", mq.p, err)
		mq.Shutdown()
		return
	}

	// Make sure the DONT_HAVE timeout manager has started
	// Note: Start is idempotent
	mq.dhTimeoutMgr.Start()

	// Convert want lists to a Bitswap Message
	message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())

	// After processing the message, clear out its fields to save memory
	defer mq.msg.Reset(false)

	if message.Empty() {
		return
	}

	wantlist := message.Wantlist()
	mq.logOutgoingMessage(wantlist)

	if err := sender.SendMsg(mq.ctx, message); err != nil {
		// If the message couldn't be sent, the networking layer will
		// emit a Disconnect event and the MessageQueue will get cleaned up
		log.Infof("Could not send message to peer %s: %s", mq.p, err)
		mq.Shutdown()
		return
	}

	// Record sent time so as to calculate message latency
	onSent()

	// Set a timer to wait for responses
	mq.simulateDontHaveWithTimeout(wantlist)

	// If the message was too big and only a subset of wants could be
	// sent, schedule sending the rest of the wants in the next
	// iteration of the event loop.
	if mq.hasPendingWork() {
		mq.signalWorkReady()
	}
}

// If a want-block times out, simulate a DONT_HAVE response.
// This is necessary when making requests to peers running an older version of
// Bitswap that doesn't support the DONT_HAVE response, and is also useful to
// mitigate getting blocked by a peer that takes a long time to respond.
func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
	// Get the CID of each want-block that expects a DONT_HAVE response
	wants := make([]cid.Cid, 0, len(wantlist))

	mq.wllock.Lock()

	for _, entry := range wantlist {
		if entry.WantType == pb.Message_Wantlist_Block && entry.SendDontHave {
			// Unlikely, but just in case check that the block hasn't been
			// received in the interim
			c := entry.Cid
			if _, ok := mq.peerWants.sent.Contains(c); ok {
				wants = append(wants, c)
			}
		}
	}

	mq.wllock.Unlock()

	// Add wants to DONT_HAVE timeout manager
	mq.dhTimeoutMgr.AddPending(wants)
}

// handleResponse is called when a response is received from the peer,
// with the CIDs of received blocks / HAVEs / DONT_HAVEs
func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
	now := time.Now()
	earliest := time.Time{}

	mq.wllock.Lock()

	// Check if the keys in the response correspond to any request that was
	// sent to the peer.
	//
	// - Find the earliest request so as to calculate the longest latency as
	//   we want to be conservative when setting the timeout
	// - Ignore latencies that are very long, as these are likely to be outliers
	//   caused when
	//   - we send a want to peer A
	//   - peer A does not have the block
	//   - peer A later receives the block from peer B
	//   - peer A sends us HAVE / block
	for _, c := range ks {
		if at, ok := mq.bcstWants.sentAt[c]; ok {
			if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
				earliest = at
			}
			mq.bcstWants.ClearSentAt(c)
		}
		if at, ok := mq.peerWants.sentAt[c]; ok {
			if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
				earliest = at
			}
			// Clear out the sent time for the CID because we only want to
			// record the latency between the request and the first response
			// for that CID (not subsequent responses)
			mq.peerWants.ClearSentAt(c)
		}
	}

	mq.wllock.Unlock()

	if !earliest.IsZero() {
		// Inform the timeout manager of the calculated latency
		mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest))
	}
}

func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
	// Save some CPU cycles and allocations if log level is higher than debug
	if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
		return
	}

	self := mq.network.Self()
	for _, e := range wantlist {
		if e.Cancel {
			if e.WantType == pb.Message_Wantlist_Have {
				log.Debugw("sent message",
					"type", "CANCEL_WANT_HAVE",
					"cid", e.Cid,
					"local", self,
					"to", mq.p,
				)
			} else {
				log.Debugw("sent message",
					"type", "CANCEL_WANT_BLOCK",
					"cid", e.Cid,
					"local", self,
					"to", mq.p,
				)
			}
		} else {
			if e.WantType == pb.Message_Wantlist_Have {
				log.Debugw("sent message",
					"type", "WANT_HAVE",
					"cid", e.Cid,
					"local", self,
					"to", mq.p,
				)
			} else {
				log.Debugw("sent message",
					"type", "WANT_BLOCK",
					"cid", e.Cid,
					"local", self,
					"to", mq.p,
				)
			}
		}
	}
}

// Whether there is work to be processed
func (mq *MessageQueue) hasPendingWork() bool {
	return mq.pendingWorkCount() > 0
}

// The amount of work that is waiting to be processed
func (mq *MessageQueue) pendingWorkCount() int {
	mq.wllock.Lock()
	defer mq.wllock.Unlock()

	return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}

// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
	// Get broadcast and regular wantlist entries.
	mq.wllock.Lock()
	peerEntries := mq.peerWants.pending.Entries()
	bcstEntries := mq.bcstWants.pending.Entries()
	cancels := mq.cancels.Keys()
	if !supportsHave {
		filteredPeerEntries := peerEntries[:0]
		// If the remote peer doesn't support HAVE / DONT_HAVE messages,
		// don't send want-haves (only send want-blocks)
		//
		// Doing this here under the lock makes everything else in this
		// function simpler.
		//
		// TODO: We should _try_ to avoid recording these in the first
		// place if possible.
		for _, e := range peerEntries {
			if e.WantType == pb.Message_Wantlist_Have {
				mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
			} else {
				filteredPeerEntries = append(filteredPeerEntries, e)
			}
		}
		peerEntries = filteredPeerEntries
	}
	mq.wllock.Unlock()

	// We prioritize cancels, then regular wants, then broadcast wants.

	var (
		msgSize         = 0 // size of message so far
		sentCancels     = 0 // number of cancels in message
		sentPeerEntries = 0 // number of peer entries in message
		sentBcstEntries = 0 // number of broadcast entries in message
	)

	// Add each cancel to the message
	for _, c := range cancels {
		msgSize += mq.msg.Cancel(c)
		sentCancels++

		if msgSize >= mq.maxMessageSize {
			goto FINISH
		}
	}

	// Next, add the wants. If we have too many entries to fit into a single
	// message, sort by priority and include the high priority ones first.
	// However, avoid sorting until we really need to, as this code is
	// called frequently.

	// Add each regular want-have / want-block to the message.
	if msgSize+(len(peerEntries)*bsmsg.MaxEntrySize) > mq.maxMessageSize {
		bswl.SortEntries(peerEntries)
	}

	for _, e := range peerEntries {
		msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
		sentPeerEntries++

		if msgSize >= mq.maxMessageSize {
			goto FINISH
		}
	}

	// Sort broadcast want-haves by priority if they might not all fit in the message.
	if msgSize+(len(bcstEntries)*bsmsg.MaxEntrySize) > mq.maxMessageSize {
		bswl.SortEntries(bcstEntries)
	}

	// Add each broadcast want-have to the message
	for _, e := range bcstEntries {
		// Broadcast wants are sent as want-have
		wantType := pb.Message_Wantlist_Have

		// If the remote peer doesn't support HAVE / DONT_HAVE messages,
		// send a want-block instead
		if !supportsHave {
			wantType = pb.Message_Wantlist_Block
		}

		msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false)
		sentBcstEntries++

		if msgSize >= mq.maxMessageSize {
			goto FINISH
		}
	}

FINISH:

	// Finally, re-take the lock, mark sent and remove any entries from our
	// message that we've decided to cancel at the last minute.
	mq.wllock.Lock()
	for i, e := range peerEntries[:sentPeerEntries] {
		if !mq.peerWants.MarkSent(e) {
			// It changed.
			mq.msg.Remove(e.Cid)
			peerEntries[i].Cid = cid.Undef
		}
	}

	for i, e := range bcstEntries[:sentBcstEntries] {
		if !mq.bcstWants.MarkSent(e) {
			mq.msg.Remove(e.Cid)
			bcstEntries[i].Cid = cid.Undef
		}
	}

	for _, c := range cancels[:sentCancels] {
		if !mq.cancels.Has(c) {
			mq.msg.Remove(c)
		} else {
			mq.cancels.Remove(c)
		}
	}
	mq.wllock.Unlock()

	// When the message has been sent, record the time at which each want was
	// sent so we can calculate message latency
	onSent := func() {
		now := time.Now()

		mq.wllock.Lock()
		defer mq.wllock.Unlock()

		for _, e := range peerEntries[:sentPeerEntries] {
			if e.Cid.Defined() { // Check if want was cancelled in the interim
				mq.peerWants.SentAt(e.Cid, now)
			}
		}

		for _, e := range bcstEntries[:sentBcstEntries] {
			if e.Cid.Defined() { // Check if want was cancelled in the interim
				mq.bcstWants.SentAt(e.Cid, now)
			}
		}
	}

	return mq.msg, onSent
}

func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
	if mq.sender == nil {
		opts := &bsnet.MessageSenderOpts{
			MaxRetries:       maxRetries,
			SendTimeout:      sendTimeout,
			SendErrorBackoff: sendErrorBackoff,
		}
		nsender, err := mq.network.NewMessageSender(mq.ctx, mq.p, opts)
		if err != nil {
			return nil, err
		}

		mq.sender = nsender
	}
	return mq.sender, nil
}