Commit a690d47e authored by hannahhoward's avatar hannahhoward

test(graphsync): test slow connection large message

add test to repro errors with message size exceeding in slow connection with large blocks
parent 366a56f0
...@@ -63,9 +63,7 @@ type blockChain struct { ...@@ -63,9 +63,7 @@ type blockChain struct {
tipLink ipld.Link tipLink ipld.Link
} }
const blockChainLength = 100 func createBlock(nb ipldbridge.NodeBuilder, parents []ipld.Link, size int64) ipld.Node {
func createBlock(nb ipldbridge.NodeBuilder, parents []ipld.Link) ipld.Node {
return nb.CreateMap(func(mb ipldbridge.MapBuilder, knb ipldbridge.NodeBuilder, vnb ipldbridge.NodeBuilder) { return nb.CreateMap(func(mb ipldbridge.MapBuilder, knb ipldbridge.NodeBuilder, vnb ipldbridge.NodeBuilder) {
mb.Insert(knb.CreateString("Parents"), vnb.CreateList(func(lb ipldbridge.ListBuilder, vnb ipldbridge.NodeBuilder) { mb.Insert(knb.CreateString("Parents"), vnb.CreateList(func(lb ipldbridge.ListBuilder, vnb ipldbridge.NodeBuilder) {
for _, parent := range parents { for _, parent := range parents {
...@@ -73,7 +71,7 @@ func createBlock(nb ipldbridge.NodeBuilder, parents []ipld.Link) ipld.Node { ...@@ -73,7 +71,7 @@ func createBlock(nb ipldbridge.NodeBuilder, parents []ipld.Link) ipld.Node {
} }
})) }))
mb.Insert(knb.CreateString("Messages"), vnb.CreateList(func(lb ipldbridge.ListBuilder, vnb ipldbridge.NodeBuilder) { mb.Insert(knb.CreateString("Messages"), vnb.CreateList(func(lb ipldbridge.ListBuilder, vnb ipldbridge.NodeBuilder) {
lb.Append(vnb.CreateString("A message")) lb.Append(vnb.CreateBytes(testutil.RandomBytes(size)))
})) }))
}) })
} }
...@@ -82,10 +80,12 @@ func setupBlockChain( ...@@ -82,10 +80,12 @@ func setupBlockChain(
ctx context.Context, ctx context.Context,
t *testing.T, t *testing.T,
storer ipldbridge.Storer, storer ipldbridge.Storer,
bridge ipldbridge.IPLDBridge) *blockChain { bridge ipldbridge.IPLDBridge,
size int64,
blockChainLength int) *blockChain {
linkBuilder := cidlink.LinkBuilder{Prefix: cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256)} linkBuilder := cidlink.LinkBuilder{Prefix: cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256)}
genisisNode, err := bridge.BuildNode(func(nb ipldbridge.NodeBuilder) ipld.Node { genisisNode, err := bridge.BuildNode(func(nb ipldbridge.NodeBuilder) ipld.Node {
return createBlock(nb, []ipld.Link{}) return createBlock(nb, []ipld.Link{}, size)
}) })
if err != nil { if err != nil {
t.Fatal("Error creating genesis block") t.Fatal("Error creating genesis block")
...@@ -99,7 +99,7 @@ func setupBlockChain( ...@@ -99,7 +99,7 @@ func setupBlockChain(
middleLinks := make([]ipld.Link, 0, blockChainLength-2) middleLinks := make([]ipld.Link, 0, blockChainLength-2)
for i := 0; i < blockChainLength-2; i++ { for i := 0; i < blockChainLength-2; i++ {
node, err := bridge.BuildNode(func(nb ipldbridge.NodeBuilder) ipld.Node { node, err := bridge.BuildNode(func(nb ipldbridge.NodeBuilder) ipld.Node {
return createBlock(nb, []ipld.Link{parent}) return createBlock(nb, []ipld.Link{parent}, size)
}) })
if err != nil { if err != nil {
t.Fatal("Error creating middle block") t.Fatal("Error creating middle block")
...@@ -113,7 +113,7 @@ func setupBlockChain( ...@@ -113,7 +113,7 @@ func setupBlockChain(
parent = link parent = link
} }
tipNode, err := bridge.BuildNode(func(nb ipldbridge.NodeBuilder) ipld.Node { tipNode, err := bridge.BuildNode(func(nb ipldbridge.NodeBuilder) ipld.Node {
return createBlock(nb, []ipld.Link{parent}) return createBlock(nb, []ipld.Link{parent}, size)
}) })
if err != nil { if err != nil {
t.Fatal("Error creating tip block") t.Fatal("Error creating tip block")
...@@ -160,9 +160,11 @@ func TestMakeRequestToNetwork(t *testing.T) { ...@@ -160,9 +160,11 @@ func TestMakeRequestToNetwork(t *testing.T) {
bridge := ipldbridge.NewIPLDBridge() bridge := ipldbridge.NewIPLDBridge()
graphSync := New(ctx, gsnet1, bridge, loader, storer) graphSync := New(ctx, gsnet1, bridge, loader, storer)
blockChain := setupBlockChain(ctx, t, storer, bridge) blockChainLength := 100
blockChain := setupBlockChain(ctx, t, storer, bridge, 100, blockChainLength)
spec, err := bridge.BuildSelector(func(ssb ipldbridge.SelectorSpecBuilder) ipldbridge.SelectorSpec { spec, err := bridge.BuildSelector(func(ssb ipldbridge.SelectorSpecBuilder) ipldbridge.SelectorSpec {
return ssb.ExploreRecursive(100, return ssb.ExploreRecursive(blockChainLength,
ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) { ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) {
efsb.Insert("Parents", ssb.ExploreAll( efsb.Insert("Parents", ssb.ExploreAll(
ssb.ExploreRecursiveEdge())) ssb.ExploreRecursiveEdge()))
...@@ -243,9 +245,10 @@ func TestSendResponseToIncomingRequest(t *testing.T) { ...@@ -243,9 +245,10 @@ func TestSendResponseToIncomingRequest(t *testing.T) {
// initialize graphsync on second node to response to requests // initialize graphsync on second node to response to requests
New(ctx, gsnet2, bridge, loader, storer) New(ctx, gsnet2, bridge, loader, storer)
blockChain := setupBlockChain(ctx, t, storer, bridge) blockChainLength := 100
blockChain := setupBlockChain(ctx, t, storer, bridge, 100, blockChainLength)
spec, err := bridge.BuildSelector(func(ssb ipldbridge.SelectorSpecBuilder) ipldbridge.SelectorSpec { spec, err := bridge.BuildSelector(func(ssb ipldbridge.SelectorSpecBuilder) ipldbridge.SelectorSpec {
return ssb.ExploreRecursive(100, return ssb.ExploreRecursive(blockChainLength,
ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) { ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) {
efsb.Insert("Parents", ssb.ExploreAll( efsb.Insert("Parents", ssb.ExploreAll(
ssb.ExploreRecursiveEdge())) ssb.ExploreRecursiveEdge()))
...@@ -326,7 +329,7 @@ func TestGraphsyncRoundTrip(t *testing.T) { ...@@ -326,7 +329,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
loader1, storer1 := testbridge.NewMockStore(blockStore1) loader1, storer1 := testbridge.NewMockStore(blockStore1)
bridge1 := ipldbridge.NewIPLDBridge() bridge1 := ipldbridge.NewIPLDBridge()
// initialize graphsync on second node to response to requests // initialize graphsync on first node to make requests
requestor := New(ctx, gsnet1, bridge1, loader1, storer1) requestor := New(ctx, gsnet1, bridge1, loader1, storer1)
// setup receiving peer to just record message coming in // setup receiving peer to just record message coming in
...@@ -336,13 +339,14 @@ func TestGraphsyncRoundTrip(t *testing.T) { ...@@ -336,13 +339,14 @@ func TestGraphsyncRoundTrip(t *testing.T) {
loader2, storer2 := testbridge.NewMockStore(blockStore2) loader2, storer2 := testbridge.NewMockStore(blockStore2)
bridge2 := ipldbridge.NewIPLDBridge() bridge2 := ipldbridge.NewIPLDBridge()
blockChain := setupBlockChain(ctx, t, storer2, bridge2) blockChainLength := 100
blockChain := setupBlockChain(ctx, t, storer2, bridge2, 100, blockChainLength)
// initialize graphsync on second node to response to requests // initialize graphsync on second node to response to requests
New(ctx, gsnet2, bridge2, loader2, storer2) New(ctx, gsnet2, bridge2, loader2, storer2)
spec, err := bridge1.BuildSelector(func(ssb ipldbridge.SelectorSpecBuilder) ipldbridge.SelectorSpec { spec, err := bridge1.BuildSelector(func(ssb ipldbridge.SelectorSpecBuilder) ipldbridge.SelectorSpec {
return ssb.ExploreRecursive(100, return ssb.ExploreRecursive(blockChainLength,
ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) { ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) {
efsb.Insert("Parents", ssb.ExploreAll( efsb.Insert("Parents", ssb.ExploreAll(
ssb.ExploreRecursiveEdge())) ssb.ExploreRecursiveEdge()))
...@@ -383,3 +387,82 @@ func TestGraphsyncRoundTrip(t *testing.T) { ...@@ -383,3 +387,82 @@ func TestGraphsyncRoundTrip(t *testing.T) {
} }
} }
} }
// TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work
// under a specific of adverse conditions:
// -- large blocks being returned by a query
// -- slow network connection
// It verifies that Graphsync will properly break up network message packets
// so they can still be decoded on the client side, instead of building up a huge
// backlog of blocks and then sending them in one giant network packet that can't
// be decoded on the client side
func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
// create network
if testing.Short() {
t.Skip()
}
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
mn := mocknet.New(ctx)
// setup network
host1, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
host2, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000})
err = mn.LinkAll()
if err != nil {
t.Fatal("error linking hosts")
}
gsnet1 := gsnet.NewFromLibp2pHost(host1)
blockStore1 := make(map[ipld.Link][]byte)
loader1, storer1 := testbridge.NewMockStore(blockStore1)
bridge1 := ipldbridge.NewIPLDBridge()
// initialize graphsync on first node to make requests
requestor := New(ctx, gsnet1, bridge1, loader1, storer1)
// setup receiving peer to just record message coming in
gsnet2 := gsnet.NewFromLibp2pHost(host2)
blockStore2 := make(map[ipld.Link][]byte)
loader2, storer2 := testbridge.NewMockStore(blockStore2)
bridge2 := ipldbridge.NewIPLDBridge()
blockChainLength := 40
blockChain := setupBlockChain(ctx, t, storer2, bridge2, 200000, blockChainLength)
// initialize graphsync on second node to response to requests
New(ctx, gsnet2, bridge2, loader2, storer2)
spec, err := bridge1.BuildSelector(func(ssb ipldbridge.SelectorSpecBuilder) ipldbridge.SelectorSpec {
return ssb.ExploreRecursive(blockChainLength,
ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) {
efsb.Insert("Parents", ssb.ExploreAll(
ssb.ExploreRecursiveEdge()))
}))
})
if err != nil {
t.Fatal("Failed creating selector")
}
progressChan, errChan := requestor.Request(ctx, host2.ID(), blockChain.tipLink, spec)
responses := testutil.CollectResponses(ctx, t, progressChan)
errs := testutil.CollectErrors(ctx, t, errChan)
if len(responses) != blockChainLength*2 {
t.Fatal("did not traverse all nodes")
}
if len(errs) != 0 {
t.Fatal("errors during traverse")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment