Commit c1922c0d authored by Dirk McCormick's avatar Dirk McCormick

fix: tests

parent b097d702
......@@ -2,7 +2,6 @@ package messagequeue
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
......@@ -31,7 +30,7 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
return fmn.connectError
}
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) {
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error) {
if fmn.messageSenderError == nil {
return fmn.messageSender, nil
}
......@@ -83,23 +82,19 @@ func (fp *fakeDontHaveTimeoutMgr) pendingCount() int {
type fakeMessageSender struct {
lk sync.Mutex
sendError error
fullClosed chan<- struct{}
reset chan<- struct{}
messagesSent chan<- []bsmsg.Entry
sendErrors chan<- error
supportsHave bool
}
func newFakeMessageSender(sendError error, fullClosed chan<- struct{}, reset chan<- struct{},
messagesSent chan<- []bsmsg.Entry, sendErrors chan<- error, supportsHave bool) *fakeMessageSender {
func newFakeMessageSender(fullClosed chan<- struct{}, reset chan<- struct{},
messagesSent chan<- []bsmsg.Entry, supportsHave bool) *fakeMessageSender {
return &fakeMessageSender{
sendError: sendError,
fullClosed: fullClosed,
reset: reset,
messagesSent: messagesSent,
sendErrors: sendErrors,
supportsHave: supportsHave,
}
}
......@@ -108,19 +103,9 @@ func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess
fms.lk.Lock()
defer fms.lk.Unlock()
if fms.sendError != nil {
fms.sendErrors <- fms.sendError
return fms.sendError
}
fms.messagesSent <- msg.Wantlist()
return nil
}
func (fms *fakeMessageSender) clearSendError() {
fms.lk.Lock()
defer fms.lk.Unlock()
fms.sendError = nil
}
func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{}; return nil }
func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil }
func (fms *fakeMessageSender) SupportsHave() bool { return fms.supportsHave }
......@@ -155,10 +140,9 @@ func totalEntriesLength(messages [][]bsmsg.Entry) int {
func TestStartupAndShutdown(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -197,10 +181,9 @@ func TestStartupAndShutdown(t *testing.T) {
func TestSendingMessagesDeduped(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -220,10 +203,9 @@ func TestSendingMessagesDeduped(t *testing.T) {
func TestSendingMessagesPartialDupe(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -243,10 +225,9 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
func TestSendingMessagesPriority(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -312,10 +293,9 @@ func TestSendingMessagesPriority(t *testing.T) {
func TestCancelOverridesPendingWants(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -364,10 +344,9 @@ func TestCancelOverridesPendingWants(t *testing.T) {
func TestWantOverridesPendingCancels(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -412,10 +391,9 @@ func TestWantOverridesPendingCancels(t *testing.T) {
func TestWantlistRebroadcast(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb)
......@@ -509,10 +487,9 @@ func TestWantlistRebroadcast(t *testing.T) {
func TestSendingLargeMessages(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
......@@ -540,10 +517,9 @@ func TestSendingLargeMessages(t *testing.T) {
func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, false)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
......@@ -596,10 +572,9 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, false)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
......@@ -626,105 +601,6 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
}
}
func TestResendAfterError(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
sendErrBackoff := 5 * time.Millisecond
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm)
wantBlocks := testutil.GenerateCids(10)
wantHaves := testutil.GenerateCids(10)
messageQueue.Startup()
var errs []error
go func() {
// After the first error is received, clear sendError so that
// subsequent sends will not error
errs = append(errs, <-sendErrors)
fakeSender.clearSendError()
}()
// Make the first send error out
fakeSender.sendError = errors.New("send err")
messageQueue.AddWants(wantBlocks, wantHaves)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(errs) != 1 {
t.Fatal("Expected first send to error")
}
if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks) {
t.Fatal("Expected subsequent send to succeed")
}
}
func TestResendAfterMaxRetries(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, maxRetries*2)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
sendErrBackoff := 2 * time.Millisecond
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrBackoff, dhtm)
wantBlocks := testutil.GenerateCids(10)
wantHaves := testutil.GenerateCids(10)
wantBlocks2 := testutil.GenerateCids(10)
wantHaves2 := testutil.GenerateCids(10)
messageQueue.Startup()
var lk sync.Mutex
var errs []error
go func() {
lk.Lock()
defer lk.Unlock()
for len(errs) < maxRetries {
err := <-sendErrors
errs = append(errs, err)
}
}()
// Make the first group of send attempts error out
fakeSender.sendError = errors.New("send err")
messageQueue.AddWants(wantBlocks, wantHaves)
messages := collectMessages(ctx, t, messagesSent, 50*time.Millisecond)
lk.Lock()
errCount := len(errs)
lk.Unlock()
if errCount != maxRetries {
t.Fatal("Expected maxRetries errors, got", len(errs))
}
// No successful send after max retries, so expect no messages sent
if totalEntriesLength(messages) != 0 {
t.Fatal("Expected no messages")
}
// Clear sendError so that subsequent sends will not error
fakeSender.clearSendError()
// Add a new batch of wants
messageQueue.AddWants(wantBlocks2, wantHaves2)
messages = collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// All wants from previous and new send should be sent
if totalEntriesLength(messages) != len(wantHaves)+len(wantBlocks)+len(wantHaves2)+len(wantBlocks2) {
t.Fatal("Expected subsequent send to send first and second batches of wants")
}
}
func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) {
var wbs []cid.Cid
var whs []cid.Cid
......@@ -747,10 +623,9 @@ func BenchmarkMessageQueue(b *testing.B) {
createQueue := func() *MessageQueue {
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakeSender := newFakeMessageSender(fullClosedChan, resetChan, messagesSent, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
......
......@@ -99,7 +99,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {
t.Fatal("Peers connected that shouldn't be connected")
}
// removing a peer with only one reference
// disconnect a peer
peerManager.Disconnected(peer1)
connectedPeers = peerManager.ConnectedPeers()
......@@ -107,13 +107,12 @@ func TestAddingAndRemovingPeers(t *testing.T) {
t.Fatal("Peer should have been disconnected but was not")
}
// connecting a peer twice, then disconnecting once, should stay in queue
peerManager.Connected(peer2, nil)
peerManager.Disconnected(peer2)
// reconnect peer
peerManager.Connected(peer1, nil)
connectedPeers = peerManager.ConnectedPeers()
if !testutil.ContainsPeer(connectedPeers, peer2) {
t.Fatal("Peer was disconnected but should not have been")
if !testutil.ContainsPeer(connectedPeers, peer1) {
t.Fatal("Peer should have been connected but was not")
}
}
......
......@@ -113,7 +113,7 @@ func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Strea
}
stream, err = s.bsnet.newStreamToPeer(ctx, s.to)
if err != nil {
if err == nil {
s.stream = stream
return s.stream, nil
}
......
......@@ -5,10 +5,10 @@ import (
"testing"
"time"
tn "github.com/ipfs/go-bitswap/testnet"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
tn "github.com/ipfs/go-bitswap/testnet"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
......@@ -170,7 +170,7 @@ func TestSupportsHave(t *testing.T) {
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
if err != nil {
t.Fatal("Unable to setup network")
t.Fatalf("Unable to setup network: %s", err)
}
type testCase struct {
......@@ -199,7 +199,9 @@ func TestSupportsHave(t *testing.T) {
t.Fatal(err)
}
senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID())
senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
SendTimeout: time.Second,
})
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment