Commit 9e9139f4 authored by hannahhoward's avatar hannahhoward

feat(peerresponsemanager): create response manager

Create a response manager for a single peer to handle assembling and managing responses for a given
peer
parent 1448d3bb
package peerresponsemanager
import (
"context"
"sync"
"github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipfs/go-graphsync/ipldbridge"
logging "github.com/ipfs/go-log"
"github.com/ipld/go-ipld-prime"
"github.com/ipfs/go-block-format"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/linktracker"
"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
peer "github.com/libp2p/go-libp2p-peer"
)
var log = logging.Logger("graphsync")
// PeerHandler is an interface that can send a response for a given peer across
// the network.
type PeerHandler interface {
SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block) <-chan struct{}
}
// PeerResponseManager handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseManager struct {
p peer.ID
ctx context.Context
cancel context.CancelFunc
peerHandler PeerHandler
ipldBridge ipldbridge.IPLDBridge
outgoingWork chan struct{}
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
responseBuilderLk sync.RWMutex
responseBuilder *responsebuilder.ResponseBuilder
}
// New generates a new PeerResponse manager for the given context, peer ID,
// using the given peer handler and bridge to IPLD.
func New(ctx context.Context, p peer.ID, peerHandler PeerHandler, ipldBridge ipldbridge.IPLDBridge) *PeerResponseManager {
ctx, cancel := context.WithCancel(ctx)
return &PeerResponseManager{
p: p,
ctx: ctx,
cancel: cancel,
peerHandler: peerHandler,
ipldBridge: ipldBridge,
outgoingWork: make(chan struct{}, 1),
linkTracker: linktracker.New(),
}
}
// Startup initiates message sending for a peer
func (prm *PeerResponseManager) Startup() {
go prm.run()
}
// Shutdown stops sending messages for a peer
func (prm *PeerResponseManager) Shutdown() {
prm.cancel()
}
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
func (prm *PeerResponseManager) SendResponse(
requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
data []byte,
) {
hasBlock := data != nil
prm.linkTrackerLk.Lock()
sendBlock := hasBlock && prm.linkTracker.ShouldSendBlockFor(link)
prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prm.linkTrackerLk.Unlock()
if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
if sendBlock {
cidLink := link.(cidlink.Link)
block, err := blocks.NewBlockWithCid(data, cidLink.Cid)
if err != nil {
log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
}
responseBuilder.AddBlock(block)
}
responseBuilder.AddLink(requestID, link, hasBlock)
}) {
prm.signalWork()
}
}
// FinishRequest marks the given requestID as having sent all responses
func (prm *PeerResponseManager) FinishRequest(requestID gsmsg.GraphSyncRequestID) {
prm.linkTrackerLk.Lock()
isComplete := prm.linkTracker.FinishRequest(requestID)
prm.linkTrackerLk.Unlock()
if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
responseBuilder.AddCompletedRequest(requestID, isComplete)
}) {
prm.signalWork()
}
}
func (prm *PeerResponseManager) buildResponse(buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
prm.responseBuilderLk.Lock()
defer prm.responseBuilderLk.Unlock()
if prm.responseBuilder == nil {
prm.responseBuilder = responsebuilder.New()
}
buildResponseFn(prm.responseBuilder)
return !prm.responseBuilder.Empty()
}
func (prm *PeerResponseManager) signalWork() {
select {
case prm.outgoingWork <- struct{}{}:
default:
}
}
func (prm *PeerResponseManager) run() {
for {
select {
case <-prm.ctx.Done():
return
case <-prm.outgoingWork:
prm.sendResponseMessage()
}
}
}
func (prm *PeerResponseManager) sendResponseMessage() {
prm.responseBuilderLk.Lock()
builder := prm.responseBuilder
prm.responseBuilder = nil
prm.responseBuilderLk.Unlock()
if builder == nil || builder.Empty() {
return
}
responses, blks, err := builder.Build(prm.ipldBridge)
if err != nil {
log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
}
done := prm.peerHandler.SendResponse(prm.p, responses, blks)
// wait for message to be processed
select {
case <-done:
case <-prm.ctx.Done():
}
}
package peerresponsemanager
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-block-format"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking/cid"
peer "github.com/libp2p/go-libp2p-peer"
)
type fakePeerHandler struct {
lastBlocks []blocks.Block
lastResponses []gsmsg.GraphSyncResponse
sent chan struct{}
done chan struct{}
}
func (fph *fakePeerHandler) SendResponse(p peer.ID, responses []gsmsg.GraphSyncResponse, blks []blocks.Block) <-chan struct{} {
fph.lastResponses = responses
fph.lastBlocks = blks
fph.sent <- struct{}{}
return fph.done
}
func TestPeerResponseManagerSendsResponses(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := gsmsg.GraphSyncRequestID(rand.Int31())
requestID2 := gsmsg.GraphSyncRequestID(rand.Int31())
requestID3 := gsmsg.GraphSyncRequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
done := make(chan struct{}, 1)
sent := make(chan struct{}, 1)
fph := &fakePeerHandler{
done: done,
sent: sent,
}
ipldBridge := testbridge.NewMockIPLDBridge()
peerResponseManager := New(ctx, p, fph, ipldBridge)
peerResponseManager.Startup()
peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())
select {
case <-ctx.Done():
t.Fatal("Did not send first message")
case <-sent:
}
if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[0].Cid() {
t.Fatal("Did not send correct blocks for first message")
}
if len(fph.lastResponses) != 1 || fph.lastResponses[0].RequestID() != requestID1 ||
fph.lastResponses[0].Status() != gsmsg.PartialResponse {
t.Fatal("Did not send correct responses for first message")
}
peerResponseManager.SendResponse(requestID2, links[0], blks[0].RawData())
peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
peerResponseManager.SendResponse(requestID1, links[2], nil)
peerResponseManager.FinishRequest(requestID1)
// let peer reponse manager know last message was sent so message sending can continue
done <- struct{}{}
select {
case <-ctx.Done():
t.Fatal("Should have sent second message but didn't")
case <-sent:
}
if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[1].Cid() {
t.Fatal("Did not dedup blocks correctly on second message")
}
if len(fph.lastResponses) != 2 {
t.Fatal("Did not send correct number of responses")
}
response1, err := findResponseForRequestID(fph.lastResponses, requestID1)
if err != nil {
t.Fatal("Did not send correct response for second message")
}
if response1.Status() != gsmsg.RequestCompletedPartial {
t.Fatal("Did not send proper response code in second message")
}
response2, err := findResponseForRequestID(fph.lastResponses, requestID2)
if err != nil {
t.Fatal("Did not send correct response for second message")
}
if response2.Status() != gsmsg.PartialResponse {
t.Fatal("Did not send proper response code in second message")
}
peerResponseManager.SendResponse(requestID2, links[3], blks[3].RawData())
peerResponseManager.SendResponse(requestID3, links[4], blks[4].RawData())
peerResponseManager.FinishRequest(requestID2)
// let peer reponse manager know last message was sent so message sending can continue
done <- struct{}{}
select {
case <-ctx.Done():
t.Fatal("Should have sent third message but didn't")
case <-sent:
}
if len(fph.lastBlocks) != 2 ||
!testutil.ContainsBlock(fph.lastBlocks, blks[3]) ||
!testutil.ContainsBlock(fph.lastBlocks, blks[4]) {
t.Fatal("Did not send correct blocks for third message")
}
if len(fph.lastResponses) != 2 {
t.Fatal("Did not send correct number of responses")
}
response2, err = findResponseForRequestID(fph.lastResponses, requestID2)
if err != nil {
t.Fatal("Did not send correct response for third message")
}
if response2.Status() != gsmsg.RequestCompletedFull {
t.Fatal("Did not send proper response code in third message")
}
response3, err := findResponseForRequestID(fph.lastResponses, requestID3)
if err != nil {
t.Fatal("Did not send correct response for third message")
}
if response3.Status() != gsmsg.PartialResponse {
t.Fatal("Did not send proper response code in third message")
}
peerResponseManager.SendResponse(requestID3, links[0], blks[0].RawData())
peerResponseManager.SendResponse(requestID3, links[4], blks[4].RawData())
// let peer reponse manager know last message was sent so message sending can continue
done <- struct{}{}
select {
case <-ctx.Done():
t.Fatal("Should have sent third message but didn't")
case <-sent:
}
if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[0].Cid() {
t.Fatal("Should have resent block cause there were no in progress requests but did not")
}
if len(fph.lastResponses) != 1 || fph.lastResponses[0].RequestID() != requestID3 ||
fph.lastResponses[0].Status() != gsmsg.PartialResponse {
t.Fatal("Did not send correct responses for fourth message")
}
}
func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID gsmsg.GraphSyncRequestID) (gsmsg.GraphSyncResponse, error) {
for _, response := range responses {
if response.RequestID() == requestID {
return response, nil
}
}
return gsmsg.GraphSyncResponse{}, fmt.Errorf("Response Not Found")
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment