Commit 8f2ae29c authored by hannahhoward's avatar hannahhoward

feat(graphsync): can make roundtrip!

Successfully execute query on remote system and return results
parent 35b1c6b1
......@@ -97,6 +97,7 @@ func (gs *GraphSync) ReceiveMessage(
sender peer.ID,
incoming gsmsg.GraphSyncMessage) {
gs.responseManager.ProcessRequests(ctx, sender, incoming.Requests())
gs.requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}
// ReceiveError is part of the network's Receiver interface and handles incoming
......
......@@ -2,6 +2,7 @@ package graphsync
import (
"context"
"fmt"
"math"
"math/rand"
"reflect"
......@@ -215,3 +216,98 @@ readAllMessages:
t.Fatal("transmitted full response when only partial was transmitted")
}
}
func TestGraphsyncRoundTrip(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
mn := mocknet.New(ctx)
// setup network
host1, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
host2, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
err = mn.LinkAll()
if err != nil {
t.Fatal("error linking hosts")
}
gsnet1 := gsnet.NewFromLibp2pHost(host1)
blockStore1 := make(map[ipld.Link][]byte)
loader1, storer1 := testbridge.NewMockStore(blockStore1)
bridge1 := testbridge.NewMockIPLDBridge()
// initialize graphsync on second node to response to requests
requestor := New(ctx, gsnet1, bridge1, loader1, storer1)
// setup receiving peer to just record message coming in
gsnet2 := gsnet.NewFromLibp2pHost(host2)
blks := testutil.GenerateBlocksOfSize(5, 100)
blockStore2 := make(map[ipld.Link][]byte)
for _, block := range blks {
blockStore2[cidlink.Link{Cid: block.Cid()}] = block.RawData()
}
loader2, storer2 := testbridge.NewMockStore(blockStore2)
bridge2 := testbridge.NewMockIPLDBridge()
// initialize graphsync on second node to response to requests
New(ctx, gsnet2, bridge2, loader2, storer2)
cids := make([]cid.Cid, 0, 7)
for _, block := range blks {
cids = append(cids, block.Cid())
}
// append block that should be deduped
cids = append(cids, blks[0].Cid())
unknownCid := testutil.GenerateCids(1)[0]
cids = append(cids, unknownCid)
spec := testbridge.NewMockSelectorSpec(cids)
progressChan, errChan := requestor.Request(ctx, host2.ID(), spec)
responses := testutil.CollectResponses(ctx, t, progressChan)
errs := testutil.CollectErrors(ctx, t, errChan)
expectedErr := fmt.Sprintf("Remote Peer Is Missing Block: %s", unknownCid.String())
if len(errs) != 1 || errs[0].Error() != expectedErr {
t.Fatal("did not transmit error for missing CID")
}
if len(responses) != 6 {
t.Fatal("did not traverse all nodes")
}
for i, response := range responses {
k := response.LastBlock.Link.(cidlink.Link).Cid
var expectedCid cid.Cid
if i == 5 {
expectedCid = blks[0].Cid()
} else {
expectedCid = blks[i].Cid()
}
if k != expectedCid {
t.Fatal("did not send the correct cids in order")
}
}
// verify data was stored in blockstore
if len(blockStore1) != 5 {
t.Fatal("did not store all blocks")
}
for link, data := range blockStore1 {
block, err := blocks.NewBlockWithCid(data, link.(cidlink.Link).Cid)
if err != nil || !testutil.ContainsBlock(blks, block) {
t.Fatal("Stored wrong block")
}
}
}
......@@ -55,6 +55,9 @@ func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMe
}
func msgToStream(ctx context.Context, s inet.Stream, msg gsmsg.GraphSyncMessage) error {
log.Debugf("Outgoing message with %d requests, %d responses, and %d blocks",
len(msg.Requests()), len(msg.Responses()), len(msg.Blocks()))
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
......
......@@ -5,6 +5,7 @@ import (
"sync"
"github.com/ipfs/go-graphsync/metadata"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync/linktracker"
......@@ -13,6 +14,8 @@ import (
"github.com/ipld/go-ipld-prime/linking/cid"
)
var log = logging.Logger("graphsync")
type responseCacheMessage interface {
handle(rc *ResponseCache)
}
......@@ -72,11 +75,13 @@ func (rc *ResponseCache) ProcessResponse(responses map[gsmsg.GraphSyncRequestID]
rc.responseCacheLk.Lock()
for _, block := range blks {
log.Debugf("Received block from network: %s", block.Cid().String())
rc.unverifiedBlockStore.AddUnverifiedBlock(cidlink.Link{Cid: block.Cid()}, block.RawData())
}
for requestID, md := range responses {
for _, item := range md {
log.Debugf("Traverse link %s on request ID %d", item.Link.String(), requestID)
rc.linkTracker.RecordLinkTraversal(requestID, item.Link, item.BlockPresent)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment