package messagequeue

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/testutil"
	"github.com/ipld/go-ipld-prime/traversal/selector/builder"
	"github.com/stretchr/testify/require"

	gsmsg "github.com/ipfs/go-graphsync/message"
	gsnet "github.com/ipfs/go-graphsync/network"
	ipldfree "github.com/ipld/go-ipld-prime/impl/free"
	"github.com/libp2p/go-libp2p-core/peer"
)

// fakeMessageNetwork is a test double for the network dependency passed to
// New. The tests in this file build it with positional composite literals, so
// the order of its fields must not change.
type fakeMessageNetwork struct {
	// connectError is returned unchanged by ConnectTo.
	connectError       error
	// messageSenderError, when non-nil, is returned by NewMessageSender
	// instead of a sender.
	messageSenderError error
	// messageSender is what NewMessageSender hands out when
	// messageSenderError is nil.
	messageSender      gsnet.MessageSender
	// wait has Done called on it by every NewMessageSender call, letting a
	// test block until a sender has been requested.
	wait               *sync.WaitGroup
}

// ConnectTo ignores both of its arguments and reports whatever connectError
// the fake was configured with (nil meaning success).
func (fmn *fakeMessageNetwork) ConnectTo(_ context.Context, _ peer.ID) error {
	return fmn.connectError
}

// NewMessageSender ignores both of its arguments. It first marks the fake's
// WaitGroup as done, then returns either the configured error (with a nil
// sender) or the configured sender (with a nil error).
func (fmn *fakeMessageNetwork) NewMessageSender(_ context.Context, _ peer.ID) (gsnet.MessageSender, error) {
	// Signal before deciding the outcome, so a waiting test is released on
	// both the success and the failure path.
	fmn.wait.Done()
	if err := fmn.messageSenderError; err != nil {
		return nil, err
	}
	return fmn.messageSender, nil
}

// fakeMessageSender is a test double for a message sender that reports each
// call on a send-only channel. The tests in this file build it with
// positional composite literals, so the order of its fields must not change.
type fakeMessageSender struct {
	// sendError is returned by SendMsg after the message has been handed off.
	sendError    error
	// fullClosed receives one value per Close call.
	fullClosed   chan<- struct{}
	// reset receives one value per Reset call.
	reset        chan<- struct{}
	// messagesSent receives every message passed to SendMsg.
	messagesSent chan<- gsmsg.GraphSyncMessage
}

// SendMsg hands msg to the messagesSent channel and then returns the
// configured sendError. The ctx argument is not consulted, so the call
// blocks until the channel send completes.
func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
	outbox := fms.messagesSent
	outbox <- msg
	return fms.sendError
}
// Close reports the call on the fullClosed channel and always returns nil.
func (fms *fakeMessageSender) Close() error {
	fms.fullClosed <- struct{}{}
	return nil
}
// Reset reports the call on the reset channel and always returns nil.
func (fms *fakeMessageSender) Reset() error {
	fms.reset <- struct{}{}
	return nil
}

func TestStartupAndShutdown(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	messageQueue.Startup()
70 71
	id := graphsync.RequestID(rand.Int31())
	priority := graphsync.Priority(rand.Int31())
72 73
	ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
	selector := ssb.Matcher().Node()
74
	root := testutil.GenerateCids(1)[0]
75 76

	waitGroup.Add(1)
77
	messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
78

Hannah Howard's avatar
Hannah Howard committed
79
	testutil.AssertDoesReceive(ctx, t, messagesSent, "message was not sent")
80 81 82

	messageQueue.Shutdown()

Hannah Howard's avatar
Hannah Howard committed
83
	testutil.AssertDoesReceiveFirst(t, fullClosedChan, "message sender should be closed", resetChan, ctx.Done())
84 85
}

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
func TestShutdownDuringMessageSend(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{
		fmt.Errorf("Something went wrong"),
		fullClosedChan,
		resetChan,
		messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	messageQueue.Startup()
105 106
	id := graphsync.RequestID(rand.Int31())
	priority := graphsync.Priority(rand.Int31())
107 108
	ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
	selector := ssb.Matcher().Node()
109 110 111 112 113 114 115 116 117 118 119 120
	root := testutil.GenerateCids(1)[0]

	// setup a message and advance as far as beginning to send it
	waitGroup.Add(1)
	messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
	waitGroup.Wait()

	// now shut down
	messageQueue.Shutdown()

	// let the message send attempt complete and fail (as it would if
	// the connection were closed)
Hannah Howard's avatar
Hannah Howard committed
121
	testutil.AssertDoesReceive(ctx, t, messagesSent, "message send not attempted")
122 123

	// verify the connection is reset after a failed send attempt
Hannah Howard's avatar
Hannah Howard committed
124
	testutil.AssertDoesReceiveFirst(t, resetChan, "message sender was not reset", fullClosedChan, ctx.Done())
125 126 127 128 129 130 131

	// now verify after it's reset, no further retries, connection
	// resets, or attempts to close the connection, cause the queue
	// should realize it's shut down and stop processing
	// FIXME: this relies on time passing -- 100 ms to be exact
	// and we should instead mock out time as a dependency
	waitGroup.Add(1)
Hannah Howard's avatar
Hannah Howard committed
132
	testutil.AssertDoesReceiveFirst(t, ctx.Done(), "further message operations should not occur", messagesSent, resetChan, fullClosedChan)
133 134
}

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
func TestProcessingNotification(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	waitGroup.Add(1)
	blks := testutil.GenerateBlocksOfSize(3, 128)

152
	newMessage := gsmsg.New()
153 154 155
	responseID := graphsync.RequestID(rand.Int31())
	extensionName := graphsync.ExtensionName("graphsync/awesome")
	extension := graphsync.ExtensionData{
156 157 158
		Name: extensionName,
		Data: testutil.RandomBytes(100),
	}
159
	status := graphsync.RequestCompletedFull
160
	newMessage.AddResponse(gsmsg.NewResponse(responseID, status, extension))
161
	processing := messageQueue.AddResponses(newMessage.Responses(), blks)
Hannah Howard's avatar
Hannah Howard committed
162
	testutil.AssertChannelEmpty(t, processing, "processing notification sent while queue is shutdown")
163 164 165 166

	// wait for send attempt
	messageQueue.Startup()
	waitGroup.Wait()
Hannah Howard's avatar
Hannah Howard committed
167
	testutil.AssertDoesReceive(ctx, t, processing, "message was not processed")
168

Hannah Howard's avatar
Hannah Howard committed
169 170 171 172 173
	var message gsmsg.GraphSyncMessage
	testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")
	receivedBlocks := message.Blocks()
	for _, block := range receivedBlocks {
		testutil.AssertContainsBlock(t, blks, block)
174
	}
Hannah Howard's avatar
Hannah Howard committed
175 176 177 178 179 180
	firstResponse := message.Responses()[0]
	extensionData, found := firstResponse.Extension(extensionName)
	require.Equal(t, responseID, firstResponse.RequestID())
	require.Equal(t, status, firstResponse.Status())
	require.True(t, found)
	require.Equal(t, extension.Data, extensionData)
181 182
}

183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
func TestDedupingMessages(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
	defer cancel()

	peer := testutil.GeneratePeers(1)[0]
	messagesSent := make(chan gsmsg.GraphSyncMessage)
	resetChan := make(chan struct{}, 1)
	fullClosedChan := make(chan struct{}, 1)
	messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
	var waitGroup sync.WaitGroup
	messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}

	messageQueue := New(ctx, peer, messageNetwork)
	messageQueue.Startup()
	waitGroup.Add(1)
199 200
	id := graphsync.RequestID(rand.Int31())
	priority := graphsync.Priority(rand.Int31())
201 202
	ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
	selector := ssb.Matcher().Node()
203
	root := testutil.GenerateCids(1)[0]
204

205
	messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
206 207
	// wait for send attempt
	waitGroup.Wait()
208 209
	id2 := graphsync.RequestID(rand.Int31())
	priority2 := graphsync.Priority(rand.Int31())
210
	selector2 := ssb.ExploreAll(ssb.Matcher()).Node()
211
	root2 := testutil.GenerateCids(1)[0]
212 213
	id3 := graphsync.RequestID(rand.Int31())
	priority3 := graphsync.Priority(rand.Int31())
214
	selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node()
215 216 217 218
	root3 := testutil.GenerateCids(1)[0]

	messageQueue.AddRequest(gsmsg.NewRequest(id2, root2, selector2, priority2))
	messageQueue.AddRequest(gsmsg.NewRequest(id3, root3, selector3, priority3))
219

Hannah Howard's avatar
Hannah Howard committed
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
	var message gsmsg.GraphSyncMessage
	testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not send")

	requests := message.Requests()
	require.Len(t, requests, 1, "number of requests in first message was not 1")
	request := requests[0]
	require.Equal(t, id, request.ID())
	require.False(t, request.IsCancel())
	require.Equal(t, priority, request.Priority())
	require.Equal(t, selector, request.Selector())

	testutil.AssertReceive(ctx, t, messagesSent, &message, "message did not senf")

	requests = message.Requests()
	require.Len(t, requests, 2, "number of requests in second message was not 2")
	for _, request := range requests {
		if request.ID() == id2 {
			require.False(t, request.IsCancel())
			require.Equal(t, priority2, request.Priority())
			require.Equal(t, selector2, request.Selector())
		} else if request.ID() == id3 {
			require.False(t, request.IsCancel())
			require.Equal(t, priority3, request.Priority())
			require.Equal(t, selector3, request.Selector())
		} else {
			t.Fatal("incorrect request added to message")
246 247 248
		}
	}
}