messagequeue_test.go 8.63 KB
Newer Older
1 2 3 4
package messagequeue

import (
	"context"
5
	"fmt"
6 7 8 9 10
	"math/rand"
	"sync"
	"testing"
	"time"

Hannah Howard's avatar
Hannah Howard committed
11
	basicnode "github.com/ipld/go-ipld-prime/node/basic"
12
	"github.com/ipld/go-ipld-prime/traversal/selector/builder"
Hannah Howard's avatar
Hannah Howard committed
13
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
14
	"github.com/stretchr/testify/require"
15

Hannah Howard's avatar
Hannah Howard committed
16
	"github.com/ipfs/go-graphsync"
17 18
	gsmsg "github.com/ipfs/go-graphsync/message"
	gsnet "github.com/ipfs/go-graphsync/network"
19
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
20
	"github.com/ipfs/go-graphsync/testutil"
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
)

// fakeMessageNetwork is a test double for the network dependency handed to
// New. It returns canned values and signals through a WaitGroup whenever a
// message sender is requested.
type fakeMessageNetwork struct {
	// connectError is returned verbatim by ConnectTo.
	connectError       error
	// messageSenderError, when non-nil, is returned by NewMessageSender
	// in place of a sender.
	messageSenderError error
	// messageSender is handed out by NewMessageSender when
	// messageSenderError is nil.
	messageSender      gsnet.MessageSender
	// wait has Done called on it each time NewMessageSender is invoked, so
	// tests must Add(1) beforehand.
	wait               *sync.WaitGroup
}

// ConnectTo ignores both arguments and reports the connectError the fake was
// configured with (nil means the connection "succeeds").
func (fmn *fakeMessageNetwork) ConnectTo(_ context.Context, _ peer.ID) error {
	err := fmn.connectError
	return err
}

// NewMessageSender marks one unit of the fake's WaitGroup as done, then
// returns either the configured error (with a nil sender) or the configured
// sender (with a nil error). Both arguments are ignored.
func (fmn *fakeMessageNetwork) NewMessageSender(_ context.Context, _ peer.ID) (gsnet.MessageSender, error) {
	// Signal first so waiting tests are released regardless of the outcome.
	fmn.wait.Done()
	if err := fmn.messageSenderError; err != nil {
		return nil, err
	}
	return fmn.messageSender, nil
}

// fakeMessageSender is a test double for gsnet.MessageSender that reports
// every call on a channel so tests can observe what the queue did.
type fakeMessageSender struct {
	// sendError is returned by SendMsg after the message has been handed
	// to messagesSent.
	sendError    error
	// fullClosed receives a value each time Close is called.
	fullClosed   chan<- struct{}
	// reset receives a value each time Reset is called.
	reset        chan<- struct{}
	// messagesSent receives every message passed to SendMsg.
	messagesSent chan<- gsmsg.GraphSyncMessage
}

// SendMsg publishes msg on the messagesSent channel (blocking until the send
// completes) and then returns the configured sendError. ctx is not consulted.
func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
	sink := fms.messagesSent
	sink <- msg
	return fms.sendError
}
// Close records the call by sending on the fullClosed channel and always
// returns nil.
func (fms *fakeMessageSender) Close() error {
	var signal struct{}
	fms.fullClosed <- signal
	return nil
}
// Reset records the call by sending on the reset channel and always returns
// nil.
func (fms *fakeMessageSender) Reset() error {
	var signal struct{}
	fms.reset <- signal
	return nil
}

func TestStartupAndShutdown(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	messageQueue.Startup()
71 72
	id := graphsync.RequestID(rand.Int31())
	priority := graphsync.Priority(rand.Int31())
Eric Myhre's avatar
Eric Myhre committed
73
	ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
74
	selector := ssb.Matcher().Node()
75
	root := testutil.GenerateCids(1)[0]
76 77

	waitGroup.Add(1)
78
	messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
79

Hannah Howard's avatar
Hannah Howard committed
80
	testutil.AssertDoesReceive(ctx, t, messagesSent, "message was not sent")
81 82 83

	messageQueue.Shutdown()

Hannah Howard's avatar
Hannah Howard committed
84
	testutil.AssertDoesReceiveFirst(t, fullClosedChan, "message sender should be closed", resetChan, ctx.Done())
85 86
}

