peerresponsesender_test.go 16.5 KB
Newer Older
1 2 3 4 5 6 7 8 9
package peerresponsemanager

import (
	"context"
	"fmt"
	"math/rand"
	"testing"
	"time"

10
	"github.com/ipfs/go-graphsync"
Hannah Howard's avatar
Hannah Howard committed
11
	"github.com/stretchr/testify/require"
12

13
	blocks "github.com/ipfs/go-block-format"
14 15 16
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
	"github.com/ipld/go-ipld-prime"
17
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
18
	"github.com/libp2p/go-libp2p-core/peer"
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
)

type fakePeerHandler struct {
	lastBlocks    []blocks.Block
	lastResponses []gsmsg.GraphSyncResponse
	sent          chan struct{}
	done          chan struct{}
}

// SendResponse stores the responses and blocks it was called with, signals on
// fph.sent that a message went out, and returns fph.done. The peer argument is
// ignored. The send on fph.sent blocks until the channel can accept the signal.
func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) <-chan struct{} {
	fph.lastResponses, fph.lastBlocks = responses, blks
	fph.sent <- struct{}{}
	return fph.done
}

35
func TestPeerResponseSenderSendsResponses(t *testing.T) {
36
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
37
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
38 39
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
40 41 42
	requestID1 := graphsync.RequestID(rand.Int31())
	requestID2 := graphsync.RequestID(rand.Int31())
	requestID3 := graphsync.RequestID(rand.Int31())
43 44 45 46 47 48 49 50 51 52 53
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
Hannah Howard's avatar
Hannah Howard committed
54 55
	peerResponseSender := NewResponseSender(ctx, p, fph)
	peerResponseSender.Startup()
56

Hannah Howard's avatar
Hannah Howard committed
57
	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
58 59 60
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire())
Hannah Howard's avatar
Hannah Howard committed
61
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
62

Hannah Howard's avatar
Hannah Howard committed
63 64
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
65

Hannah Howard's avatar
Hannah Howard committed
66 67 68
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
69

Hannah Howard's avatar
Hannah Howard committed
70
	bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData())
71 72 73
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
Hannah Howard's avatar
Hannah Howard committed
74
	bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
75 76 77
	require.Equal(t, links[1], bd.Link())
	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSizeOnWire())
Hannah Howard's avatar
Hannah Howard committed
78
	bd = peerResponseSender.SendResponse(requestID1, links[2], nil)
79 80 81
	require.Equal(t, links[2], bd.Link())
	require.Equal(t, uint64(0), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
Hannah Howard's avatar
Hannah Howard committed
82
	peerResponseSender.FinishRequest(requestID1)
83 84 85 86

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
87
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
88

Hannah Howard's avatar
Hannah Howard committed
89 90
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[1].Cid(), fph.lastBlocks[0].Cid(), "did not dedup blocks correctly on second message")
91

Hannah Howard's avatar
Hannah Howard committed
92
	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
93
	response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
Hannah Howard's avatar
Hannah Howard committed
94 95
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedPartial, response1.Status(), "did not send correct response code in second message")
96
	response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
Hannah Howard's avatar
Hannah Howard committed
97 98
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send corrent response code in second message")
99

Hannah Howard's avatar
Hannah Howard committed
100 101 102
	peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData())
	peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData())
	peerResponseSender.FinishRequest(requestID2)
103 104 105 106

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
107
	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
108

Hannah Howard's avatar
Hannah Howard committed
109 110 111
	require.Equal(t, 2, len(fph.lastBlocks))
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[4])
112

Hannah Howard's avatar
Hannah Howard committed
113
	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
114
	response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
Hannah Howard's avatar
Hannah Howard committed
115 116
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
117
	response3, err := findResponseForRequestID(fph.lastResponses, requestID3)
Hannah Howard's avatar
Hannah Howard committed
118 119
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not send correct response code in third message")
120

Hannah Howard's avatar
Hannah Howard committed
121 122
	peerResponseSender.SendResponse(requestID3, links[0], blks[0].RawData())
	peerResponseSender.SendResponse(requestID3, links[4], blks[4].RawData())
123 124 125 126

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
127
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")
128

Hannah Howard's avatar
Hannah Howard committed
129 130
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "Should resend block cause there were no in progress requests")
131

Hannah Howard's avatar
Hannah Howard committed
132 133 134
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID3, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
135 136
}

137
func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) {
138 139

	p := testutil.GeneratePeers(1)[0]
140
	requestID1 := graphsync.RequestID(rand.Int31())
141 142 143
	// generate large blocks before proceeding
	blks := testutil.GenerateBlocksOfSize(5, 1000000)
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
144
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
145 146 147 148 149 150 151 152 153 154 155
	defer cancel()
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
Hannah Howard's avatar
Hannah Howard committed
156 157
	peerResponseSender := NewResponseSender(ctx, p, fph)
	peerResponseSender.Startup()
158

Hannah Howard's avatar
Hannah Howard committed
159
	peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
160

Hannah Howard's avatar
Hannah Howard committed
161
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
162

Hannah Howard's avatar
Hannah Howard committed
163 164
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
165

Hannah Howard's avatar
Hannah Howard committed
166 167 168
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
169 170

	// Send 3 very large blocks
Hannah Howard's avatar
Hannah Howard committed
171 172 173
	peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
	peerResponseSender.SendResponse(requestID1, links[2], blks[2].RawData())
	peerResponseSender.SendResponse(requestID1, links[3], blks[3].RawData())
174 175 176 177

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
178
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message ")
179

Hannah Howard's avatar
Hannah Howard committed
180 181
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[1].Cid(), fph.lastBlocks[0].Cid(), "Should break up message")
182

Hannah Howard's avatar
Hannah Howard committed
183
	require.Len(t, fph.lastResponses, 1, "Should break up message")
184 185

	// Send one more block while waiting
Hannah Howard's avatar
Hannah Howard committed
186 187
	peerResponseSender.SendResponse(requestID1, links[4], blks[4].RawData())
	peerResponseSender.FinishRequest(requestID1)
188 189 190 191

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
192
	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
193

Hannah Howard's avatar
Hannah Howard committed
194 195
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[2].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
196

Hannah Howard's avatar
Hannah Howard committed
197
	require.Len(t, fph.lastResponses, 1, "should break up message")
198 199 200 201

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
202
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")
203

Hannah Howard's avatar
Hannah Howard committed
204 205
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[3].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
206

Hannah Howard's avatar
Hannah Howard committed
207
	require.Len(t, fph.lastResponses, 1, "should break up message")
208 209 210 211

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
212
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fifth message")
213

Hannah Howard's avatar
Hannah Howard committed
214 215
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[4].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
216

Hannah Howard's avatar
Hannah Howard committed
217
	require.Len(t, fph.lastResponses, 1, "should break up message")
218 219

	response, err := findResponseForRequestID(fph.lastResponses, requestID1)
Hannah Howard's avatar
Hannah Howard committed
220 221
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response.Status(), "did not send corrent response code in fifth message")
222 223 224

}

225
func TestPeerResponseSenderSendsExtensionData(t *testing.T) {
226
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
227
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
228 229 230 231 232 233 234 235 236 237 238 239 240 241
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
	requestID1 := graphsync.RequestID(rand.Int31())
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
Hannah Howard's avatar
Hannah Howard committed
242 243
	peerResponseSender := NewResponseSender(ctx, p, fph)
	peerResponseSender.Startup()
244

Hannah Howard's avatar
Hannah Howard committed
245
	peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
246

Hannah Howard's avatar
Hannah Howard committed
247
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
248

Hannah Howard's avatar
Hannah Howard committed
249 250
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
251

Hannah Howard's avatar
Hannah Howard committed
252 253 254
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
255 256 257 258 259 260 261 262 263 264 265 266 267

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
Hannah Howard's avatar
Hannah Howard committed
268 269 270
	peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
	peerResponseSender.SendExtensionData(requestID1, extension1)
	peerResponseSender.SendExtensionData(requestID1, extension2)
271 272 273
	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
274
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
275

Hannah Howard's avatar
Hannah Howard committed
276
	require.Len(t, fph.lastResponses, 1, "did not send correct number of responses for second message")
277 278 279

	lastResponse := fph.lastResponses[0]
	returnedData1, found := lastResponse.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
280 281
	require.True(t, found)
	require.Equal(t, extensionData1, returnedData1, "did not encode first extension")
282 283

	returnedData2, found := lastResponse.Extension(extensionName2)
Hannah Howard's avatar
Hannah Howard committed
284 285
	require.True(t, found)
	require.Equal(t, extensionData2, returnedData2, "did not encode first extension")
286 287
}

288
func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) {
Hannah Howard's avatar
Hannah Howard committed
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
	requestID1 := graphsync.RequestID(rand.Int31())
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
	peerResponseSender := NewResponseSender(ctx, p, fph)
	peerResponseSender.Startup()

	err := peerResponseSender.Transaction(requestID1, func(peerResponseSender PeerResponseTransactionSender) error {
		bd := peerResponseSender.SendResponse(links[0], blks[0].RawData())
		require.Equal(t, links[0], bd.Link())
		require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
		require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire())

		timer := time.NewTimer(100 * time.Millisecond)
		testutil.AssertDoesReceiveFirst(t, timer.C, "should not send a message", sent)
		require.Len(t, fph.lastBlocks, 0)
		require.Len(t, fph.lastResponses, 0)

		bd = peerResponseSender.SendResponse(links[1], blks[1].RawData())
		require.Equal(t, links[1], bd.Link())
		require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
		require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSizeOnWire())
		bd = peerResponseSender.SendResponse(links[2], nil)
		require.Equal(t, links[2], bd.Link())
		require.Equal(t, uint64(0), bd.BlockSize())
		require.Equal(t, uint64(0), bd.BlockSizeOnWire())
		peerResponseSender.FinishRequest()

		timer.Reset(100 * time.Millisecond)
		testutil.AssertDoesReceiveFirst(t, timer.C, "should not send a message", sent)
		return nil
	})
	require.NoError(t, err)
	testutil.AssertDoesReceive(ctx, t, sent, "should sent first message")
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
}

func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
	requestID1 := graphsync.RequestID(rand.Int31())
	requestID2 := graphsync.RequestID(rand.Int31())
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
	peerResponseSender := NewResponseSender(ctx, p, fph)
	peerResponseSender.Startup()

	peerResponseSender.IgnoreBlocks(requestID1, links)

	bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")

	require.Len(t, fph.lastBlocks, 0)
Hannah Howard's avatar
Hannah Howard committed
367

368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())

	bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData())
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
	bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
	require.Equal(t, links[1], bd.Link())
	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
	bd = peerResponseSender.SendResponse(requestID1, links[2], blks[2].RawData())
	require.Equal(t, links[2], bd.Link())
	require.Equal(t, uint64(len(blks[2].RawData())), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
	peerResponseSender.FinishRequest(requestID1)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")

	require.Len(t, fph.lastBlocks, 0)

	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
	response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response1.Status(), "did not send correct response code in second message")
	response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send corrent response code in second message")

	peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData())
	peerResponseSender.FinishRequest(requestID2)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")

	require.Equal(t, 1, len(fph.lastBlocks))
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])

	require.Len(t, fph.lastResponses, 1, "did not send correct number of responses")
	response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
Hannah Howard's avatar
Hannah Howard committed
416 417
}

418
func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
419 420 421 422 423 424 425
	for _, response := range responses {
		if response.RequestID() == requestID {
			return response, nil
		}
	}
	return gsmsg.GraphSyncResponse{}, fmt.Errorf("Response Not Found")
}