messagequeue_test.go 24.3 KB
Newer Older
1 2 3 4
package messagequeue

import (
	"context"
5
	"fmt"
6 7
	"math"
	"math/rand"
Dirk McCormick's avatar
Dirk McCormick committed
8
	"sync"
9 10 11
	"testing"
	"time"

12
	"github.com/ipfs/go-bitswap/internal/testutil"
13
	pb "github.com/ipfs/go-bitswap/message/pb"
dirkmc's avatar
dirkmc committed
14
	cid "github.com/ipfs/go-cid"
15 16 17

	bsmsg "github.com/ipfs/go-bitswap/message"
	bsnet "github.com/ipfs/go-bitswap/network"
Raúl Kripalani's avatar
Raúl Kripalani committed
18
	peer "github.com/libp2p/go-libp2p-core/peer"
19
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
20 21 22 23 24 25 26 27 28 29 30 31
)

type fakeMessageNetwork struct {
	connectError       error
	messageSenderError error
	messageSender      bsnet.MessageSender
}

func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
	return fmn.connectError
}

Dirk McCormick's avatar
Dirk McCormick committed
32
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) {
33 34 35
	if fmn.messageSenderError == nil {
		return fmn.messageSender, nil
	}
36
	return nil, fmn.messageSenderError
37 38
}

39 40 41 42 43 44 45
func (fms *fakeMessageNetwork) Self() peer.ID                 { return "" }
func (fms *fakeMessageNetwork) Latency(peer.ID) time.Duration { return 0 }
func (fms *fakeMessageNetwork) Ping(context.Context, peer.ID) ping.Result {
	return ping.Result{Error: fmt.Errorf("ping error")}
}

type fakeDontHaveTimeoutMgr struct {
46 47 48
	lk          sync.Mutex
	ks          []cid.Cid
	latencyUpds []time.Duration
49 50 51 52 53
}

func (fp *fakeDontHaveTimeoutMgr) Start()    {}
func (fp *fakeDontHaveTimeoutMgr) Shutdown() {}
func (fp *fakeDontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
Dirk McCormick's avatar
Dirk McCormick committed
54 55 56
	fp.lk.Lock()
	defer fp.lk.Unlock()

57 58 59 60 61 62 63
	s := cid.NewSet()
	for _, c := range append(fp.ks, ks...) {
		s.Add(c)
	}
	fp.ks = s.Keys()
}
func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
Dirk McCormick's avatar
Dirk McCormick committed
64 65 66
	fp.lk.Lock()
	defer fp.lk.Unlock()

67 68 69 70 71 72 73 74 75
	s := cid.NewSet()
	for _, c := range fp.ks {
		s.Add(c)
	}
	for _, c := range ks {
		s.Remove(c)
	}
	fp.ks = s.Keys()
}
76 77 78 79 80 81 82 83 84 85 86 87
func (fp *fakeDontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
	fp.lk.Lock()
	defer fp.lk.Unlock()

	fp.latencyUpds = append(fp.latencyUpds, elapsed)
}
func (fp *fakeDontHaveTimeoutMgr) latencyUpdates() []time.Duration {
	fp.lk.Lock()
	defer fp.lk.Unlock()

	return fp.latencyUpds
}
Dirk McCormick's avatar
Dirk McCormick committed
88 89 90 91 92 93
func (fp *fakeDontHaveTimeoutMgr) pendingCount() int {
	fp.lk.Lock()
	defer fp.lk.Unlock()

	return len(fp.ks)
}
dirkmc's avatar
dirkmc committed
94

95
type fakeMessageSender struct {
Dirk McCormick's avatar
Dirk McCormick committed
96
	lk           sync.Mutex
97
	reset        chan<- struct{}
Dirk McCormick's avatar
Dirk McCormick committed
98
	messagesSent chan<- []bsmsg.Entry
dirkmc's avatar
dirkmc committed
99
	supportsHave bool
100 101
}

102
func newFakeMessageSender(reset chan<- struct{},
Dirk McCormick's avatar
Dirk McCormick committed
103
	messagesSent chan<- []bsmsg.Entry, supportsHave bool) *fakeMessageSender {
Dirk McCormick's avatar
Dirk McCormick committed
104 105 106 107 108 109 110 111

	return &fakeMessageSender{
		reset:        reset,
		messagesSent: messagesSent,
		supportsHave: supportsHave,
	}
}

112
func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
Dirk McCormick's avatar
Dirk McCormick committed
113 114 115
	fms.lk.Lock()
	defer fms.lk.Unlock()

Dirk McCormick's avatar
Dirk McCormick committed
116
	fms.messagesSent <- msg.Wantlist()
dirkmc's avatar
dirkmc committed
117
	return nil
118
}
119
func (fms *fakeMessageSender) Close() error       { return nil }
dirkmc's avatar
dirkmc committed
120 121
func (fms *fakeMessageSender) Reset() error       { fms.reset <- struct{}{}; return nil }
func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave }
122

123 124
func mockTimeoutCb(peer.ID, []cid.Cid) {}

125 126
func collectMessages(ctx context.Context,
	t *testing.T,
Dirk McCormick's avatar
Dirk McCormick committed
127 128 129
	messagesSent <-chan []bsmsg.Entry,
	timeout time.Duration) [][]bsmsg.Entry {
	var messagesReceived [][]bsmsg.Entry
130 131 132 133 134 135 136 137 138 139 140 141
	timeoutctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	for {
		select {
		case messageReceived := <-messagesSent:
			messagesReceived = append(messagesReceived, messageReceived)
		case <-timeoutctx.Done():
			return messagesReceived
		}
	}
}

Dirk McCormick's avatar
Dirk McCormick committed
142
func totalEntriesLength(messages [][]bsmsg.Entry) int {
143
	totalLength := 0
Dirk McCormick's avatar
Dirk McCormick committed
144 145
	for _, m := range messages {
		totalLength += len(m)
146 147 148 149 150 151
	}
	return totalLength
}

func TestStartupAndShutdown(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
152
	messagesSent := make(chan []bsmsg.Entry)
153
	resetChan := make(chan struct{}, 1)
154
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
155 156
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]
157
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
dirkmc's avatar
dirkmc committed
158
	bcstwh := testutil.GenerateCids(10)
159

160
	messageQueue.Startup()
dirkmc's avatar
dirkmc committed
161
	messageQueue.AddBroadcastWantHaves(bcstwh)
162 163
	messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
	if len(messages) != 1 {
dirkmc's avatar
dirkmc committed
164
		t.Fatal("wrong number of messages were sent for broadcast want-haves")
165 166 167
	}

	firstMessage := messages[0]
Dirk McCormick's avatar
Dirk McCormick committed
168
	if len(firstMessage) != len(bcstwh) {
169 170
		t.Fatal("did not add all wants to want list")
	}
Dirk McCormick's avatar
Dirk McCormick committed
171
	for _, entry := range firstMessage {
172 173 174 175 176 177 178 179 180 181 182 183
		if entry.Cancel {
			t.Fatal("initial add sent cancel entry when it should not have")
		}
	}

	messageQueue.Shutdown()

	timeoutctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	select {
	case <-resetChan:
	case <-timeoutctx.Done():
184
		t.Fatal("message sender should have been reset but wasn't")
185 186 187 188 189
	}
}

func TestSendingMessagesDeduped(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
190
	messagesSent := make(chan []bsmsg.Entry)
191
	resetChan := make(chan struct{}, 1)
192
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
193 194
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]
195
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
dirkmc's avatar
dirkmc committed
196 197
	wantHaves := testutil.GenerateCids(10)
	wantBlocks := testutil.GenerateCids(10)
198

dirkmc's avatar
dirkmc committed
199 200 201
	messageQueue.Startup()
	messageQueue.AddWants(wantBlocks, wantHaves)
	messageQueue.AddWants(wantBlocks, wantHaves)
202 203
	messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

dirkmc's avatar
dirkmc committed
204
	if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
205 206 207 208 209 210
		t.Fatal("Messages were not deduped")
	}
}

func TestSendingMessagesPartialDupe(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
211
	messagesSent := make(chan []bsmsg.Entry)
212
	resetChan := make(chan struct{}, 1)
213
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
214 215
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]
216
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
dirkmc's avatar
dirkmc committed
217 218
	wantHaves := testutil.GenerateCids(10)
	wantBlocks := testutil.GenerateCids(10)
219

dirkmc's avatar
dirkmc committed
220 221 222
	messageQueue.Startup()
	messageQueue.AddWants(wantBlocks[:8], wantHaves[:8])
	messageQueue.AddWants(wantBlocks[3:], wantHaves[3:])
223 224
	messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)

dirkmc's avatar
dirkmc committed
225
	if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
226 227
		t.Fatal("messages were not correctly deduped")
	}
dirkmc's avatar
dirkmc committed
228 229 230 231
}

func TestSendingMessagesPriority(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
232
	messagesSent := make(chan []bsmsg.Entry)
dirkmc's avatar
dirkmc committed
233
	resetChan := make(chan struct{}, 1)
234
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
dirkmc's avatar
dirkmc committed
235 236
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]
237
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
dirkmc's avatar
dirkmc committed
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
	wantHaves1 := testutil.GenerateCids(5)
	wantHaves2 := testutil.GenerateCids(5)
	wantHaves := append(wantHaves1, wantHaves2...)
	wantBlocks1 := testutil.GenerateCids(5)
	wantBlocks2 := testutil.GenerateCids(5)
	wantBlocks := append(wantBlocks1, wantBlocks2...)

	messageQueue.Startup()
	messageQueue.AddWants(wantBlocks1, wantHaves1)
	messageQueue.AddWants(wantBlocks2, wantHaves2)
	messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)

	if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
		t.Fatal("wrong number of wants")
	}
Cory Schwartz's avatar
Cory Schwartz committed
253
	byCid := make(map[cid.Cid]bsmsg.Entry)
Dirk McCormick's avatar
Dirk McCormick committed
254
	for _, entry := range messages[0] {
dirkmc's avatar
dirkmc committed
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
		byCid[entry.Cid] = entry
	}

	// Check that earliest want-haves have highest priority
	for i := range wantHaves {
		if i > 0 {
			if byCid[wantHaves[i]].Priority > byCid[wantHaves[i-1]].Priority {
				t.Fatal("earliest want-haves should have higher priority")
			}
		}
	}

	// Check that earliest want-blocks have highest priority
	for i := range wantBlocks {
		if i > 0 {
			if byCid[wantBlocks[i]].Priority > byCid[wantBlocks[i-1]].Priority {
				t.Fatal("earliest want-blocks should have higher priority")
			}
		}
	}

	// Check that want-haves have higher priority than want-blocks within
	// same group
	for i := range wantHaves1 {
		if i > 0 {
			if byCid[wantHaves[i]].Priority <= byCid[wantBlocks[0]].Priority {
				t.Fatal("want-haves should have higher priority than want-blocks")
			}
		}
	}
285

dirkmc's avatar
dirkmc committed
286 287 288 289 290 291 292 293 294
	// Check that all items in first group have higher priority than first item
	// in second group
	for i := range wantHaves1 {
		if i > 0 {
			if byCid[wantHaves[i]].Priority <= byCid[wantHaves2[0]].Priority {
				t.Fatal("items in first group should have higher priority than items in second group")
			}
		}
	}
295
}
296

dirkmc's avatar
dirkmc committed
297 298
func TestCancelOverridesPendingWants(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
299
	messagesSent := make(chan []bsmsg.Entry)
dirkmc's avatar
dirkmc committed
300
	resetChan := make(chan struct{}, 1)
301
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
dirkmc's avatar
dirkmc committed
302 303
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]
304
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
305

dirkmc's avatar
dirkmc committed
306 307
	wantHaves := testutil.GenerateCids(2)
	wantBlocks := testutil.GenerateCids(2)
308
	cancels := []cid.Cid{wantBlocks[0], wantHaves[0]}
dirkmc's avatar
dirkmc committed
309 310 311

	messageQueue.Startup()
	messageQueue.AddWants(wantBlocks, wantHaves)
312
	messageQueue.AddCancels(cancels)
dirkmc's avatar
dirkmc committed
313 314
	messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

315
	if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks)-len(cancels) {
dirkmc's avatar
dirkmc committed
316 317
		t.Fatal("Wrong message count")
	}
318

319 320
	// Cancelled 1 want-block and 1 want-have before they were sent
	// so that leaves 1 want-block and 1 want-have
Dirk McCormick's avatar
Dirk McCormick committed
321
	wb, wh, cl := filterWantTypes(messages[0])
dirkmc's avatar
dirkmc committed
322 323 324 325 326 327
	if len(wb) != 1 || !wb[0].Equals(wantBlocks[1]) {
		t.Fatal("Expected 1 want-block")
	}
	if len(wh) != 1 || !wh[0].Equals(wantHaves[1]) {
		t.Fatal("Expected 1 want-have")
	}
328 329 330 331 332 333 334 335 336 337 338 339 340 341
	// Cancelled wants before they were sent, so no cancel should be sent
	// to the network
	if len(cl) != 0 {
		t.Fatal("Expected no cancels")
	}

	// Cancel the remaining want-blocks and want-haves
	cancels = append(wantHaves, wantBlocks...)
	messageQueue.AddCancels(cancels)
	messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	// The remaining 2 cancels should be sent to the network as they are for
	// wants that were sent to the network
	_, _, cl = filterWantTypes(messages[0])
dirkmc's avatar
dirkmc committed
342 343 344 345 346 347
	if len(cl) != 2 {
		t.Fatal("Expected 2 cancels")
	}
}

func TestWantOverridesPendingCancels(t *testing.T) {
348
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
349
	messagesSent := make(chan []bsmsg.Entry)
350
	resetChan := make(chan struct{}, 1)
351
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
352 353
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]
354
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
355 356 357 358

	cids := testutil.GenerateCids(3)
	wantBlocks := cids[:1]
	wantHaves := cids[1:]
359 360

	messageQueue.Startup()
361 362 363 364

	// Add 1 want-block and 2 want-haves
	messageQueue.AddWants(wantBlocks, wantHaves)

dirkmc's avatar
dirkmc committed
365
	messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
366 367 368
	if totalEntriesLength(messages) != len(wantBlocks)+len(wantHaves) {
		t.Fatal("Wrong message count", totalEntriesLength(messages))
	}
dirkmc's avatar
dirkmc committed
369

370 371 372 373 374 375 376 377
	// Cancel existing wants
	messageQueue.AddCancels(cids)
	// Override one cancel with a want-block (before cancel is sent to network)
	messageQueue.AddWants(cids[:1], []cid.Cid{})

	messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
	if totalEntriesLength(messages) != 3 {
		t.Fatal("Wrong message count", totalEntriesLength(messages))
dirkmc's avatar
dirkmc committed
378 379
	}

380
	// Should send 1 want-block and 2 cancels
Dirk McCormick's avatar
Dirk McCormick committed
381
	wb, wh, cl := filterWantTypes(messages[0])
382
	if len(wb) != 1 {
dirkmc's avatar
dirkmc committed
383 384
		t.Fatal("Expected 1 want-block")
	}
385 386
	if len(wh) != 0 {
		t.Fatal("Expected 0 want-have")
dirkmc's avatar
dirkmc committed
387
	}
388 389
	if len(cl) != 2 {
		t.Fatal("Expected 2 cancels")
dirkmc's avatar
dirkmc committed
390 391 392 393 394
	}
}

func TestWantlistRebroadcast(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
395
	messagesSent := make(chan []bsmsg.Entry)
dirkmc's avatar
dirkmc committed
396
	resetChan := make(chan struct{}, 1)
397
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
dirkmc's avatar
dirkmc committed
398 399
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]
400
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
dirkmc's avatar
dirkmc committed
401 402 403 404 405 406 407
	bcstwh := testutil.GenerateCids(10)
	wantHaves := testutil.GenerateCids(10)
	wantBlocks := testutil.GenerateCids(10)

	// Add some broadcast want-haves
	messageQueue.Startup()
	messageQueue.AddBroadcastWantHaves(bcstwh)
408 409 410 411 412
	messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
	if len(messages) != 1 {
		t.Fatal("wrong number of messages were sent for initial wants")
	}

dirkmc's avatar
dirkmc committed
413 414
	// All broadcast want-haves should have been sent
	firstMessage := messages[0]
Dirk McCormick's avatar
Dirk McCormick committed
415
	if len(firstMessage) != len(bcstwh) {
dirkmc's avatar
dirkmc committed
416 417 418 419
		t.Fatal("wrong number of wants")
	}

	// Tell message queue to rebroadcast after 5ms, then wait 8ms
420
	messageQueue.SetRebroadcastInterval(5 * time.Millisecond)
421
	messages = collectMessages(ctx, t, messagesSent, 8*time.Millisecond)
422
	if len(messages) != 1 {
423
		t.Fatal("wrong number of messages were rebroadcast")
424 425
	}

dirkmc's avatar
dirkmc committed
426 427
	// All the want-haves should have been rebroadcast
	firstMessage = messages[0]
Dirk McCormick's avatar
Dirk McCormick committed
428
	if len(firstMessage) != len(bcstwh) {
dirkmc's avatar
dirkmc committed
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
		t.Fatal("did not rebroadcast all wants")
	}

	// Tell message queue to rebroadcast after a long time (so it doesn't
	// interfere with the next message collection), then send out some
	// regular wants and collect them
	messageQueue.SetRebroadcastInterval(1 * time.Second)
	messageQueue.AddWants(wantBlocks, wantHaves)
	messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
	if len(messages) != 1 {
		t.Fatal("wrong number of messages were rebroadcast")
	}

	// All new wants should have been sent
	firstMessage = messages[0]
Dirk McCormick's avatar
Dirk McCormick committed
444
	if len(firstMessage) != len(wantHaves)+len(wantBlocks) {
dirkmc's avatar
dirkmc committed
445 446 447
		t.Fatal("wrong number of wants")
	}

Dirk McCormick's avatar
Dirk McCormick committed
448 449 450
	// Tell message queue to rebroadcast after 10ms, then wait 15ms
	messageQueue.SetRebroadcastInterval(10 * time.Millisecond)
	messages = collectMessages(ctx, t, messagesSent, 15*time.Millisecond)
dirkmc's avatar
dirkmc committed
451 452 453 454
	firstMessage = messages[0]

	// Both original and new wants should have been rebroadcast
	totalWants := len(bcstwh) + len(wantHaves) + len(wantBlocks)
Dirk McCormick's avatar
Dirk McCormick committed
455
	if len(firstMessage) != totalWants {
dirkmc's avatar
dirkmc committed
456 457 458 459 460 461 462 463 464 465 466 467 468 469
		t.Fatal("did not rebroadcast all wants")
	}

	// Cancel some of the wants
	messageQueue.SetRebroadcastInterval(1 * time.Second)
	cancels := append([]cid.Cid{bcstwh[0]}, wantHaves[0], wantBlocks[0])
	messageQueue.AddCancels(cancels)
	messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
	if len(messages) != 1 {
		t.Fatal("wrong number of messages were rebroadcast")
	}

	// Cancels for each want should have been sent
	firstMessage = messages[0]
Dirk McCormick's avatar
Dirk McCormick committed
470
	if len(firstMessage) != len(cancels) {
dirkmc's avatar
dirkmc committed
471
		t.Fatal("wrong number of cancels")
472
	}
Dirk McCormick's avatar
Dirk McCormick committed
473
	for _, entry := range firstMessage {
dirkmc's avatar
dirkmc committed
474 475 476 477 478
		if !entry.Cancel {
			t.Fatal("expected cancels")
		}
	}

Dirk McCormick's avatar
Dirk McCormick committed
479 480 481
	// Tell message queue to rebroadcast after 10ms, then wait 15ms
	messageQueue.SetRebroadcastInterval(10 * time.Millisecond)
	messages = collectMessages(ctx, t, messagesSent, 15*time.Millisecond)
dirkmc's avatar
dirkmc committed
482
	firstMessage = messages[0]
Dirk McCormick's avatar
Dirk McCormick committed
483
	if len(firstMessage) != totalWants-len(cancels) {
dirkmc's avatar
dirkmc committed
484 485 486 487 488 489
		t.Fatal("did not rebroadcast all wants")
	}
}

func TestSendingLargeMessages(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
490
	messagesSent := make(chan []bsmsg.Entry)
dirkmc's avatar
dirkmc committed
491
	resetChan := make(chan struct{}, 1)
492
	fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
dirkmc's avatar
dirkmc committed
493
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
494
	dhtm := &fakeDontHaveTimeoutMgr{}
dirkmc's avatar
dirkmc committed
495 496 497 498 499
	peerID := testutil.GeneratePeers(1)[0]

	wantBlocks := testutil.GenerateCids(10)
	entrySize := 44
	maxMsgSize := entrySize * 3 // 3 wants
500
	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, maxValidLatency, dhtm)
dirkmc's avatar
dirkmc committed
501 502 503

	messageQueue.Startup()
	messageQueue.AddWants(wantBlocks, []cid.Cid{})
504
	messages := collectMessages(ctx, t, messagesSent, 100*time.Millisecond)
dirkmc's avatar
dirkmc committed
505 506 507 508 509 510 511 512 513 514 515 516 517 518

	// want-block has size 44, so with maxMsgSize 44 * 3 (3 want-blocks), then if
	// we send 10 want-blocks we should expect 4 messages:
	// [***] [***] [***] [*]
	if len(messages) != 4 {
		t.Fatal("expected 4 messages to be sent, got", len(messages))
	}
	if totalEntriesLength(messages) != len(wantBlocks) {
		t.Fatal("wrong number of wants")
	}
}

func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
519
	messagesSent := make(chan []bsmsg.Entry)
dirkmc's avatar
dirkmc committed
520
	resetChan := make(chan struct{}, 1)
521
	fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
dirkmc's avatar
dirkmc committed
522 523 524
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]

525
	messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
dirkmc's avatar
dirkmc committed
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540
	messageQueue.Startup()

	// If the remote peer doesn't support HAVE / DONT_HAVE messages
	// - want-blocks should be sent normally
	// - want-haves should not be sent
	// - broadcast want-haves should be sent as want-blocks

	// Check broadcast want-haves
	bcwh := testutil.GenerateCids(10)
	messageQueue.AddBroadcastWantHaves(bcwh)
	messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	if len(messages) != 1 {
		t.Fatal("wrong number of messages were sent", len(messages))
	}
Dirk McCormick's avatar
Dirk McCormick committed
541
	wl := messages[0]
dirkmc's avatar
dirkmc committed
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559
	if len(wl) != len(bcwh) {
		t.Fatal("wrong number of entries in wantlist", len(wl))
	}
	for _, entry := range wl {
		if entry.WantType != pb.Message_Wantlist_Block {
			t.Fatal("broadcast want-haves should be sent as want-blocks")
		}
	}

	// Check regular want-haves and want-blocks
	wbs := testutil.GenerateCids(10)
	whs := testutil.GenerateCids(10)
	messageQueue.AddWants(wbs, whs)
	messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	if len(messages) != 1 {
		t.Fatal("wrong number of messages were sent", len(messages))
	}
Dirk McCormick's avatar
Dirk McCormick committed
560
	wl = messages[0]
dirkmc's avatar
dirkmc committed
561 562 563 564 565 566 567 568 569 570
	if len(wl) != len(wbs) {
		t.Fatal("should only send want-blocks (no want-haves)", len(wl))
	}
	for _, entry := range wl {
		if entry.WantType != pb.Message_Wantlist_Block {
			t.Fatal("should only send want-blocks")
		}
	}
}

571 572
func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
	ctx := context.Background()
Dirk McCormick's avatar
Dirk McCormick committed
573
	messagesSent := make(chan []bsmsg.Entry)
574
	resetChan := make(chan struct{}, 1)
575
	fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
576 577 578 579
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]

	dhtm := &fakeDontHaveTimeoutMgr{}
580
	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
581 582 583 584 585 586 587
	messageQueue.Startup()

	wbs := testutil.GenerateCids(10)
	messageQueue.AddWants(wbs, nil)
	collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	// Check want-blocks are added to DontHaveTimeoutMgr
Dirk McCormick's avatar
Dirk McCormick committed
588
	if dhtm.pendingCount() != len(wbs) {
589 590 591 592 593 594 595 596
		t.Fatal("want-blocks not added to DontHaveTimeoutMgr")
	}

	cancelCount := 2
	messageQueue.AddCancels(wbs[:cancelCount])
	collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	// Check want-blocks are removed from DontHaveTimeoutMgr
Dirk McCormick's avatar
Dirk McCormick committed
597
	if dhtm.pendingCount() != len(wbs)-cancelCount {
598 599 600 601
		t.Fatal("want-blocks not removed from DontHaveTimeoutMgr")
	}
}

602 603 604 605 606 607 608 609 610
func TestResponseReceived(t *testing.T) {
	ctx := context.Background()
	messagesSent := make(chan []bsmsg.Entry)
	resetChan := make(chan struct{}, 1)
	fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]

	dhtm := &fakeDontHaveTimeoutMgr{}
611
	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
612 613 614 615 616 617 618 619 620 621 622 623 624
	messageQueue.Startup()

	cids := testutil.GenerateCids(10)

	// Add some wants and wait 10ms
	messageQueue.AddWants(cids[:5], nil)
	collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	// Add some wants and wait another 10ms
	messageQueue.AddWants(cids[5:8], nil)
	collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	// Receive a response for some of the wants from both groups
Dirk McCormick's avatar
Dirk McCormick committed
625
	messageQueue.ResponseReceived([]cid.Cid{cids[0], cids[6], cids[9]})
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641

	// Wait a short time for processing
	time.Sleep(10 * time.Millisecond)

	// Check that message queue informs DHTM of received responses
	upds := dhtm.latencyUpdates()
	if len(upds) != 1 {
		t.Fatal("expected one latency update")
	}
	// Elapsed time should be between when the first want was sent and the
	// response received (about 20ms)
	if upds[0] < 15*time.Millisecond || upds[0] > 25*time.Millisecond {
		t.Fatal("expected latency to be time since oldest message sent")
	}
}

642 643 644 645 646 647 648 649 650
func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {
	ctx := context.Background()
	messagesSent := make(chan []bsmsg.Entry)
	resetChan := make(chan struct{}, 1)
	fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]

	dhtm := &fakeDontHaveTimeoutMgr{}
651
	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685
	messageQueue.Startup()

	cids := testutil.GenerateCids(2)

	// Add some wants and wait 10ms
	messageQueue.AddWants(cids, nil)
	collectMessages(ctx, t, messagesSent, 10*time.Millisecond)

	// Receive a response for the wants
	messageQueue.ResponseReceived(cids)

	// Wait another 10ms
	time.Sleep(10 * time.Millisecond)

	// Message queue should inform DHTM of first response
	upds := dhtm.latencyUpdates()
	if len(upds) != 1 {
		t.Fatal("expected one latency update")
	}

	// Receive a second response for the same wants
	messageQueue.ResponseReceived(cids)

	// Wait for the response to be processed by the message queue
	time.Sleep(10 * time.Millisecond)

	// Message queue should not inform DHTM of second response because the
	// CIDs are a subset of the first response
	upds = dhtm.latencyUpdates()
	if len(upds) != 1 {
		t.Fatal("expected one latency update")
	}
}

686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727
func TestResponseReceivedDiscardsOutliers(t *testing.T) {
	ctx := context.Background()
	messagesSent := make(chan []bsmsg.Entry)
	resetChan := make(chan struct{}, 1)
	fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
	fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
	peerID := testutil.GeneratePeers(1)[0]

	maxValLatency := 30 * time.Millisecond
	dhtm := &fakeDontHaveTimeoutMgr{}
	messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValLatency, dhtm)
	messageQueue.Startup()

	cids := testutil.GenerateCids(4)

	// Add some wants and wait 20ms
	messageQueue.AddWants(cids[:2], nil)
	collectMessages(ctx, t, messagesSent, 20*time.Millisecond)

	// Add some more wants and wait long enough that the first wants will be
	// outside the maximum valid latency, but the second wants will be inside
	messageQueue.AddWants(cids[2:], nil)
	collectMessages(ctx, t, messagesSent, maxValLatency-10*time.Millisecond)

	// Receive a response for the wants
	messageQueue.ResponseReceived(cids)

	// Wait for the response to be processed by the message queue
	time.Sleep(10 * time.Millisecond)

	// Check that the latency calculation excludes the first wants
	// (because they're older than max valid latency)
	upds := dhtm.latencyUpdates()
	if len(upds) != 1 {
		t.Fatal("expected one latency update")
	}
	// Elapsed time should not include outliers
	if upds[0] > maxValLatency {
		t.Fatal("expected latency calculation to discard outliers")
	}
}

dirkmc's avatar
dirkmc committed
728 729 730 731 732 733 734 735 736 737 738
func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) {
	var wbs []cid.Cid
	var whs []cid.Cid
	var cls []cid.Cid
	for _, e := range wantlist {
		if e.Cancel {
			cls = append(cls, e.Cid)
		} else if e.WantType == pb.Message_Wantlist_Block {
			wbs = append(wbs, e.Cid)
		} else {
			whs = append(whs, e.Cid)
739 740
		}
	}
dirkmc's avatar
dirkmc committed
741
	return wbs, whs, cls
742
}
743 744 745 746 747 748

// Simplistic benchmark to allow us to simulate conditions on the gateways
func BenchmarkMessageQueue(b *testing.B) {
	ctx := context.Background()

	createQueue := func() *MessageQueue {
Dirk McCormick's avatar
Dirk McCormick committed
749
		messagesSent := make(chan []bsmsg.Entry)
750
		resetChan := make(chan struct{}, 1)
751
		fakeSender := newFakeMessageSender(resetChan, messagesSent, true)
752 753 754 755
		fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
		dhtm := &fakeDontHaveTimeoutMgr{}
		peerID := testutil.GeneratePeers(1)[0]

756
		messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797
		messageQueue.Startup()

		go func() {
			for {
				<-messagesSent
				time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			}
		}()

		return messageQueue
	}

	// Create a handful of message queues to start with
	var qs []*MessageQueue
	for i := 0; i < 5; i++ {
		qs = append(qs, createQueue())
	}

	for n := 0; n < b.N; n++ {
		// Create a new message queue every 10 ticks
		if n%10 == 0 {
			qs = append(qs, createQueue())
		}

		// Pick a random message queue, favoring those created later
		qn := len(qs)
		i := int(math.Floor(float64(qn) * float64(1-rand.Float32()*rand.Float32())))
		if i >= qn { // because of floating point math
			i = qn - 1
		}

		// Alternately add either a few wants or a lot of broadcast wants
		if rand.Intn(2) == 0 {
			wants := testutil.GenerateCids(10)
			qs[i].AddWants(wants[:2], wants[2:])
		} else {
			wants := testutil.GenerateCids(60)
			qs[i].AddBroadcastWantHaves(wants)
		}
	}
}