package peerresponsemanager

import (
	"context"
	"fmt"
	"math/rand"
	"testing"
	"time"

	"github.com/ipfs/go-graphsync"
	"github.com/stretchr/testify/require"

	blocks "github.com/ipfs/go-block-format"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/testutil"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"
)

// fakePeerHandler is a test double that records what it is asked to send
// and lets the test control when a send is reported as finished.
type fakePeerHandler struct {
	// lastBlocks holds the blocks passed to the most recent SendResponse call.
	lastBlocks    []blocks.Block
	// lastResponses holds the responses passed to the most recent SendResponse call.
	lastResponses []gsmsg.GraphSyncResponse
	// sent receives one value every time SendResponse is called.
	sent          chan struct{}
	// done is the channel handed back to the caller of SendResponse; the
	// tests write to it to signal that the previous message went out.
	done          chan struct{}
}

// SendResponse remembers the responses and blocks it was given, notifies the
// test through the sent channel that a call took place, and hands back the
// done channel configured on the fake. The peer argument is not used.
func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) <-chan struct{} {
	// Record the payload before signalling so a reader woken by sent sees it.
	fph.lastResponses, fph.lastBlocks = responses, blks
	var signal struct{}
	fph.sent <- signal
	return fph.done
}

func TestPeerResponseManagerSendsResponses(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
37
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
38 39
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
40 41 42
	requestID1 := graphsync.RequestID(rand.Int31())
	requestID2 := graphsync.RequestID(rand.Int31())
	requestID3 := graphsync.RequestID(rand.Int31())
43 44 45 46 47 48 49 50 51 52 53
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
54
	peerResponseManager := NewResponseSender(ctx, p, fph)
55 56
	peerResponseManager.Startup()

57 58 59 60
	bd := peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSizeOnWire())
Hannah Howard's avatar
Hannah Howard committed
61
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
62

Hannah Howard's avatar
Hannah Howard committed
63 64
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
65

Hannah Howard's avatar
Hannah Howard committed
66 67 68
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
69

70 71 72 73 74 75 76 77 78 79 80 81
	bd = peerResponseManager.SendResponse(requestID2, links[0], blks[0].RawData())
	require.Equal(t, links[0], bd.Link())
	require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
	bd = peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
	require.Equal(t, links[1], bd.Link())
	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
	require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSizeOnWire())
	bd = peerResponseManager.SendResponse(requestID1, links[2], nil)
	require.Equal(t, links[2], bd.Link())
	require.Equal(t, uint64(0), bd.BlockSize())
	require.Equal(t, uint64(0), bd.BlockSizeOnWire())
82 83 84 85 86
	peerResponseManager.FinishRequest(requestID1)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
87
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
88

Hannah Howard's avatar
Hannah Howard committed
89 90
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[1].Cid(), fph.lastBlocks[0].Cid(), "did not dedup blocks correctly on second message")
91

Hannah Howard's avatar
Hannah Howard committed
92
	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
93
	response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
Hannah Howard's avatar
Hannah Howard committed
94 95
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedPartial, response1.Status(), "did not send correct response code in second message")
96
	response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
Hannah Howard's avatar
Hannah Howard committed
97 98
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send corrent response code in second message")
99 100 101 102 103 104 105 106

	peerResponseManager.SendResponse(requestID2, links[3], blks[3].RawData())
	peerResponseManager.SendResponse(requestID3, links[4], blks[4].RawData())
	peerResponseManager.FinishRequest(requestID2)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
107
	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
108

Hannah Howard's avatar
Hannah Howard committed
109 110 111
	require.Equal(t, 2, len(fph.lastBlocks))
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])
	testutil.AssertContainsBlock(t, fph.lastBlocks, blks[4])
112

Hannah Howard's avatar
Hannah Howard committed
113
	require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
114
	response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
Hannah Howard's avatar
Hannah Howard committed
115 116
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
117
	response3, err := findResponseForRequestID(fph.lastResponses, requestID3)
Hannah Howard's avatar
Hannah Howard committed
118 119
	require.NoError(t, err)
	require.Equal(t, graphsync.PartialResponse, response3.Status(), "did not send correct response code in third message")
120 121 122 123 124 125 126

	peerResponseManager.SendResponse(requestID3, links[0], blks[0].RawData())
	peerResponseManager.SendResponse(requestID3, links[4], blks[4].RawData())

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
127
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")
128

Hannah Howard's avatar
Hannah Howard committed
129 130
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "Should resend block cause there were no in progress requests")
131

Hannah Howard's avatar
Hannah Howard committed
132 133 134
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID3, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
135 136
}

137 138 139
func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {

	p := testutil.GeneratePeers(1)[0]
140
	requestID1 := graphsync.RequestID(rand.Int31())
141 142 143
	// generate large blocks before proceeding
	blks := testutil.GenerateBlocksOfSize(5, 1000000)
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
144
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
145 146 147 148 149 150 151 152 153 154 155
	defer cancel()
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
156
	peerResponseManager := NewResponseSender(ctx, p, fph)
157 158 159 160
	peerResponseManager.Startup()

	peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())

Hannah Howard's avatar
Hannah Howard committed
161
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
162

Hannah Howard's avatar
Hannah Howard committed
163 164
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
165

Hannah Howard's avatar
Hannah Howard committed
166 167 168
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
169 170 171 172 173 174 175 176 177

	// Send 3 very large blocks
	peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
	peerResponseManager.SendResponse(requestID1, links[2], blks[2].RawData())
	peerResponseManager.SendResponse(requestID1, links[3], blks[3].RawData())

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
178
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message ")
179

Hannah Howard's avatar
Hannah Howard committed
180 181
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[1].Cid(), fph.lastBlocks[0].Cid(), "Should break up message")
182

Hannah Howard's avatar
Hannah Howard committed
183
	require.Len(t, fph.lastResponses, 1, "Should break up message")
184 185 186 187 188 189 190 191

	// Send one more block while waiting
	peerResponseManager.SendResponse(requestID1, links[4], blks[4].RawData())
	peerResponseManager.FinishRequest(requestID1)

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
192
	testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
193

Hannah Howard's avatar
Hannah Howard committed
194 195
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[2].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
196

Hannah Howard's avatar
Hannah Howard committed
197
	require.Len(t, fph.lastResponses, 1, "should break up message")
198 199 200 201

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
202
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fourth message")
203

Hannah Howard's avatar
Hannah Howard committed
204 205
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[3].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
206

Hannah Howard's avatar
Hannah Howard committed
207
	require.Len(t, fph.lastResponses, 1, "should break up message")
208 209 210 211

	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
212
	testutil.AssertDoesReceive(ctx, t, sent, "did not send fifth message")
213

Hannah Howard's avatar
Hannah Howard committed
214 215
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[4].Cid(), fph.lastBlocks[0].Cid(), "should break up message")
216

Hannah Howard's avatar
Hannah Howard committed
217
	require.Len(t, fph.lastResponses, 1, "should break up message")
218 219

	response, err := findResponseForRequestID(fph.lastResponses, requestID1)
Hannah Howard's avatar
Hannah Howard committed
220 221
	require.NoError(t, err)
	require.Equal(t, graphsync.RequestCompletedFull, response.Status(), "did not send corrent response code in fifth message")
222 223 224

}

225 226
func TestPeerResponseManagerSendsExtensionData(t *testing.T) {
	ctx := context.Background()
Hannah Howard's avatar
Hannah Howard committed
227
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
228 229 230 231 232 233 234 235 236 237 238 239 240 241
	defer cancel()
	p := testutil.GeneratePeers(1)[0]
	requestID1 := graphsync.RequestID(rand.Int31())
	blks := testutil.GenerateBlocksOfSize(5, 100)
	links := make([]ipld.Link, 0, len(blks))
	for _, block := range blks {
		links = append(links, cidlink.Link{Cid: block.Cid()})
	}
	done := make(chan struct{}, 1)
	sent := make(chan struct{}, 1)
	fph := &fakePeerHandler{
		done: done,
		sent: sent,
	}
242
	peerResponseManager := NewResponseSender(ctx, p, fph)
243 244 245 246
	peerResponseManager.Startup()

	peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())

Hannah Howard's avatar
Hannah Howard committed
247
	testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
248

Hannah Howard's avatar
Hannah Howard committed
249 250
	require.Len(t, fph.lastBlocks, 1)
	require.Equal(t, blks[0].Cid(), fph.lastBlocks[0].Cid(), "did not send correct blocks for first message")
251

Hannah Howard's avatar
Hannah Howard committed
252 253 254
	require.Len(t, fph.lastResponses, 1)
	require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
	require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273

	extensionData1 := testutil.RandomBytes(100)
	extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
	extension1 := graphsync.ExtensionData{
		Name: extensionName1,
		Data: extensionData1,
	}
	extensionData2 := testutil.RandomBytes(100)
	extensionName2 := graphsync.ExtensionName("HappyLand/Happenstance")
	extension2 := graphsync.ExtensionData{
		Name: extensionName2,
		Data: extensionData2,
	}
	peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
	peerResponseManager.SendExtensionData(requestID1, extension1)
	peerResponseManager.SendExtensionData(requestID1, extension2)
	// let peer reponse manager know last message was sent so message sending can continue
	done <- struct{}{}

Hannah Howard's avatar
Hannah Howard committed
274
	testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
275

Hannah Howard's avatar
Hannah Howard committed
276
	require.Len(t, fph.lastResponses, 1, "did not send correct number of responses for second message")
277 278 279

	lastResponse := fph.lastResponses[0]
	returnedData1, found := lastResponse.Extension(extensionName1)
Hannah Howard's avatar
Hannah Howard committed
280 281
	require.True(t, found)
	require.Equal(t, extensionData1, returnedData1, "did not encode first extension")
282 283

	returnedData2, found := lastResponse.Extension(extensionName2)
Hannah Howard's avatar
Hannah Howard committed
284 285
	require.True(t, found)
	require.Equal(t, extensionData2, returnedData2, "did not encode first extension")
286 287
}

288
func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
289 290 291 292 293 294 295
	for _, response := range responses {
		if response.RequestID() == requestID {
			return response, nil
		}
	}
	return gsmsg.GraphSyncResponse{}, fmt.Errorf("Response Not Found")
}