Commit 3b7ae9b8 authored by hannahhoward's avatar hannahhoward

test(messagequeue): Add test for messagequeue

parent 9532d009
......@@ -14,12 +14,17 @@ import (
var log = logging.Logger("bitswap")
type MessageNetwork interface {
ConnectTo(context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
}
type MessageQueue struct {
p peer.ID
outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
network MessageNetwork
wl *wantlist.ThreadSafe
sender bsnet.MessageSender
......@@ -30,7 +35,7 @@ type MessageQueue struct {
done chan struct{}
}
func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue {
func New(p peer.ID, network MessageNetwork) *MessageQueue {
return &MessageQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
......@@ -90,22 +95,25 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {
// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
if len(initialEntries) > 0 {
fullwantlist := bsmsg.New(true)
for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
mq.out = fullwantlist
mq.work <- struct{}{}
}
mq.out = fullwantlist
mq.work <- struct{}{}
go mq.runQueue(ctx)
}
func (mq *MessageQueue) Shutdown() {
close(mq.done)
}
func (mq *MessageQueue) runQueue(ctx context.Context) {
for {
select {
......
package messagequeue
import (
"context"
"testing"
"time"
"github.com/ipfs/go-bitswap/testutil"
bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
peer "github.com/libp2p/go-libp2p-peer"
)
type fakeMessageNetwork struct {
connectError error
messageSenderError error
messageSender bsnet.MessageSender
}
func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
return fmn.connectError
}
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error) {
if fmn.messageSenderError == nil {
return fmn.messageSender, nil
} else {
return nil, fmn.messageSenderError
}
}
type fakeMessageSender struct {
sendError error
fullClosed chan<- struct{}
reset chan<- struct{}
messagesSent chan<- bsmsg.BitSwapMessage
}
func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
fms.messagesSent <- msg
return fms.sendError
}
func (fms *fakeMessageSender) Close() error { fms.fullClosed <- struct{}{}; return nil }
func (fms *fakeMessageSender) Reset() error { fms.reset <- struct{}{}; return nil }
func collectMessages(ctx context.Context,
t *testing.T,
messagesSent <-chan bsmsg.BitSwapMessage,
timeout time.Duration) []bsmsg.BitSwapMessage {
var messagesReceived []bsmsg.BitSwapMessage
timeoutctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
select {
case messageReceived := <-messagesSent:
messagesReceived = append(messagesReceived, messageReceived)
case <-timeoutctx.Done():
return messagesReceived
}
}
}
func totalEntriesLength(messages []bsmsg.BitSwapMessage) int {
totalLength := 0
for _, messages := range messages {
totalLength += len(messages.Wantlist())
}
return totalLength
}
func TestStartupAndShutdown(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(peerID, fakenet)
ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses)
messageQueue.Startup(ctx, wl.Entries())
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for initial wants")
}
firstMessage := messages[0]
if len(firstMessage.Wantlist()) != wl.Len() {
t.Fatal("did not add all wants to want list")
}
for _, entry := range firstMessage.Wantlist() {
if entry.Cancel {
t.Fatal("initial add sent cancel entry when it should not have")
}
}
messageQueue.Shutdown()
timeoutctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
select {
case <-fullClosedChan:
case <-resetChan:
t.Fatal("message sender should have been closed but was reset")
case <-timeoutctx.Done():
t.Fatal("message sender should have been closed but wasn't")
}
}
func TestSendingMessagesDeduped(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(peerID, fakenet)
ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false)
messageQueue.Startup(ctx, nil)
messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(entries, ses2)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if totalEntriesLength(messages) != len(entries) {
t.Fatal("Messages were not deduped")
}
}
func TestSendingMessagesPartialDupe(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := New(peerID, fakenet)
ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false)
moreEntries := testutil.GenerateMessageEntries(5, false)
secondEntries := append(entries[5:], moreEntries...)
messageQueue.Startup(ctx, nil)
messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(secondEntries, ses2)
messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
if totalEntriesLength(messages) != len(entries)+len(moreEntries) {
t.Fatal("messages were not correctly deduped")
}
}
......@@ -131,13 +131,13 @@ func TestSendingMessagesToPeers(t *testing.T) {
peerManager.Connected(peer2, nil)
peerManager.Connected(peer3, nil)
entries := testutil.GenerateEntries(5, false)
entries := testutil.GenerateMessageEntries(5, false)
ses := testutil.GenerateSessionID()
peerManager.SendMessage(entries, nil, ses)
peersReceived := collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 200*time.Millisecond)
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)
if len(peersReceived) != 3 {
t.Fatal("Incorrect number of peers received messages")
}
......@@ -157,7 +157,7 @@ func TestSendingMessagesToPeers(t *testing.T) {
peersToSendTo = append(peersToSendTo, peer1, peer3, peer4)
peerManager.SendMessage(entries, peersToSendTo, ses)
peersReceived = collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 200*time.Millisecond)
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)
if len(peersReceived) != 2 {
t.Fatal("Incorrect number of peers received messages")
......
......@@ -21,8 +21,19 @@ func GenerateCids(n int) []cid.Cid {
return cids
}
// GenerateEntries makes fake bitswap message entries
func GenerateEntries(n int, isCancel bool) []*bsmsg.Entry {
// GenerateWantlist makes a populated wantlist
func GenerateWantlist(n int, ses uint64) *wantlist.ThreadSafe {
wl := wantlist.NewThreadSafe()
for i := 0; i < n; i++ {
prioritySeq++
entry := wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq)
wl.AddEntry(entry, ses)
}
return wl
}
// GenerateMessageEntries makes fake bitswap message entries
func GenerateMessageEntries(n int, isCancel bool) []*bsmsg.Entry {
bsmsgs := make([]*bsmsg.Entry, 0, n)
for i := 0; i < n; i++ {
prioritySeq++
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment