Unverified Commit 159748c3 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #279 from ipfs/fix/races

fix: races in tests
parents 5d28b384 e12b69e4
......@@ -76,6 +76,10 @@ const (
// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
shortTerm = 10 * time.Second
// long term ratio defines what "long term" means in terms of the
// shortTerm duration. Peers that interact once every longTermRatio are
// considered useful over the long term.
......@@ -96,14 +100,6 @@ const (
blockstoreWorkerCount = 128
)
var (
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
//
// this is only a variable to make testing easier.
shortTerm = 10 * time.Second
)
// Envelope contains a message for a Peer.
type Envelope struct {
// Peer is the intended recipient.
......@@ -161,6 +157,9 @@ type Engine struct {
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int
// how frequently the engine should sample peer usefulness
peerSampleInterval time.Duration
sendDontHaves bool
self peer.ID
......@@ -168,11 +167,13 @@ type Engine struct {
// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock)
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm)
}
// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, maxReplaceSize int) *Engine {
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
maxReplaceSize int, peerSampleInterval time.Duration) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
......@@ -181,6 +182,7 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger,
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
peerSampleInterval: peerSampleInterval,
taskWorkerCount: taskWorkerCount,
sendDontHaves: true,
self: self,
......@@ -236,7 +238,7 @@ func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (e *Engine) scoreWorker(ctx context.Context) {
ticker := time.NewTicker(shortTerm)
ticker := time.NewTicker(e.peerSampleInterval)
defer ticker.Stop()
type update struct {
......
......@@ -91,10 +91,10 @@ type engineSet struct {
Blockstore blockstore.Blockstore
}
func newTestEngine(ctx context.Context, idStr string) engineSet {
func newTestEngine(ctx context.Context, idStr string, peerSampleInterval time.Duration) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(ctx, bs, fpt, "localhost", 0)
e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
......@@ -108,8 +108,8 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
func TestConsistentAccounting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sender := newTestEngine(ctx, "Ernie")
receiver := newTestEngine(ctx, "Bert")
sender := newTestEngine(ctx, "Ernie", shortTerm)
receiver := newTestEngine(ctx, "Bert", shortTerm)
// Send messages from Ernie to Bert
for i := 0; i < 1000; i++ {
......@@ -143,8 +143,8 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sanfrancisco := newTestEngine(ctx, "sf")
seattle := newTestEngine(ctx, "sea")
sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
seattle := newTestEngine(ctx, "sea", shortTerm)
m := message.New(true)
......@@ -181,7 +181,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
ctx := context.Background()
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0)
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
......@@ -509,7 +509,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
}
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
......@@ -665,7 +665,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
}
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
var next envChan
......@@ -850,7 +850,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
......@@ -875,7 +875,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
blks := testutil.GenerateBlocksOfSize(4, 8*1024)
......@@ -919,7 +919,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm)
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
blks := testutil.GenerateBlocksOfSize(4, 8*1024)
......@@ -981,8 +981,8 @@ func TestSendDontHave(t *testing.T) {
func TestTaggingPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
sanfrancisco := newTestEngine(ctx, "sf")
seattle := newTestEngine(ctx, "sea")
sanfrancisco := newTestEngine(ctx, "sf", shortTerm)
seattle := newTestEngine(ctx, "sea", shortTerm)
keys := []string{"a", "b", "c", "d", "e"}
for _, letter := range keys {
......@@ -1007,13 +1007,11 @@ func TestTaggingPeers(t *testing.T) {
}
func TestTaggingUseful(t *testing.T) {
oldShortTerm := shortTerm
shortTerm = 2 * time.Millisecond
defer func() { shortTerm = oldShortTerm }()
peerSampleInterval := 2 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
me := newTestEngine(ctx, "engine")
me := newTestEngine(ctx, "engine", peerSampleInterval)
friend := peer.ID("friend")
block := blocks.NewBlock([]byte("foobar"))
......@@ -1025,21 +1023,21 @@ func TestTaggingUseful(t *testing.T) {
t.Fatal("Peers should be untagged but weren't")
}
me.Engine.MessageSent(friend, msg)
time.Sleep(shortTerm * 2)
time.Sleep(peerSampleInterval * 2)
if me.PeerTagger.count(me.Engine.tagUseful) != 1 {
t.Fatal("Peers should be tagged but weren't")
}
time.Sleep(shortTerm * 8)
time.Sleep(peerSampleInterval * 8)
}
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 2)
time.Sleep(peerSampleInterval * 2)
if me.PeerTagger.count(me.Engine.tagUseful) == 0 {
t.Fatal("peers should still be tagged due to long-term usefulness")
}
time.Sleep(shortTerm * 20)
time.Sleep(peerSampleInterval * 30)
if me.PeerTagger.count(me.Engine.tagUseful) != 0 {
t.Fatal("peers should finally be untagged")
}
......
......@@ -50,9 +50,24 @@ type timeoutRecorder struct {
func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) {
tr.lk.Lock()
defer tr.lk.Unlock()
tr.timedOutKs = append(tr.timedOutKs, tks...)
}
func (tr *timeoutRecorder) timedOutCount() int {
tr.lk.Lock()
defer tr.lk.Unlock()
return len(tr.timedOutKs)
}
func (tr *timeoutRecorder) clear() {
tr.lk.Lock()
defer tr.lk.Unlock()
tr.timedOutKs = nil
}
func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
firstks := testutil.GenerateCids(2)
secondks := append(firstks, testutil.GenerateCids(3)...)
......@@ -75,7 +90,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
time.Sleep(expectedTimeout - 5*time.Millisecond)
// At this stage no keys should have timed out
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}
......@@ -86,12 +101,12 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
time.Sleep(10 * time.Millisecond)
// At this stage first set of keys should have timed out
if len(tr.timedOutKs) != len(firstks) {
if tr.timedOutCount() != len(firstks) {
t.Fatal("expected timeout")
}
// Clear the recorded timed out keys
tr.timedOutKs = nil
tr.clear()
// Sleep until the second set of keys should have timed out
time.Sleep(expectedTimeout)
......@@ -99,7 +114,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
// At this stage all keys should have timed out. The second set included
// the first set of keys, but they were added before the first set timed
// out, so only the remaining keys should have beed added.
if len(tr.timedOutKs) != len(secondks)-len(firstks) {
if tr.timedOutCount() != len(secondks)-len(firstks) {
t.Fatal("expected second set of keys to timeout")
}
}
......@@ -130,7 +145,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
time.Sleep(expectedTimeout)
// At this stage all non-cancelled keys should have timed out
if len(tr.timedOutKs) != len(ks)-cancelCount {
if tr.timedOutCount() != len(ks)-cancelCount {
t.Fatal("expected timeout")
}
}
......@@ -167,7 +182,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
time.Sleep(10 * time.Millisecond)
// At this stage only the key that was never cancelled should have timed out
if len(tr.timedOutKs) != 1 {
if tr.timedOutCount() != 1 {
t.Fatal("expected one key to timeout")
}
......@@ -175,7 +190,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
time.Sleep(latency)
// At this stage the key that was added back should also have timed out
if len(tr.timedOutKs) != 2 {
if tr.timedOutCount() != 2 {
t.Fatal("expected added back key to timeout")
}
}
......@@ -202,7 +217,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
time.Sleep(latency + 5*time.Millisecond)
// At this stage all keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
......@@ -229,7 +244,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
time.Sleep(expectedTimeout - 5*time.Millisecond)
// At this stage no timeout should have happened yet
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}
......@@ -237,7 +252,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
time.Sleep(10 * time.Millisecond)
// Now the keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
......@@ -263,7 +278,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
time.Sleep(defaultTimeout - 5*time.Millisecond)
// At this stage no timeout should have happened yet
if len(tr.timedOutKs) > 0 {
if tr.timedOutCount() > 0 {
t.Fatal("expected timeout not to have happened yet")
}
......@@ -271,7 +286,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
time.Sleep(10 * time.Millisecond)
// Now the keys should have timed out
if len(tr.timedOutKs) != len(ks) {
if tr.timedOutCount() != len(ks) {
t.Fatal("expected timeout")
}
}
......@@ -281,17 +296,11 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
latency := time.Millisecond * 10
latMultiplier := 1
expProcessTime := time.Duration(0)
tr := timeoutRecorder{}
ctx := context.Background()
pc := &mockPeerConn{latency: latency}
var lk sync.Mutex
var timedOutKs []cid.Cid
onTimeout := func(tks []cid.Cid) {
lk.Lock()
defer lk.Unlock()
timedOutKs = append(timedOutKs, tks...)
}
dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, onTimeout,
dhtm := newDontHaveTimeoutMgrWithParams(ctx, pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dhtm.Start()
......@@ -308,7 +317,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
time.Sleep(10 * time.Millisecond)
// Manager was shut down so timeout should not have fired
if len(timedOutKs) != 0 {
if tr.timedOutCount() != 0 {
t.Fatal("expected no timeout after shutdown")
}
}
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
......@@ -42,12 +43,16 @@ func (fms *fakeMessageNetwork) Ping(context.Context, peer.ID) ping.Result {
}
type fakeDontHaveTimeoutMgr struct {
lk sync.Mutex
ks []cid.Cid
}
func (fp *fakeDontHaveTimeoutMgr) Start() {}
func (fp *fakeDontHaveTimeoutMgr) Shutdown() {}
func (fp *fakeDontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
fp.lk.Lock()
defer fp.lk.Unlock()
s := cid.NewSet()
for _, c := range append(fp.ks, ks...) {
s.Add(c)
......@@ -55,6 +60,9 @@ func (fp *fakeDontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
fp.ks = s.Keys()
}
func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
fp.lk.Lock()
defer fp.lk.Unlock()
s := cid.NewSet()
for _, c := range fp.ks {
s.Add(c)
......@@ -64,8 +72,15 @@ func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
}
fp.ks = s.Keys()
}
func (fp *fakeDontHaveTimeoutMgr) pendingCount() int {
fp.lk.Lock()
defer fp.lk.Unlock()
return len(fp.ks)
}
type fakeMessageSender struct {
lk sync.Mutex
sendError error
fullClosed chan<- struct{}
reset chan<- struct{}
......@@ -74,7 +89,23 @@ type fakeMessageSender struct {
supportsHave bool
}
func newFakeMessageSender(sendError error, fullClosed chan<- struct{}, reset chan<- struct{},
messagesSent chan<- bsmsg.BitSwapMessage, sendErrors chan<- error, supportsHave bool) *fakeMessageSender {
return &fakeMessageSender{
sendError: sendError,
fullClosed: fullClosed,
reset: reset,
messagesSent: messagesSent,
sendErrors: sendErrors,
supportsHave: supportsHave,
}
}
func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
fms.lk.Lock()
defer fms.lk.Unlock()
if fms.sendError != nil {
fms.sendErrors <- fms.sendError
return fms.sendError
......@@ -82,6 +113,12 @@ func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess
fms.messagesSent <- msg
return nil
}
func (fms *fakeMessageSender) clearSendError() {
fms.lk.Lock()
defer fms.lk.Unlock()
fms.sendError = nil
}
func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{}; return nil }
func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil }
func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave }
......@@ -119,7 +156,7 @@ func TestStartupAndShutdown(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -161,7 +198,7 @@ func TestSendingMessagesDeduped(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -184,7 +221,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -207,7 +244,7 @@ func TestSendingMessagesPriority(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -276,7 +313,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -310,7 +347,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -343,7 +380,7 @@ func TestWantlistRebroadcast(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -440,7 +477,7 @@ func TestSendingLargeMessages(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
......@@ -471,7 +508,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, false}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
......@@ -527,7 +564,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, false}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
......@@ -540,7 +577,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Check want-blocks are added to DontHaveTimeoutMgr
if len(dhtm.ks) != len(wbs) {
if dhtm.pendingCount() != len(wbs) {
t.Fatal("want-blocks not added to DontHaveTimeoutMgr")
}
......@@ -549,7 +586,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Check want-blocks are removed from DontHaveTimeoutMgr
if len(dhtm.ks) != len(wbs)-cancelCount {
if dhtm.pendingCount() != len(wbs)-cancelCount {
t.Fatal("want-blocks not removed from DontHaveTimeoutMgr")
}
}
......@@ -560,7 +597,7 @@ func TestResendAfterError(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
......@@ -576,7 +613,7 @@ func TestResendAfterError(t *testing.T) {
// After the first error is received, clear sendError so that
// subsequent sends will not error
errs = append(errs, <-sendErrors)
fakeSender.sendError = nil
fakeSender.clearSendError()
}()
// Make the first send error out
......@@ -599,7 +636,7 @@ func TestResendAfterMaxRetries(t *testing.T) {
sendErrors := make(chan error)
resetChan := make(chan struct{}, maxRetries*2)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent, sendErrors, true}
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
......@@ -612,8 +649,11 @@ func TestResendAfterMaxRetries(t *testing.T) {
messageQueue.Startup()
var lk sync.Mutex
var errs []error
go func() {
lk.Lock()
defer lk.Unlock()
for len(errs) < maxRetries {
err := <-sendErrors
errs = append(errs, err)
......@@ -625,7 +665,10 @@ func TestResendAfterMaxRetries(t *testing.T) {
messageQueue.AddWants(wantBlocks, wantHaves)
messages := collectMessages(ctx, t, messagesSent, 50*time.Millisecond)
if len(errs) != maxRetries {
lk.Lock()
errCount := len(errs)
lk.Unlock()
if errCount != maxRetries {
t.Fatal("Expected maxRetries errors, got", len(errs))
}
......@@ -635,7 +678,7 @@ func TestResendAfterMaxRetries(t *testing.T) {
}
// Clear sendError so that subsequent sends will not error
fakeSender.sendError = nil
fakeSender.clearSendError()
// Add a new batch of wants
messageQueue.AddWants(wantBlocks2, wantHaves2)
......
......@@ -14,22 +14,55 @@ import (
)
type sentWants struct {
sync.Mutex
p peer.ID
wantHaves *cid.Set
wantBlocks *cid.Set
}
func (sw *sentWants) add(wantBlocks []cid.Cid, wantHaves []cid.Cid) {
sw.Lock()
defer sw.Unlock()
for _, c := range wantBlocks {
sw.wantBlocks.Add(c)
}
for _, c := range wantHaves {
if !sw.wantBlocks.Has(c) {
sw.wantHaves.Add(c)
}
}
}
func (sw *sentWants) wantHavesKeys() []cid.Cid {
sw.Lock()
defer sw.Unlock()
return sw.wantHaves.Keys()
}
func (sw *sentWants) wantBlocksKeys() []cid.Cid {
sw.Lock()
defer sw.Unlock()
return sw.wantBlocks.Keys()
}
type mockPeerManager struct {
peerSessions sync.Map
peerSends sync.Map
lk sync.Mutex
peerSessions map[peer.ID]bspm.Session
peerSends map[peer.ID]*sentWants
}
func newMockPeerManager() *mockPeerManager {
return &mockPeerManager{}
return &mockPeerManager{
peerSessions: make(map[peer.ID]bspm.Session),
peerSends: make(map[peer.ID]*sentWants),
}
}
func (pm *mockPeerManager) RegisterSession(p peer.ID, sess bspm.Session) bool {
pm.peerSessions.Store(p, sess)
pm.lk.Lock()
defer pm.lk.Unlock()
pm.peerSessions[p] = sess
return true
}
......@@ -37,33 +70,62 @@ func (pm *mockPeerManager) UnregisterSession(sesid uint64) {
}
func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
swi, _ := pm.peerSends.LoadOrStore(p, sentWants{p, cid.NewSet(), cid.NewSet()})
sw := swi.(sentWants)
for _, c := range wantBlocks {
sw.wantBlocks.Add(c)
}
for _, c := range wantHaves {
if !sw.wantBlocks.Has(c) {
sw.wantHaves.Add(c)
}
pm.lk.Lock()
defer pm.lk.Unlock()
sw, ok := pm.peerSends[p]
if !ok {
sw = &sentWants{p: p, wantHaves: cid.NewSet(), wantBlocks: cid.NewSet()}
pm.peerSends[p] = sw
}
sw.add(wantBlocks, wantHaves)
}
func (pm *mockPeerManager) waitNextWants() map[peer.ID]sentWants {
func (pm *mockPeerManager) waitNextWants() map[peer.ID]*sentWants {
time.Sleep(5 * time.Millisecond)
nw := make(map[peer.ID]sentWants)
pm.peerSends.Range(func(k, v interface{}) bool {
nw[k.(peer.ID)] = v.(sentWants)
return true
})
pm.lk.Lock()
defer pm.lk.Unlock()
nw := make(map[peer.ID]*sentWants)
for p, sentWants := range pm.peerSends {
nw[p] = sentWants
}
return nw
}
func (pm *mockPeerManager) clearWants() {
pm.peerSends.Range(func(k, v interface{}) bool {
pm.peerSends.Delete(k)
return true
})
pm.lk.Lock()
defer pm.lk.Unlock()
for p := range pm.peerSends {
delete(pm.peerSends, p)
}
}
type exhaustedPeers struct {
lk sync.Mutex
ks []cid.Cid
}
func (ep *exhaustedPeers) onPeersExhausted(ks []cid.Cid) {
ep.lk.Lock()
defer ep.lk.Unlock()
ep.ks = append(ep.ks, ks...)
}
func (ep *exhaustedPeers) clear() {
ep.lk.Lock()
defer ep.lk.Unlock()
ep.ks = nil
}
func (ep *exhaustedPeers) exhausted() []cid.Cid {
ep.lk.Lock()
defer ep.lk.Unlock()
return append([]cid.Cid{}, ep.ks...)
}
func TestSendWants(t *testing.T) {
......@@ -95,10 +157,10 @@ func TestSendWants(t *testing.T) {
if !ok {
t.Fatal("Nothing sent to peer")
}
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocks.Keys(), blkCids0) {
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocksKeys(), blkCids0) {
t.Fatal("Wrong keys")
}
if sw.wantHaves.Len() > 0 {
if len(sw.wantHavesKeys()) > 0 {
t.Fatal("Expecting no want-haves")
}
}
......@@ -133,7 +195,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
if !ok {
t.Fatal("Nothing sent to peer")
}
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocks.Keys(), blkCids0) {
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocksKeys(), blkCids0) {
t.Fatal("Wrong keys")
}
......@@ -156,7 +218,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
if sw.wantBlocks.Len() > 0 {
t.Fatal("Expecting no want-blocks")
}
if !testutil.MatchKeysIgnoreOrder(sw.wantHaves.Keys(), blkCids0) {
if !testutil.MatchKeysIgnoreOrder(sw.wantHavesKeys(), blkCids0) {
t.Fatal("Wrong keys")
}
}
......@@ -190,7 +252,7 @@ func TestReceiveBlock(t *testing.T) {
if !ok {
t.Fatal("Nothing sent to peer")
}
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocks.Keys(), cids) {
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocksKeys(), cids) {
t.Fatal("Wrong keys")
}
......@@ -215,7 +277,7 @@ func TestReceiveBlock(t *testing.T) {
if !ok {
t.Fatal("Nothing sent to peer")
}
wb := sw.wantBlocks.Keys()
wb := sw.wantBlocksKeys()
if len(wb) != 1 || !wb[0].Equals(cids[1]) {
t.Fatal("Wrong keys", wb)
}
......@@ -250,7 +312,7 @@ func TestPeerUnavailable(t *testing.T) {
if !ok {
t.Fatal("Nothing sent to peer")
}
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocks.Keys(), cids) {
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocksKeys(), cids) {
t.Fatal("Wrong keys")
}
......@@ -281,7 +343,7 @@ func TestPeerUnavailable(t *testing.T) {
if !ok {
t.Fatal("Nothing sent to peer")
}
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocks.Keys(), cids) {
if !testutil.MatchKeysIgnoreOrder(sw.wantBlocksKeys(), cids) {
t.Fatal("Wrong keys")
}
}
......@@ -297,11 +359,8 @@ func TestPeersExhausted(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
var exhausted []cid.Cid
onPeersExhausted := func(ks []cid.Cid) {
exhausted = append(exhausted, ks...)
}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
go spm.Run()
......@@ -321,12 +380,12 @@ func TestPeersExhausted(t *testing.T) {
// All available peers (peer A) have sent us a DONT_HAVE for cid1,
// so expect that onPeersExhausted() will be called with cid1
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[1]}) {
if !testutil.MatchKeysIgnoreOrder(ep.exhausted(), []cid.Cid{cids[1]}) {
t.Fatal("Wrong keys")
}
// Clear exhausted cids
exhausted = []cid.Cid{}
ep.clear()
// peerB: HAVE cid0
bpm.ReceiveFrom(peerB, []cid.Cid{cids[0]}, []cid.Cid{})
......@@ -343,7 +402,7 @@ func TestPeersExhausted(t *testing.T) {
// All available peers (peer A and peer B) have sent us a DONT_HAVE
// for cid1, but we already called onPeersExhausted with cid1, so it
// should not be called again
if len(exhausted) > 0 {
if len(ep.exhausted()) > 0 {
t.Fatal("Wrong keys")
}
......@@ -356,7 +415,7 @@ func TestPeersExhausted(t *testing.T) {
// All available peers (peer A and peer B) have sent us a DONT_HAVE for
// cid2, so expect that onPeersExhausted() will be called with cid2
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[2]}) {
if !testutil.MatchKeysIgnoreOrder(ep.exhausted(), []cid.Cid{cids[2]}) {
t.Fatal("Wrong keys")
}
}
......@@ -376,11 +435,8 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
var exhausted []cid.Cid
onPeersExhausted := func(ks []cid.Cid) {
exhausted = append(exhausted, ks...)
}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
go spm.Run()
......@@ -409,7 +465,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
// All remaining peers (peer A) have sent us a DONT_HAVE for cid1,
// so expect that onPeersExhausted() will be called with cid1
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[1]}) {
if !testutil.MatchKeysIgnoreOrder(ep.exhausted(), []cid.Cid{cids[1]}) {
t.Fatal("Wrong keys")
}
}
......@@ -427,11 +483,8 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
var exhausted []cid.Cid
onPeersExhausted := func(ks []cid.Cid) {
exhausted = append(exhausted, ks...)
}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
go spm.Run()
......@@ -455,7 +508,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
// Expect that onPeersExhausted() will be called with all cids for blocks
// that have not been received
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[1], cids[2]}) {
if !testutil.MatchKeysIgnoreOrder(ep.exhausted(), []cid.Cid{cids[1], cids[2]}) {
t.Fatal("Wrong keys")
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment