Commit 77e2de9b authored by hannahhoward's avatar hannahhoward

refactor(testbridge): remove mock selector spec

Removed mock selector spec, switch to using more real blockchain simulator
parent 84fb4879
......@@ -3,39 +3,29 @@ module github.com/ipfs/go-graphsync
go 1.12
require (
github.com/btcsuite/btcd v0.0.0-20190629003639-c26ffa870fd8 // indirect
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.2 // indirect
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.0
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
github.com/ipfs/go-cid v0.0.4-0.20191112011718-79e75dffeb10
github.com/ipfs/go-datastore v0.1.0
github.com/ipfs/go-ipfs-blockstore v0.1.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-format v0.0.2
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-merkledag v0.2.3
github.com/ipfs/go-log v1.0.0
github.com/ipfs/go-merkledag v0.2.4
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-eventbus v0.0.3 // indirect
github.com/libp2p/go-libp2p v0.2.1
github.com/libp2p/go-libp2p-core v0.0.9
github.com/libp2p/go-libp2p v0.3.0
github.com/libp2p/go-libp2p-core v0.2.4
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-secio v0.1.1 // indirect
github.com/libp2p/go-msgio v0.0.4 // indirect
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.3 // indirect
github.com/multiformats/go-multihash v0.0.6
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
go.opencensus.io v0.22.0 // indirect
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e // indirect
github.com/multiformats/go-multiaddr v0.1.1
github.com/multiformats/go-multihash v0.0.9
)
This diff is collapsed.
......@@ -14,14 +14,12 @@ import (
"testing"
"time"
"github.com/ipld/go-ipld-prime/fluent"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
......@@ -47,7 +45,6 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
mh "github.com/multiformats/go-multihash"
)
func TestMakeRequestToNetwork(t *testing.T) {
......@@ -63,13 +60,11 @@ func TestMakeRequestToNetwork(t *testing.T) {
graphSync := td.GraphSyncHost1()
blockChainLength := 100
blockChain := setupBlockChain(ctx, t, td.storer1, td.bridge, 100, blockChainLength)
spec := blockChainSelector(blockChainLength)
blockChain := testutil.SetupBlockChain(ctx, t, td.loader1, td.storer1, 100, blockChainLength)
requestCtx, requestCancel := context.WithCancel(ctx)
defer requestCancel()
graphSync.Request(requestCtx, td.host2.ID(), blockChain.tipLink, spec, td.extension)
graphSync.Request(requestCtx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
var message receivedMessage
select {
......@@ -90,7 +85,7 @@ func TestMakeRequestToNetwork(t *testing.T) {
}
receivedRequest := receivedRequests[0]
receivedSpec := receivedRequest.Selector()
if !reflect.DeepEqual(spec, receivedSpec) {
if !reflect.DeepEqual(blockChain.Selector(), receivedSpec) {
t.Fatal("did not transmit selector spec correctly")
}
_, err := td.bridge.ParseSelector(receivedSpec)
......@@ -133,14 +128,12 @@ func TestSendResponseToIncomingRequest(t *testing.T) {
}
blockChainLength := 100
blockChain := setupBlockChain(ctx, t, td.storer2, td.bridge, 100, blockChainLength)
spec := blockChainSelector(blockChainLength)
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)
requestID := graphsync.RequestID(rand.Int31())
message := gsmsg.New()
message.AddRequest(gsmsg.NewRequest(requestID, blockChain.tipLink.(cidlink.Link).Cid, spec, graphsync.Priority(math.MaxInt32), td.extension))
message.AddRequest(gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32), td.extension))
// send request across network
td.gsnet1.SendMessage(ctx, td.host2.ID(), message)
// read the values sent back to requestor
......@@ -206,7 +199,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := setupBlockChain(ctx, t, td.storer2, td.bridge, 100, blockChainLength)
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)
// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
......@@ -240,9 +233,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
t.Fatal("Error setting up extension")
}
spec := blockChainSelector(blockChainLength)
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.tipLink, spec, td.extension)
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)
responses := testutil.CollectResponses(ctx, t, progressChan)
errs := testutil.CollectErrors(ctx, t, errChan)
......@@ -307,13 +298,12 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
// setup receiving peer to just record message coming in
blockChainLength := 40
blockChain := setupBlockChain(ctx, t, td.storer2, td.bridge, 200000, blockChainLength)
blockChain := testutil.SetupBlockChain(ctx, t, td.loader1, td.storer2, 200000, blockChainLength)
// initialize graphsync on second node to response to requests
td.GraphSyncHost2()
spec := blockChainSelector(blockChainLength)
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.tipLink, spec)
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector())
responses := testutil.CollectResponses(ctx, t, progressChan)
errs := testutil.CollectErrors(ctx, t, errChan)
......@@ -599,89 +589,3 @@ func (r *receiver) Connected(p peer.ID) {
func (r *receiver) Disconnected(p peer.ID) {
}
type blockChain struct {
genisisNode ipld.Node
genisisLink ipld.Link
middleNodes []ipld.Node
middleLinks []ipld.Link
tipNode ipld.Node
tipLink ipld.Link
}
func createBlock(nb ipldbridge.NodeBuilder, parents []ipld.Link, size int64) ipld.Node {
return nb.CreateMap(func(mb ipldbridge.MapBuilder, knb ipldbridge.NodeBuilder, vnb ipldbridge.NodeBuilder) {
mb.Insert(knb.CreateString("Parents"), vnb.CreateList(func(lb ipldbridge.ListBuilder, vnb ipldbridge.NodeBuilder) {
for _, parent := range parents {
lb.Append(vnb.CreateLink(parent))
}
}))
mb.Insert(knb.CreateString("Messages"), vnb.CreateList(func(lb ipldbridge.ListBuilder, vnb ipldbridge.NodeBuilder) {
lb.Append(vnb.CreateBytes(testutil.RandomBytes(size)))
}))
})
}
func setupBlockChain(
ctx context.Context,
t *testing.T,
storer ipldbridge.Storer,
bridge ipldbridge.IPLDBridge,
size int64,
blockChainLength int) *blockChain {
linkBuilder := cidlink.LinkBuilder{Prefix: cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256)}
var genisisNode ipld.Node
err := fluent.Recover(func() {
nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder())
genisisNode = createBlock(nb, []ipld.Link{}, size)
})
if err != nil {
t.Fatal("Error creating genesis block")
}
genesisLink, err := linkBuilder.Build(ctx, ipldbridge.LinkContext{}, genisisNode, storer)
if err != nil {
t.Fatal("Error creating link to genesis block")
}
parent := genesisLink
middleNodes := make([]ipld.Node, 0, blockChainLength-2)
middleLinks := make([]ipld.Link, 0, blockChainLength-2)
for i := 0; i < blockChainLength-2; i++ {
var node ipld.Node
err := fluent.Recover(func() {
nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder())
node = createBlock(nb, []ipld.Link{parent}, size)
})
if err != nil {
t.Fatal("Error creating middle block")
}
middleNodes = append(middleNodes, node)
link, err := linkBuilder.Build(ctx, ipldbridge.LinkContext{}, node, storer)
if err != nil {
t.Fatal("Error creating link to middle block")
}
middleLinks = append(middleLinks, link)
parent = link
}
var tipNode ipld.Node
err = fluent.Recover(func() {
nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder())
tipNode = createBlock(nb, []ipld.Link{parent}, size)
})
if err != nil {
t.Fatal("Error creating tip block")
}
tipLink, err := linkBuilder.Build(ctx, ipldbridge.LinkContext{}, tipNode, storer)
if err != nil {
t.Fatal("Error creating link to tip block")
}
return &blockChain{genisisNode, genesisLink, middleNodes, middleLinks, tipNode, tipLink}
}
func blockChainSelector(blockChainLength int) ipld.Node {
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
return ssb.ExploreRecursive(ipldselector.RecursionLimitDepth(blockChainLength),
ssb.ExploreFields(func(efsb ipldbridge.ExploreFieldsSpecBuilder) {
efsb.Insert("Parents", ssb.ExploreAll(
ssb.ExploreRecursiveEdge()))
})).Node()
}
......@@ -162,25 +162,6 @@ func readNNetworkRequests(ctx context.Context,
return requestRecords
}
func verifyMatchedResponses(t *testing.T, actualResponse []graphsync.ResponseProgress, expectedBlocks []blocks.Block) {
if len(actualResponse) != len(expectedBlocks) {
t.Fatal("wrong number of responses sent")
}
for _, responseProgress := range actualResponse {
data, err := responseProgress.Node.AsBytes()
if err != nil {
t.Fatal("Node was not a block")
}
blk, err := blocks.NewBlockWithCid(data, responseProgress.LastBlock.Link.(cidlink.Link).Cid)
if err != nil {
t.Fatal("block did not verify")
}
if !testutil.ContainsBlock(expectedBlocks, blk) {
t.Fatal("wrong block sent")
}
}
}
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
md := make(metadata.Metadata, 0, len(blks))
for _, block := range blks {
......@@ -218,15 +199,13 @@ func TestNormalSimultaneousFetch(t *testing.T) {
defer cancel()
peers := testutil.GeneratePeers(1)
blocks1 := testutil.GenerateBlocksOfSize(5, 100)
blocks2 := testutil.GenerateBlocksOfSize(5, 100)
r1 := cidlink.Link{Cid: blocks1[0].Cid()}
r2 := cidlink.Link{Cid: blocks2[0].Cid()}
s1 := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks1))
s2 := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks2))
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain1 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
blockChain2 := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], r1, s1)
returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], r2, s2)
returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx, peers[0], blockChain1.TipLink, blockChain1.Selector())
returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
......@@ -237,22 +216,20 @@ func TestNormalSimultaneousFetch(t *testing.T) {
t.Fatal("did not send correct requests")
}
returnedS1 := requestRecords[0].gsr.Selector()
if !reflect.DeepEqual(s1, returnedS1) {
if !reflect.DeepEqual(blockChain1.Selector(), requestRecords[0].gsr.Selector()) {
t.Fatal("did not encode selector properly")
}
returnedS2 := requestRecords[1].gsr.Selector()
if !reflect.DeepEqual(s2, returnedS2) {
if !reflect.DeepEqual(blockChain2.Selector(), requestRecords[1].gsr.Selector()) {
t.Fatal("did not encode selector properly")
}
firstBlocks := append(blocks1, blocks2[:3]...)
firstMetadata1 := metadataForBlocks(blocks1, true)
firstBlocks := append(blockChain1.AllBlocks(), blockChain2.Blocks(0, 3)...)
firstMetadata1 := metadataForBlocks(blockChain1.AllBlocks(), true)
firstMetadataEncoded1, err := metadata.EncodeMetadata(firstMetadata1)
if err != nil {
t.Fatal("did not encode metadata")
}
firstMetadata2 := metadataForBlocks(blocks2[:3], true)
firstMetadata2 := metadataForBlocks(blockChain2.Blocks(0, 3), true)
firstMetadataEncoded2, err := metadata.EncodeMetadata(firstMetadata2)
if err != nil {
t.Fatal("did not encode metadata")
......@@ -274,15 +251,13 @@ func TestNormalSimultaneousFetch(t *testing.T) {
requestRecords[0].gsr.ID(): firstMetadata1,
requestRecords[1].gsr.ID(): firstMetadata2,
})
fal.successResponseOn(requestRecords[0].gsr.ID(), blocks1)
fal.successResponseOn(requestRecords[1].gsr.ID(), blocks2[:3])
fal.successResponseOn(requestRecords[0].gsr.ID(), blockChain1.AllBlocks())
fal.successResponseOn(requestRecords[1].gsr.ID(), blockChain2.Blocks(0, 3))
responses1 := testutil.CollectResponses(requestCtx, t, returnedResponseChan1)
verifyMatchedResponses(t, responses1, blocks1)
responses2 := testutil.ReadNResponses(requestCtx, t, returnedResponseChan2, 3)
verifyMatchedResponses(t, responses2, blocks2[:3])
blockChain1.VerifyWholeChain(requestCtx, returnedResponseChan1)
blockChain2.VerifyResponseRange(requestCtx, returnedResponseChan2, 0, 3)
moreBlocks := blocks2[3:]
moreBlocks := blockChain2.RemainderBlocks(3)
moreMetadata := metadataForBlocks(moreBlocks, true)
moreMetadataEncoded, err := metadata.EncodeMetadata(moreMetadata)
if err != nil {
......@@ -303,8 +278,7 @@ func TestNormalSimultaneousFetch(t *testing.T) {
fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
responses2 = testutil.CollectResponses(requestCtx, t, returnedResponseChan2)
verifyMatchedResponses(t, responses2, moreBlocks)
blockChain2.VerifyRemainder(requestCtx, returnedResponseChan2, 3)
testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
}
......@@ -325,17 +299,17 @@ func TestCancelRequestInProgress(t *testing.T) {
defer cancel2()
peers := testutil.GeneratePeers(1)
blocks1 := testutil.GenerateBlocksOfSize(5, 100)
s1 := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks1))
r1 := cidlink.Link{Cid: blocks1[0].Cid()}
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], r1, s1)
returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], r1, s1)
returnedResponseChan1, returnedErrorChan1 := requestManager.SendRequest(requestCtx1, peers[0], blockChain.TipLink, blockChain.Selector())
returnedResponseChan2, returnedErrorChan2 := requestManager.SendRequest(requestCtx2, peers[0], blockChain.TipLink, blockChain.Selector())
requestRecords := readNNetworkRequests(requestCtx, t, requestRecordChan, 2)
firstBlocks := blocks1[:3]
firstMetadata := encodedMetadataForBlocks(t, blocks1[:3], true)
firstBlocks := blockChain.Blocks(0, 3)
firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
firstResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata),
gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.PartialResponse, firstMetadata),
......@@ -343,30 +317,27 @@ func TestCancelRequestInProgress(t *testing.T) {
requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
fal.successResponseOn(requestRecords[0].gsr.ID(), blocks1[:3])
fal.successResponseOn(requestRecords[1].gsr.ID(), blocks1[:3])
responses1 := testutil.ReadNResponses(requestCtx, t, returnedResponseChan1, 3)
fal.successResponseOn(requestRecords[0].gsr.ID(), firstBlocks)
fal.successResponseOn(requestRecords[1].gsr.ID(), firstBlocks)
blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
cancel1()
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
if rr.gsr.IsCancel() != true || rr.gsr.ID() != requestRecords[0].gsr.ID() {
t.Fatal("did not send correct cancel message over network")
}
moreBlocks := blocks1[3:]
moreMetadata := encodedMetadataForBlocks(t, blocks1[3:], true)
moreBlocks := blockChain.RemainderBlocks(3)
moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
moreResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
gsmsg.NewResponse(requestRecords[1].gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
}
requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
fal.successResponseOn(requestRecords[0].gsr.ID(), blocks1[3:])
fal.successResponseOn(requestRecords[1].gsr.ID(), blocks1[3:])
fal.successResponseOn(requestRecords[0].gsr.ID(), moreBlocks)
fal.successResponseOn(requestRecords[1].gsr.ID(), moreBlocks)
responses1 = append(responses1, testutil.CollectResponses(requestCtx, t, returnedResponseChan1)...)
verifyMatchedResponses(t, responses1, blocks1[:3])
responses2 := testutil.CollectResponses(requestCtx, t, returnedResponseChan2)
verifyMatchedResponses(t, responses2, blocks1)
testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan1)
blockChain.VerifyWholeChain(requestCtx, returnedResponseChan2)
testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan1)
testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan2)
}
......@@ -385,33 +356,32 @@ func TestCancelManagerExitsGracefully(t *testing.T) {
defer cancel()
peers := testutil.GeneratePeers(1)
blocks := testutil.GenerateBlocksOfSize(5, 100)
s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
r := cidlink.Link{Cid: blocks[0].Cid()}
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
firstBlocks := blocks[:3]
firstBlocks := blockChain.Blocks(0, 3)
firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
firstResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(rr.gsr.ID(), graphsync.PartialResponse, firstMetadata),
}
requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks)
fal.successResponseOn(rr.gsr.ID(), firstBlocks)
responses := testutil.ReadNResponses(requestCtx, t, returnedResponseChan, 3)
blockChain.VerifyResponseRange(ctx, returnedResponseChan, 0, 3)
managerCancel()
moreBlocks := blocks[3:]
moreBlocks := blockChain.RemainderBlocks(3)
moreMetadata := encodedMetadataForBlocks(t, moreBlocks, true)
moreResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, moreMetadata),
}
requestManager.ProcessResponses(peers[0], moreResponses, moreBlocks)
fal.successResponseOn(rr.gsr.ID(), moreBlocks)
responses = append(responses, testutil.CollectResponses(requestCtx, t, returnedResponseChan)...)
verifyMatchedResponses(t, responses, firstBlocks)
testutil.VerifyEmptyResponse(requestCtx, t, returnedResponseChan)
testutil.VerifyEmptyErrors(requestCtx, t, returnedErrorChan)
}
......@@ -475,10 +445,11 @@ func TestFailedRequest(t *testing.T) {
defer cancel()
peers := testutil.GeneratePeers(1)
blocks := testutil.GenerateBlocksOfSize(5, 100)
s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
r := cidlink.Link{Cid: blocks[0].Cid()}
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
failedResponses := []gsmsg.GraphSyncResponse{
......@@ -504,18 +475,18 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
defer cancel()
peers := testutil.GeneratePeers(1)
blocks := testutil.GenerateBlocksOfSize(5, 100)
s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
r := cidlink.Link{Cid: blocks[0].Cid()}
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
// async loaded response responds immediately
fal.successResponseOn(rr.gsr.ID(), blocks)
fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
responses := testutil.CollectResponses(requestCtx, t, returnedResponseChan)
verifyMatchedResponses(t, responses, blocks)
blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
// failure comes in later over network
failedResponses := []gsmsg.GraphSyncResponse{
......@@ -541,24 +512,23 @@ func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
defer cancel()
peers := testutil.GeneratePeers(1)
blocks := testutil.GenerateBlocksOfSize(5, 100)
s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
r := cidlink.Link{Cid: blocks[0].Cid()}
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
// async loaded response responds immediately
fal.successResponseOn(rr.gsr.ID(), blocks)
fal.successResponseOn(rr.gsr.ID(), blockChain.AllBlocks())
responses := testutil.CollectResponses(requestCtx, t, returnedResponseChan)
verifyMatchedResponses(t, responses, blocks)
blockChain.VerifyWholeChain(requestCtx, returnedResponseChan)
md := encodedMetadataForBlocks(t, blocks, true)
md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), true)
firstResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedFull, md),
}
requestManager.ProcessResponses(peers[0], firstResponses, blocks)
requestManager.ProcessResponses(peers[0], firstResponses, blockChain.AllBlocks())
fal.verifyNoRemainingData(t, rr.gsr.ID())
testutil.VerifyEmptyErrors(ctx, t, returnedErrorChan)
......@@ -578,25 +548,25 @@ func TestRequestReturnsMissingBlocks(t *testing.T) {
defer cancel()
peers := testutil.GeneratePeers(1)
blocks := testutil.GenerateBlocksOfSize(5, 100)
s := testbridge.NewMockSelectorSpec(cidsForBlocks(blocks))
r := cidlink.Link{Cid: blocks[0].Cid()}
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], r, s)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector())
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
md := encodedMetadataForBlocks(t, blocks, false)
md := encodedMetadataForBlocks(t, blockChain.AllBlocks(), false)
firstResponses := []gsmsg.GraphSyncResponse{
gsmsg.NewResponse(rr.gsr.ID(), graphsync.RequestCompletedPartial, md),
}
requestManager.ProcessResponses(peers[0], firstResponses, nil)
for _, block := range blocks {
for _, block := range blockChain.AllBlocks() {
fal.responseOn(rr.gsr.ID(), cidlink.Link{Cid: block.Cid()}, types.AsyncLoadResult{Data: nil, Err: fmt.Errorf("Terrible Thing")})
}
testutil.VerifyEmptyResponse(ctx, t, returnedResponseChan)
errs := testutil.CollectErrors(ctx, t, returnedErrorChan)
if len(errs) != len(blocks) {
t.Fatal("did not send all errors")
if len(errs) == 0 {
t.Fatal("did not send errors")
}
}
......@@ -615,9 +585,9 @@ func TestEncodingExtensions(t *testing.T) {
defer cancel()
peers := testutil.GeneratePeers(1)
cids := testutil.GenerateCids(1)
root := cidlink.Link{Cid: cids[0]}
selector := testbridge.NewMockSelectorSpec(cids)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
extensionData1 := testutil.RandomBytes(100)
extensionName1 := graphsync.ExtensionName("AppleSauce/McGee")
......@@ -643,7 +613,7 @@ func TestEncodingExtensions(t *testing.T) {
return <-expectedError
}
requestManager.RegisterHook(hook)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], root, selector, extension1, extension2)
returnedResponseChan, returnedErrorChan := requestManager.SendRequest(requestCtx, peers[0], blockChain.TipLink, blockChain.Selector(), extension1, extension2)
rr := readNNetworkRequests(requestCtx, t, requestRecordChan, 1)[0]
......
......@@ -2,6 +2,8 @@ package loader
import (
"bytes"
"context"
"errors"
"io"
"github.com/ipfs/go-graphsync"
......@@ -20,7 +22,8 @@ type ResponseSender interface {
// WrapLoader wraps a given loader with an interceptor that sends loaded
// blocks out to the network with the given response sender.
func WrapLoader(loader ipldbridge.Loader,
func WrapLoader(ctx context.Context,
loader ipldbridge.Loader,
requestID graphsync.RequestID,
responseSender ResponseSender) ipldbridge.Loader {
return func(lnk ipld.Link, lnkCtx ipldbridge.LinkContext) (io.Reader, error) {
......@@ -38,6 +41,11 @@ func WrapLoader(loader ipldbridge.Loader,
if data == nil {
err = ipldbridge.ErrDoNotFollow()
}
select {
case <-ctx.Done():
return nil, errors.New("context cancelled")
default:
}
return result, err
}
}
......@@ -2,6 +2,7 @@ package loader
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
......@@ -34,6 +35,7 @@ func (frs *fakeResponseSender) SendResponse(
}
func TestWrappedLoaderSendsResponses(t *testing.T) {
ctx := context.Background()
frs := &fakeResponseSender{}
link1 := testbridge.NewMockLink()
link2 := testbridge.NewMockLink()
......@@ -47,7 +49,7 @@ func TestWrappedLoaderSendsResponses(t *testing.T) {
return nil, fmt.Errorf("unable to load block")
}
requestID := graphsync.RequestID(rand.Int31())
wrappedLoader := WrapLoader(loader, requestID, frs)
wrappedLoader := WrapLoader(ctx, loader, requestID, frs)
reader, err := wrappedLoader(link1, ipldbridge.LinkContext{})
if err != nil {
......
......@@ -238,7 +238,7 @@ func (rm *ResponseManager) executeQuery(ctx context.Context,
return
}
rootLink := cidlink.Link{Cid: request.Root()}
wrappedLoader := loader.WrapLoader(rm.loader, request.ID(), peerResponseSender)
wrappedLoader := loader.WrapLoader(ctx, rm.loader, request.ID(), peerResponseSender)
err = rm.ipldBridge.Traverse(ctx, wrappedLoader, rootLink, selector, noopVisitor)
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
......
......@@ -10,7 +10,6 @@ import (
"testing"
"time"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
......@@ -131,8 +130,12 @@ func TestIncomingQuery(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 40*time.Millisecond)
defer cancel()
blks := testutil.GenerateBlocksOfSize(5, 20)
loader := testbridge.NewMockLoader(blks)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
blks := blockChain.AllBlocks()
ipldBridge := testbridge.NewMockIPLDBridge()
requestIDChan := make(chan completedRequest, 1)
sentResponses := make(chan sentResponse, len(blks))
......@@ -143,14 +146,9 @@ func TestIncomingQuery(t *testing.T) {
responseManager := New(ctx, loader, ipldBridge, peerManager, queryQueue)
responseManager.Startup()
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
selectorSpec := testbridge.NewMockSelectorSpec(cids)
requestID := graphsync.RequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, cids[0], selectorSpec, graphsync.Priority(math.MaxInt32)),
gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32)),
}
p := testutil.GeneratePeers(1)[0]
responseManager.ProcessRequests(ctx, p, requests)
......@@ -183,8 +181,12 @@ func TestCancellationQueryInProgress(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 40*time.Millisecond)
defer cancel()
blks := testutil.GenerateBlocksOfSize(5, 20)
loader := testbridge.NewMockLoader(blks)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
blks := blockChain.AllBlocks()
ipldBridge := testbridge.NewMockIPLDBridge()
requestIDChan := make(chan completedRequest)
sentResponses := make(chan sentResponse)
......@@ -195,14 +197,9 @@ func TestCancellationQueryInProgress(t *testing.T) {
responseManager := New(ctx, loader, ipldBridge, peerManager, queryQueue)
responseManager.Startup()
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
selectorSpec := testbridge.NewMockSelectorSpec(cids)
requestID := graphsync.RequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, cids[0], selectorSpec, graphsync.Priority(math.MaxInt32)),
gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32)),
}
p := testutil.GeneratePeers(1)[0]
responseManager.ProcessRequests(ctx, p, requests)
......@@ -267,8 +264,11 @@ func TestEarlyCancellation(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 40*time.Millisecond)
defer cancel()
blks := testutil.GenerateBlocksOfSize(5, 20)
loader := testbridge.NewMockLoader(blks)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
ipldBridge := testbridge.NewMockIPLDBridge()
requestIDChan := make(chan completedRequest)
sentResponses := make(chan sentResponse)
......@@ -280,14 +280,9 @@ func TestEarlyCancellation(t *testing.T) {
responseManager := New(ctx, loader, ipldBridge, peerManager, queryQueue)
responseManager.Startup()
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
selectorSpec := testbridge.NewMockSelectorSpec(cids)
requestID := graphsync.RequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, cids[0], selectorSpec, graphsync.Priority(math.MaxInt32)),
gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32)),
}
p := testutil.GeneratePeers(1)[0]
responseManager.ProcessRequests(ctx, p, requests)
......@@ -317,8 +312,11 @@ func TestValidationAndExtensions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 40*time.Millisecond)
defer cancel()
blks := testutil.GenerateBlocksOfSize(5, 20)
loader := testbridge.NewMockLoader(blks)
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blockChain := testutil.SetupBlockChain(ctx, t, loader, storer, 100, 5)
ipldBridge := testbridge.NewMockIPLDBridge()
completedRequestChan := make(chan completedRequest, 1)
sentResponses := make(chan sentResponse, 100)
......@@ -327,11 +325,6 @@ func TestValidationAndExtensions(t *testing.T) {
peerManager := &fakePeerManager{peerResponseSender: fprs}
queryQueue := &fakeQueryQueue{}
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
extensionData := testutil.RandomBytes(100)
extensionName := graphsync.ExtensionName("AppleSauce/McGee")
extension := graphsync.ExtensionData{
......@@ -348,7 +341,7 @@ func TestValidationAndExtensions(t *testing.T) {
selectorSpec := testbridge.NewInvalidSelectorSpec()
requestID := graphsync.RequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, cids[0], selectorSpec, graphsync.Priority(math.MaxInt32), extension),
gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, selectorSpec, graphsync.Priority(math.MaxInt32), extension),
}
p := testutil.GeneratePeers(1)[0]
......@@ -419,10 +412,9 @@ func TestValidationAndExtensions(t *testing.T) {
})
t.Run("with valid selector", func(t *testing.T) {
selectorSpec := testbridge.NewMockSelectorSpec(cids)
requestID := graphsync.RequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, cids[0], selectorSpec, graphsync.Priority(math.MaxInt32), extension),
gsmsg.NewRequest(requestID, blockChain.TipLink.(cidlink.Link).Cid, blockChain.Selector(), graphsync.Priority(math.MaxInt32), extension),
}
p := testutil.GeneratePeers(1)[0]
......
......@@ -17,12 +17,6 @@ type mockSelectorSpec struct {
FailValidation bool
}
// NewMockSelectorSpec returns a new mock selector that will visit the given
// cids.
func NewMockSelectorSpec(cidsVisited []cid.Cid) ipld.Node {
return &mockSelectorSpec{cidsVisited, false, false, false}
}
// NewUnparsableSelectorSpec returns a spec that will fail when you attempt to
// validate it or decompose to a node + selector.
func NewUnparsableSelectorSpec(cidsVisited []cid.Cid) ipld.Node {
......
package testbridge
import (
"bytes"
"context"
"fmt"
"io"
"testing"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/ipldbridge"
"github.com/ipfs/go-graphsync/testutil"
)
func TestSelectorTraversal(t *testing.T) {
blks := testutil.GenerateBlocksOfSize(5, 20)
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
var uniqueBlocksVisited []blocks.Block
loader := func(ipldLink ipld.Link, lnkCtx ipldbridge.LinkContext) (io.Reader, error) {
lnk := ipldLink.(cidlink.Link).Cid
for _, block := range blks {
if block.Cid() == lnk {
if testutil.ContainsBlock(uniqueBlocksVisited, block) {
return nil, fmt.Errorf("loaded block twice")
}
uniqueBlocksVisited = append(uniqueBlocksVisited, block)
return bytes.NewReader(block.RawData()), nil
}
}
return nil, fmt.Errorf("unable to load block")
}
bridge := NewMockIPLDBridge()
root := cidlink.Link{Cid: cids[0]}
mockSelectorSpec := NewMockSelectorSpec(cids)
selector, err := bridge.ParseSelector(mockSelectorSpec)
if err != nil {
t.Fatal("unable to decode selector")
}
var traversalFn ipldbridge.AdvVisitFn
traversalFn = func(tp ipldbridge.TraversalProgress, node ipld.Node, traversalReason ipldbridge.TraversalReason) error {
return nil
}
ctx := context.Background()
err = bridge.Traverse(ctx, loader, root, selector, traversalFn)
if err != nil {
t.Fatal(err.Error())
}
if len(uniqueBlocksVisited) != 5 {
t.Fatal("did not visit all blocks")
}
}
func TestFailParseSelectorSpec(t *testing.T) {
cids := testutil.GenerateCids(5)
spec := NewUnparsableSelectorSpec(cids)
bridge := NewMockIPLDBridge()
bridge := ipldbridge.NewIPLDBridge()
_, err := bridge.ParseSelector(spec)
if err == nil {
t.Fatal("Spec should not decompose to node and selector")
......
package testutil
import (
"context"
"io/ioutil"
"testing"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/fluent"
ipldfree "github.com/ipld/go-ipld-prime/impl/free"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
mh "github.com/multiformats/go-multihash"
)
const blockChainTraversedNodesPerBlock = 2
// TestBlockChain is a simulated data structure similar to a blockchain
// which graphsync is uniquely suited for
type TestBlockChain struct {
t *testing.T
blockChainLength int
loader ipld.Loader
GenisisNode ipld.Node
GenisisLink ipld.Link
MiddleNodes []ipld.Node
MiddleLinks []ipld.Link
TipNode ipld.Node
TipLink ipld.Link
}
func createBlock(nb fluent.NodeBuilder, parents []ipld.Link, size uint64) ipld.Node {
return nb.CreateMap(func(mb fluent.MapBuilder, knb fluent.NodeBuilder, vnb fluent.NodeBuilder) {
mb.Insert(knb.CreateString("Parents"), vnb.CreateList(func(lb fluent.ListBuilder, vnb fluent.NodeBuilder) {
for _, parent := range parents {
lb.Append(vnb.CreateLink(parent))
}
}))
mb.Insert(knb.CreateString("Messages"), vnb.CreateList(func(lb fluent.ListBuilder, vnb fluent.NodeBuilder) {
lb.Append(vnb.CreateBytes(RandomBytes(int64(size))))
}))
})
}
// SetupBlockChain creates a new test block chain with the given height
func SetupBlockChain(
ctx context.Context,
t *testing.T,
loader ipld.Loader,
storer ipld.Storer,
size uint64,
blockChainLength int) *TestBlockChain {
linkBuilder := cidlink.LinkBuilder{Prefix: cid.NewPrefixV1(cid.DagCBOR, mh.SHA2_256)}
var genisisNode ipld.Node
err := fluent.Recover(func() {
nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder())
genisisNode = createBlock(nb, []ipld.Link{}, size)
})
if err != nil {
t.Fatal("Error creating genesis block")
}
genesisLink, err := linkBuilder.Build(ctx, ipld.LinkContext{}, genisisNode, storer)
if err != nil {
t.Fatal("Error creating link to genesis block")
}
parent := genesisLink
middleNodes := make([]ipld.Node, 0, blockChainLength-2)
middleLinks := make([]ipld.Link, 0, blockChainLength-2)
for i := 0; i < blockChainLength-2; i++ {
var node ipld.Node
err := fluent.Recover(func() {
nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder())
node = createBlock(nb, []ipld.Link{parent}, size)
})
if err != nil {
t.Fatal("Error creating middle block")
}
middleNodes = append(middleNodes, node)
link, err := linkBuilder.Build(ctx, ipld.LinkContext{}, node, storer)
if err != nil {
t.Fatal("Error creating link to middle block")
}
middleLinks = append(middleLinks, link)
parent = link
}
var tipNode ipld.Node
err = fluent.Recover(func() {
nb := fluent.WrapNodeBuilder(ipldfree.NodeBuilder())
tipNode = createBlock(nb, []ipld.Link{parent}, size)
})
if err != nil {
t.Fatal("Error creating tip block")
}
tipLink, err := linkBuilder.Build(ctx, ipld.LinkContext{}, tipNode, storer)
if err != nil {
t.Fatal("Error creating link to tip block")
}
return &TestBlockChain{t, blockChainLength, loader, genisisNode, genesisLink, middleNodes, middleLinks, tipNode, tipLink}
}
// Selector returns the selector to recursive traverse the block chain parent links
func (tbc *TestBlockChain) Selector() ipld.Node {
ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())
return ssb.ExploreRecursive(selector.RecursionLimitDepth(tbc.blockChainLength),
ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Parents", ssb.ExploreAll(
ssb.ExploreRecursiveEdge()))
})).Node()
}
func (tbc *TestBlockChain) linkTipIndex(fromTip int) ipld.Link {
switch height := tbc.blockChainLength - 1 - fromTip; {
case height == 0:
return tbc.GenisisLink
case height == tbc.blockChainLength-1:
return tbc.TipLink
default:
return tbc.MiddleLinks[height-1]
}
}
func (tbc *TestBlockChain) checkResponses(responses []graphsync.ResponseProgress, start int, end int) {
if len(responses) != (end-start)*blockChainTraversedNodesPerBlock {
tbc.t.Fatal("did not traverse all nodes")
}
expectedPath := ""
for i := 0; i < start; i++ {
if expectedPath == "" {
expectedPath = "Parents/0"
} else {
expectedPath = expectedPath + "/Parents/0"
}
}
for i, response := range responses {
if response.Path.String() != expectedPath {
tbc.t.Fatal("incorrect path")
}
if i%2 == 0 {
if expectedPath == "" {
expectedPath = "Parents"
} else {
expectedPath = expectedPath + "/Parents"
}
} else {
expectedPath = expectedPath + "/0"
}
if response.LastBlock.Path.String() != response.Path.String() {
continue
}
if response.LastBlock.Link == nil {
continue
}
expectedLink := tbc.linkTipIndex((i / 2) + start)
if expectedLink != response.LastBlock.Link {
tbc.t.Fatal("Unexpected link in response")
}
}
}
// VerifyWholeChain verifies the given response channel returns the expected responses for the whole chain
func (tbc *TestBlockChain) VerifyWholeChain(ctx context.Context, responseChan <-chan graphsync.ResponseProgress) {
tbc.VerifyRemainder(ctx, responseChan, 0)
}
// VerifyRemainder verifies the given response channel returns the remainder of the chain starting at the nth block from the tip
func (tbc *TestBlockChain) VerifyRemainder(ctx context.Context, responseChan <-chan graphsync.ResponseProgress, from int) {
responses := CollectResponses(ctx, tbc.t, responseChan)
tbc.checkResponses(responses, from, tbc.blockChainLength)
}
// VerifyResponseRange verifies the given response channel returns the given range of respnses, indexed from the tip
// (with possibly more data left in the channel)
func (tbc *TestBlockChain) VerifyResponseRange(ctx context.Context, responseChan <-chan graphsync.ResponseProgress, from int, to int) {
responses := ReadNResponses(ctx, tbc.t, responseChan, (to-from)*blockChainTraversedNodesPerBlock)
tbc.checkResponses(responses, from, to)
}
// Blocks Returns the given raw blocks for the block chain for the given range, indexed from the tip
func (tbc *TestBlockChain) Blocks(from int, to int) []blocks.Block {
var blks []blocks.Block
for i := from; i < to; i++ {
link := tbc.linkTipIndex(i)
reader, err := tbc.loader(link, ipld.LinkContext{})
if err != nil {
tbc.t.Fatal("Unable to load link")
}
data, err := ioutil.ReadAll(reader)
if err != nil {
tbc.t.Fatal("Unable to read link data")
}
blk, err := blocks.NewBlockWithCid(data, link.(cidlink.Link).Cid)
if err != nil {
tbc.t.Fatal("Could not construct block")
}
blks = append(blks, blk)
}
return blks
}
// AllBlocks returns all blocks for a blockchain
func (tbc *TestBlockChain) AllBlocks() []blocks.Block {
return tbc.Blocks(0, tbc.blockChainLength)
}
// RemainderBlocks returns the remaining blocks for a blockchain, indexed from tip
func (tbc *TestBlockChain) RemainderBlocks(from int) []blocks.Block {
return tbc.Blocks(from, tbc.blockChainLength)
}
......@@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-graphsync"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
util "github.com/ipfs/go-ipfs-util"
random "github.com/jbenet/go-random"
"github.com/libp2p/go-libp2p-core/peer"
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment