peerresponsesender_test.go 10.6 KB
Newer Older
1 2 3 4 5 6 7 8 9
package peerresponsemanager

import (
	"context"
	"fmt"
	"math/rand"
	"testing"
	"time"

10
	"github.com/ipfs/go-graphsync"
Hannah Howard's avatar
Hannah Howard committed
11
	"github.com/stretchr/testify/require"
12

13
	blocks "github.com/ipfs/go-block-format"
14 15 16
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
	"github.com/ipld/go-ipld-prime"
17
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
18
	"github.com/libp2p/go-libp2p-core/peer"
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
)

type fakePeerHandler struct {
	lastBlocks    []blocks.Block
	lastResponses []gsmsg.GraphSyncResponse
	sent          chan struct{}
	done          chan struct{}
}

func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) <-chan struct{} {
	fph.lastResponses = responses
	fph.lastBlocks = blks
	fph.sent <- struct{}{}
	return fph.done
}

func TestPeerResponseManagerSendsResponses(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
37
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
38 39
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
40 41 42
	requestID1 := graphsync.RequestID(rand.Int31())
	requestID2 := graphsync.RequestID(rand.Int31())
	requestID3 := graphsync.RequestID(rand.Int31())
43 44 45 46 47 48 49 50 51 52 53
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
54
	peerResponseManager := NewResponseSender(ctx, p, fph)
55 56 57 58
	peerResponseManager.Startup()

	peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())

Hannah Howard's avatar
Hannah Howard committed
59
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
60

Hannah Howard's avatar
Hannah Howard committed
61 62
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
63

Hannah Howard's avatar
Hannah Howard committed
64 65 66
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
67 68 69 70 71 72 73 74 75

	peerResponseManager.SendResponse(requestID2, links[0], blks[0].RawData())
	peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
	peerResponseManager.SendResponse(requestID1, links[2], nil)
	peerResponseManager.FinishRequest(requestID1)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
76
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
77

Hannah Howard's avatar
Hannah Howard committed
78 79
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[1].Cid(), fph.lastBlocks[0].Cid(), "did not dedup blocks correctly on second message")
80

Hannah Howard's avatar
Hannah Howard committed
81
	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
82
	response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
Hannah Howard's avatar
Hannah Howard committed
83 84
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedPartial, response1.Status(), "did not send correct response code in second message")
85
	response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
Hannah Howard's avatar
Hannah Howard committed
86 87
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send corrent response code in second message")
88 89 90 91 92 93 94 95

	peerResponseManager.SendResponse(requestID2, links[3], blks[3].RawData())
	peerResponseManager.SendResponse(requestID3, links[4], blks[4].RawData())
	peerResponseManager.FinishRequest(requestID2)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
96
	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
97

Hannah Howard's avatar
Hannah Howard committed
98 99 100
	require.Equal(t, 2, len(fph.lastBlocks))
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[4])
101

Hannah Howard's avatar
Hannah Howard committed
102
	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
103
	response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
Hannah Howard's avatar
Hannah Howard committed
104 105
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
106
	response3, err := findResponseForRequestID(fph.lastResponses, requestID3)
Hannah Howard's avatar
Hannah Howard committed
107 108
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not send correct response code in third message")
109 110 111 112 113 114 115

	peerResponseManager.SendResponse(requestID3, links[0], blks[0].RawData())
	peerResponseManager.SendResponse(requestID3, links[4], blks[4].RawData())

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
116
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")
117

Hannah Howard's avatar
Hannah Howard committed
118 119
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "Should resend block cause there were no in progress requests")
120

Hannah Howard's avatar
Hannah Howard committed
121 122 123
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID3, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
124 125
}

126 127 128
func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {

	p := testutil.GeneratePeers(1)[0]
129
	requestID1 := graphsync.RequestID(rand.Int31())
130 131 132
	// generate large blocks before proceeding
	blks := testutil.GenerateBlocksOfSize(5, 1000000)
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
133
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
134 135 136 137 138 139 140 141 142 143 144
	defer cancel()
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
145
	peerResponseManager := NewResponseSender(ctx, p, fph)
146 147 148 149
	peerResponseManager.Startup()

	peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())

Hannah Howard's avatar
Hannah Howard committed
150
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
151

Hannah Howard's avatar
Hannah Howard committed
152 153
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
154

Hannah Howard's avatar
Hannah Howard committed
155 156 157
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
158 159 160 161 162 163 164 165 166

	// Send 3 very large blocks
	peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
	peerResponseManager.SendResponse(requestID1, links[2], blks[2].RawData())
	peerResponseManager.SendResponse(requestID1, links[3], blks[3].RawData())

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
167
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message ")
168

Hannah Howard's avatar
Hannah Howard committed
169 170
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[1].Cid(), fph.lastBlocks[0].Cid(), "Should break up message")
171

Hannah Howard's avatar
Hannah Howard committed
172
	require.Len(t, fph.lastResponses, 1, "Should break up message")
173 174 175 176 177 178 179 180

	// Send one more block while waiting
	peerResponseManager.SendResponse(requestID1, links[4], blks[4].RawData())
	peerResponseManager.FinishRequest(requestID1)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
181
	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
182

Hannah Howard's avatar
Hannah Howard committed
183 184
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[2].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
185

Hannah Howard's avatar
Hannah Howard committed
186
	require.Len(t, fph.lastResponses, 1, "should break up message")
187 188 189 190

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
191
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")
192

Hannah Howard's avatar
Hannah Howard committed
193 194
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[3].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
195

Hannah Howard's avatar
Hannah Howard committed
196
	require.Len(t, fph.lastResponses, 1, "should break up message")
197 198 199 200

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
201
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fifth message")
202

Hannah Howard's avatar
Hannah Howard committed
203 204
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[4].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
205

Hannah Howard's avatar
Hannah Howard committed
206
	require.Len(t, fph.lastResponses, 1, "should break up message")
207 208

	response, err := findResponseForRequestID(fph.lastResponses, requestID1)
Hannah Howard's avatar
Hannah Howard committed
209 210
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response.Status(), "did not send corrent response code in fifth message")
211 212 213

}

214 215
func TestPeerResponseManagerSendsExtensionData(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
216
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
217 218 219 220 221 222 223 224 225 226 227 228 229 230
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
	requestID1 := graphsync.RequestID(rand.Int31())
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
231
	peerResponseManager := NewResponseSender(ctx, p, fph)
232 233 234 235
	peerResponseManager.Startup()

	peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())

Hannah Howard's avatar
Hannah Howard committed
236
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
237

Hannah Howard's avatar
Hannah Howard committed
238 239
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
240

Hannah Howard's avatar
Hannah Howard committed
241 242 243
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
	peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
	peerResponseManager.SendExtensionData(requestID1, extension1)
	peerResponseManager.SendExtensionData(requestID1, extension2)
	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
263
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
264

Hannah Howard's avatar
Hannah Howard committed
265
	require.Len(t, fph.lastResponses, 1, "did not send correct number of responses for second message")
266 267 268

	lastResponse := fph.lastResponses[0]
	returnedData1, found := lastResponse.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
269 270
	require.True(t, found)
	require.Equal(t, extensionData1, returnedData1, "did not encode first extension")
271 272

	returnedData2, found := lastResponse.Extension(extensionName2)
Hannah Howard's avatar
Hannah Howard committed
273 274
	require.True(t, found)
	require.Equal(t, extensionData2, returnedData2, "did not encode first extension")
275 276
}

277
func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
278 279 280 281 282 283 284
	for _, response := range responses {
		if response.RequestID() == requestID {
			return response, nil
		}
	}
	return gsmsg.GraphSyncResponse{}, fmt.Errorf("Response Not Found")
}