87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
func TestShutdownDuringMessageSend(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{
		fmt.Errorf("Something went wrong"),
		fullClosedChan,
		resetChan,
		messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	messageQueue.Startup()
106 107
	id := graphsync.RequestID(rand.Int31())
	priority := graphsync.Priority(rand.Int31())
Eric Myhre's avatar
Eric Myhre committed
108
	ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
109
	selector := ssb.Matcher().Node()
110 111 112 113 114 115 116 117 118 119 120 121
	root := testutil.GenerateCids(1)[0]

	// setup a message and advance as far as beginning to send it
	waitGroup.Add(1)
	messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
	waitGroup.Wait()

	// now shut down
	messageQueue.Shutdown()

	// let the message send attempt complete and fail (as it would if
	// the connection were closed)
Hannah Howard's avatar
Hannah Howard committed
122
	testutil.AssertDoesReceive(ctx, t, messagesSent, "message send not attempted")
123 124

	// verify the connection is reset after a failed send attempt
Hannah Howard's avatar
Hannah Howard committed
125
	testutil.AssertDoesReceiveFirst(t, resetChan, "message sender was not reset", fullClosedChan, ctx.Done())
126 127 128 129 130 131 132

	// now verify after it's reset, no further retries, connection
	// resets, or attempts to close the connection, cause the queue
	// should realize it's shut down and stop processing
	// FIXME: this relies on time passing -- 100 ms to be exact
	// and we should instead mock out time as a dependency
	waitGroup.Add(1)
Hannah Howard's avatar
Hannah Howard committed
133
	testutil.AssertDoesReceiveFirst(t, ctx.Done(), "further message operations should not occur", messagesSent, resetChan, fullClosedChan)
134 135
}

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
func TestProcessingNotification(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	waitGroup.Add(1)
	blks := testutil.GenerateBlocksOfSize(3, 128)

153
	newMessage := gsmsg.New()
154 155 156
	responseID := graphsync.RequestID(rand.Int31())
	extensionName := graphsync.ExtensionName("graphsync/awesome")
	extension := graphsync.ExtensionData{
157 158 159
		Name: extensionName,
		Data: testutil.RandomBytes(100),
	}
160
	status := graphsync.RequestCompletedFull
161
	newMessage.AddResponse(gsmsg.NewResponse(responseID, status, extension))
162 163 164
	expectedTopic := "testTopic"
	notifee, verifier := testutil.NewTestNotifee(expectedTopic, 5)
	messageQueue.AddResponses(newMessage.Responses(), blks, notifee)
165 166 167 168 169

	// wait for send attempt
	messageQueue.Startup()
	waitGroup.Wait()

Hannah Howard's avatar
Hannah Howard committed
170 171 172 173 174
	var message gsmsg.GraphSyncMessage
	testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")
	receivedBlocks := message.Blocks()
	for _, block := range receivedBlocks {
		testutil.AssertContainsBlock(t, blks, block)
175
	}
Hannah Howard's avatar
Hannah Howard committed
176 177 178 179 180 181
	firstResponse := message.Responses()[0]
	extensionData, found := firstResponse.Extension(extensionName)
	require.Equal(t, responseID, firstResponse.RequestID())
	require.Equal(t, status, firstResponse.Status())
	require.True(t, found)
	require.Equal(t, extension.Data, extensionData)
182 183 184 185 186 187

	verifier.ExpectEvents(ctx, t, []notifications.Event{
		Event{Name: Queued},
		Event{Name: Sent},
	})
	verifier.ExpectClose(ctx, t)
188 189
}

190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
func TestDedupingMessages(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	messageQueue.Startup()
	waitGroup.Add(1)
206 207
	id := graphsync.RequestID(rand.Int31())
	priority := graphsync.Priority(rand.Int31())
Eric Myhre's avatar
Eric Myhre committed
208
	ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
209
	selector := ssb.Matcher().Node()
210
	root := testutil.GenerateCids(1)[0]
211

212
	messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
213 214
	// wait for send attempt
	waitGroup.Wait()
215 216
	id2 := graphsync.RequestID(rand.Int31())
	priority2 := graphsync.Priority(rand.Int31())
217
	selector2 := ssb.ExploreAll(ssb.Matcher()).Node()
218
	root2 := testutil.GenerateCids(1)[0]
219 220
	id3 := graphsync.RequestID(rand.Int31())
	priority3 := graphsync.Priority(rand.Int31())
221
	selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node()
222 223 224 225
	root3 := testutil.GenerateCids(1)[0]

	messageQueue.AddRequest(gsmsg.NewRequest(id2, root2, selector2, priority2))
	messageQueue.AddRequest(gsmsg.NewRequest(id3, root3, selector3, priority3))
226

Hannah Howard's avatar
Hannah Howard committed
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
	var message gsmsg.GraphSyncMessage
	testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")

	requests := message.Requests()
	require.Len(t, requests, 1, "number of requests in first message was not 1")
	request := requests[0]
	require.Equal(t, id, request.ID())
	require.False(t, request.IsCancel())
	require.Equal(t, priority, request.Priority())
	require.Equal(t, selector, request.Selector())

	testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not senf")

	requests = message.Requests()
	require.Len(t, requests, 2, "number of requests in second message was not 2")
	for _, request := range requests {
		if request.ID() == id2 {
			require.False(t, request.IsCancel())
			require.Equal(t, priority2, request.Priority())
			require.Equal(t, selector2, request.Selector())
		} else if request.ID() == id3 {
			require.False(t, request.IsCancel())
			require.Equal(t, priority3, request.Priority())
			require.Equal(t, selector3, request.Selector())
		} else {
			t.Fatal("incorrect request added to message")
253 254 255
		}
	}
}