Unverified Commit ea953565 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

feat(graphsync): implement do-no-send-cids extension (#69)

Provides full implementation of do-no-send-cids on responder side
parent 430b4dc6
package cidset
import (
"errors"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/ipldutil"
"github.com/ipld/go-ipld-prime/fluent"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
)
// EncodeCidSet encodes a cid set into bytes for the do-no-send-cids extension
func EncodeCidSet(cids *cid.Set) ([]byte, error) {
list := fluent.MustBuildList(basicnode.Style.List, cids.Len(), func(la fluent.ListAssembler) {
_ = cids.ForEach(func(c cid.Cid) error {
la.AssembleValue().AssignLink(cidlink.Link{Cid: c})
return nil
})
})
return ipldutil.EncodeNode(list)
}
// DecodeCidSet decode a cid set from data for the do-no-send-cids extension
func DecodeCidSet(data []byte) (*cid.Set, error) {
list, err := ipldutil.DecodeNode(data)
if err != nil {
return nil, err
}
set := cid.NewSet()
iter := list.ListIterator()
for !iter.Done() {
_, next, err := iter.Next()
if err != nil {
return nil, err
}
link, err := next.AsLink()
if err != nil {
return nil, err
}
asCidLink, ok := link.(cidlink.Link)
if !ok {
return nil, errors.New("contained non CID link")
}
set.Add(asCidLink.Cid)
}
return set, nil
}
package cidset
import (
"testing"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/testutil"
"github.com/stretchr/testify/require"
)
func TestDecodeEncodeCidSet(t *testing.T) {
cids := testutil.GenerateCids(10)
set := cid.NewSet()
for _, c := range cids {
set.Add(c)
}
encoded, err := EncodeCidSet(set)
require.NoError(t, err, "encode errored")
decodedCidSet, err := DecodeCidSet(encoded)
require.NoError(t, err, "decode errored")
require.Equal(t, decodedCidSet.Len(), set.Len())
err = decodedCidSet.ForEach(func(c cid.Cid) error {
require.True(t, set.Has(c))
return nil
})
require.NoError(t, err)
}
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync" dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore"
...@@ -34,6 +35,7 @@ import ( ...@@ -34,6 +35,7 @@ import (
"github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil" "github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message" gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network" gsnet "github.com/ipfs/go-graphsync/network"
...@@ -231,6 +233,55 @@ func TestGraphsyncRoundTrip(t *testing.T) { ...@@ -231,6 +233,55 @@ func TestGraphsyncRoundTrip(t *testing.T) {
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus) require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
} }
func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
td := newGsTestData(ctx, t)
// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()
// setup receiving peer to just record message coming in
blockChainLength := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, 100, blockChainLength)
firstHalf := blockChain.Blocks(0, 50)
set := cid.NewSet()
for _, blk := range firstHalf {
td.blockStore1[cidlink.Link{Cid: blk.Cid()}] = blk.RawData()
set.Add(blk.Cid())
}
encodedCidSet, err := cidset.EncodeCidSet(set)
require.NoError(t, err)
extension := graphsync.ExtensionData{
Name: graphsync.ExtensionDoNotSendCIDs,
Data: encodedCidSet,
}
// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
totalSent := 0
totalSentOnWire := 0
responder.RegisterOutgoingBlockHook(func(p peer.ID, requestData graphsync.RequestData, blockData graphsync.BlockData, hookActions graphsync.OutgoingBlockHookActions) {
totalSent++
if blockData.BlockSizeOnWire() > 0 {
totalSentOnWire++
}
})
progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), extension)
blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
require.Equal(t, blockChainLength, totalSent)
require.Equal(t, blockChainLength-set.Len(), totalSentOnWire)
}
func TestPauseResume(t *testing.T) { func TestPauseResume(t *testing.T) {
// create network // create network
ctx := context.Background() ctx := context.Background()
......
...@@ -52,6 +52,7 @@ type peerResponseSender struct { ...@@ -52,6 +52,7 @@ type peerResponseSender struct {
// a given peer across multiple requests. // a given peer across multiple requests.
type PeerResponseSender interface { type PeerResponseSender interface {
peermanager.PeerProcess peermanager.PeerProcess
IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link)
SendResponse( SendResponse(
requestID graphsync.RequestID, requestID graphsync.RequestID,
link ipld.Link, link ipld.Link,
...@@ -97,6 +98,14 @@ func (prs *peerResponseSender) Startup() { ...@@ -97,6 +98,14 @@ func (prs *peerResponseSender) Startup() {
go prs.run() go prs.run()
} }
func (prs *peerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
prs.linkTrackerLk.Lock()
for _, link := range links {
prs.linkTracker.RecordLinkTraversal(requestID, link, true)
}
prs.linkTrackerLk.Unlock()
}
type responseOperation interface { type responseOperation interface {
build(responseBuilder *responsebuilder.ResponseBuilder) build(responseBuilder *responsebuilder.ResponseBuilder)
size() uint64 size() uint64
......
...@@ -32,7 +32,7 @@ func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncR ...@@ -32,7 +32,7 @@ func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncR
return fph.done return fph.done
} }
func TestPeerResponseManagerSendsResponses(t *testing.T) { func TestPeerResponseSenderSendsResponses(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
...@@ -134,7 +134,7 @@ func TestPeerResponseManagerSendsResponses(t *testing.T) { ...@@ -134,7 +134,7 @@ func TestPeerResponseManagerSendsResponses(t *testing.T) {
require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status()) require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
} }
func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) { func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) {
p := testutil.GeneratePeers(1)[0] p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31()) requestID1 := graphsync.RequestID(rand.Int31())
...@@ -222,7 +222,7 @@ func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) { ...@@ -222,7 +222,7 @@ func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {
} }
func TestPeerResponseManagerSendsExtensionData(t *testing.T) { func TestPeerResponseSenderSendsExtensionData(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
...@@ -285,7 +285,7 @@ func TestPeerResponseManagerSendsExtensionData(t *testing.T) { ...@@ -285,7 +285,7 @@ func TestPeerResponseManagerSendsExtensionData(t *testing.T) {
require.Equal(t, extensionData2, returnedData2, "did not encode first extension") require.Equal(t, extensionData2, returnedData2, "did not encode first extension")
} }
func TestPeerResponseManagerSendsResponsesInTransaction(t *testing.T) { func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second) ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
...@@ -332,7 +332,87 @@ func TestPeerResponseManagerSendsResponsesInTransaction(t *testing.T) { ...@@ -332,7 +332,87 @@ func TestPeerResponseManagerSendsResponsesInTransaction(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
testutil.AssertDoesReceive(ctx, t, sent, "should sent first message") testutil.AssertDoesReceive(ctx, t, sent, "should sent first message")
}
func TestPeerResponseSenderIgnoreBlocks(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
requestID2 := graphsync.RequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
done := make(chan struct{}, 1)
sent := make(chan struct{}, 1)
fph := &fakePeerHandler{
done: done,
sent: sent,
}
peerResponseSender := NewResponseSender(ctx, p, fph)
peerResponseSender.Startup()
peerResponseSender.IgnoreBlocks(requestID1, links)
bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData())
require.Equal(t, links[0], bd.Link())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
testutil.AssertDoesReceive(ctx, t, sent, "did not send first message")
require.Len(t, fph.lastBlocks, 0)
require.Len(t, fph.lastResponses, 1)
require.Equal(t, requestID1, fph.lastResponses[0].RequestID())
require.Equal(t, graphsync.PartialResponse, fph.lastResponses[0].Status())
bd = peerResponseSender.SendResponse(requestID2, links[0], blks[0].RawData())
require.Equal(t, links[0], bd.Link())
require.Equal(t, uint64(len(blks[0].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
bd = peerResponseSender.SendResponse(requestID1, links[1], blks[1].RawData())
require.Equal(t, links[1], bd.Link())
require.Equal(t, uint64(len(blks[1].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
bd = peerResponseSender.SendResponse(requestID1, links[2], blks[2].RawData())
require.Equal(t, links[2], bd.Link())
require.Equal(t, uint64(len(blks[2].RawData())), bd.BlockSize())
require.Equal(t, uint64(0), bd.BlockSizeOnWire())
peerResponseSender.FinishRequest(requestID1)
// let peer reponse manager know last message was sent so message sending can continue
done <- struct{}{}
testutil.AssertDoesReceive(ctx, t, sent, "did not send second message")
require.Len(t, fph.lastBlocks, 0)
require.Len(t, fph.lastResponses, 2, "did not send correct number of responses")
response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
require.NoError(t, err)
require.Equal(t, graphsync.RequestCompletedFull, response1.Status(), "did not send correct response code in second message")
response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
require.NoError(t, err)
require.Equal(t, graphsync.PartialResponse, response2.Status(), "did not send corrent response code in second message")
peerResponseSender.SendResponse(requestID2, links[3], blks[3].RawData())
peerResponseSender.FinishRequest(requestID2)
// let peer reponse manager know last message was sent so message sending can continue
done <- struct{}{}
testutil.AssertDoesReceive(ctx, t, sent, "did not send third message")
require.Equal(t, 1, len(fph.lastBlocks))
testutil.AssertContainsBlock(t, fph.lastBlocks, blks[3])
require.Len(t, fph.lastResponses, 1, "did not send correct number of responses")
response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
require.NoError(t, err)
require.Equal(t, graphsync.RequestCompletedFull, response2.Status(), "did not send correct response code in third message")
} }
func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) { func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) {
......
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"math" "math"
"time" "time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync"
...@@ -296,6 +298,9 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context, ...@@ -296,6 +298,9 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context,
if validationErr != nil { if validationErr != nil {
return nil, nil, validationErr return nil, nil, validationErr
} }
if err := rm.processDoNoSendCids(request, peerResponseSender); err != nil {
return nil, nil, err
}
rootLink := cidlink.Link{Cid: request.Root()} rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{ traverser := ipldutil.TraversalBuilder{
Root: rootLink, Root: rootLink,
...@@ -309,6 +314,28 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context, ...@@ -309,6 +314,28 @@ func (rm *ResponseManager) prepareQuery(ctx context.Context,
return loader, traverser, nil return loader, traverser, nil
} }
func (rm *ResponseManager) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
peerResponseSender.IgnoreBlocks(request.ID(), links)
return nil
}
func (rm *ResponseManager) executeQuery( func (rm *ResponseManager) executeQuery(
p peer.ID, p peer.ID,
request gsmsg.GraphSyncRequest, request gsmsg.GraphSyncRequest,
......
...@@ -8,7 +8,9 @@ import ( ...@@ -8,7 +8,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
gsmsg "github.com/ipfs/go-graphsync/message" gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager" "github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
...@@ -105,6 +107,7 @@ type fakePeerResponseSender struct { ...@@ -105,6 +107,7 @@ type fakePeerResponseSender struct {
sentExtensions chan sentExtension sentExtensions chan sentExtension
lastCompletedRequest chan completedRequest lastCompletedRequest chan completedRequest
pausedRequests chan pausedRequest pausedRequests chan pausedRequest
ignoredLinks chan []ipld.Link
} }
func (fprs *fakePeerResponseSender) Startup() {} func (fprs *fakePeerResponseSender) Startup() {}
...@@ -115,6 +118,10 @@ type fakeBlkData struct { ...@@ -115,6 +118,10 @@ type fakeBlkData struct {
size uint64 size uint64
} }
func (fprs *fakePeerResponseSender) IgnoreBlocks(requestID graphsync.RequestID, links []ipld.Link) {
fprs.ignoredLinks <- links
}
func (fbd fakeBlkData) Link() ipld.Link { func (fbd fakeBlkData) Link() ipld.Link {
return fbd.link return fbd.link
} }
...@@ -453,6 +460,39 @@ func TestValidationAndExtensions(t *testing.T) { ...@@ -453,6 +460,39 @@ func TestValidationAndExtensions(t *testing.T) {
require.Equal(t, 5, customChooserCallCount) require.Equal(t, 5, customChooserCallCount)
}) })
t.Run("do-not-send-cids extension", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
set := cid.NewSet()
blks := td.blockChain.Blocks(0, 5)
for _, blk := range blks {
set.Add(blk.Cid())
}
data, err := cidset.EncodeCidSet(set)
require.NoError(t, err)
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0),
graphsync.ExtensionData{
Name: graphsync.ExtensionDoNotSendCIDs,
Data: data,
}),
}
responseManager.ProcessRequests(td.ctx, td.p, requests)
var lastRequest completedRequest
testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
var lastLinks []ipld.Link
testutil.AssertReceive(td.ctx, t, td.ignoredLinks, &lastLinks, "should send ignored links")
require.Len(t, lastLinks, set.Len())
for _, link := range lastLinks {
require.True(t, set.Has(link.(cidlink.Link).Cid))
}
})
t.Run("test block hook processing", func(t *testing.T) { t.Run("test block hook processing", func(t *testing.T) {
t.Run("can send extension data", func(t *testing.T) { t.Run("can send extension data", func(t *testing.T) {
td := newTestData(t) td := newTestData(t)
...@@ -767,6 +807,7 @@ type testData struct { ...@@ -767,6 +807,7 @@ type testData struct {
sentResponses chan sentResponse sentResponses chan sentResponse
sentExtensions chan sentExtension sentExtensions chan sentExtension
pausedRequests chan pausedRequest pausedRequests chan pausedRequest
ignoredLinks chan []ipld.Link
peerManager *fakePeerManager peerManager *fakePeerManager
queryQueue *fakeQueryQueue queryQueue *fakeQueryQueue
extensionData []byte extensionData []byte
...@@ -801,7 +842,14 @@ func newTestData(t *testing.T) testData { ...@@ -801,7 +842,14 @@ func newTestData(t *testing.T) testData {
td.sentResponses = make(chan sentResponse, td.blockChainLength*2) td.sentResponses = make(chan sentResponse, td.blockChainLength*2)
td.sentExtensions = make(chan sentExtension, td.blockChainLength*2) td.sentExtensions = make(chan sentExtension, td.blockChainLength*2)
td.pausedRequests = make(chan pausedRequest, 1) td.pausedRequests = make(chan pausedRequest, 1)
fprs := &fakePeerResponseSender{lastCompletedRequest: td.completedRequestChan, sentResponses: td.sentResponses, sentExtensions: td.sentExtensions, pausedRequests: td.pausedRequests} td.ignoredLinks = make(chan []ipld.Link, 1)
fprs := &fakePeerResponseSender{
lastCompletedRequest: td.completedRequestChan,
sentResponses: td.sentResponses,
sentExtensions: td.sentExtensions,
pausedRequests: td.pausedRequests,
ignoredLinks: td.ignoredLinks,
}
td.peerManager = &fakePeerManager{peerResponseSender: fprs} td.peerManager = &fakePeerManager{peerResponseSender: fprs}
td.queryQueue = &fakeQueryQueue{} td.queryQueue = &fakeQueryQueue{}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment