Commit 105e8cd2 authored by hannahhoward's avatar hannahhoward

feat(peerresponsemanager): setup peer manager

Refactor peer manager to handle abstract processes, then use the base class for the peer message
manager and the peer response manager
parent 5a9317f5
...@@ -27,7 +27,7 @@ type GraphSync struct { ...@@ -27,7 +27,7 @@ type GraphSync struct {
network gsnet.GraphSyncNetwork network gsnet.GraphSyncNetwork
loader ipldbridge.Loader loader ipldbridge.Loader
requestManager *requestmanager.RequestManager requestManager *requestmanager.RequestManager
peerManager *peermanager.PeerManager peerManager *peermanager.PeerMessageManager
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
} }
...@@ -41,7 +41,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, ...@@ -41,7 +41,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue { createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
return messagequeue.New(ctx, p, network) return messagequeue.New(ctx, p, network)
} }
peerManager := peermanager.New(ctx, createMessageQueue) peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
requestManager := requestmanager.New(ctx, ipldBridge) requestManager := requestmanager.New(ctx, ipldBridge)
graphSync := &GraphSync{ graphSync := &GraphSync{
ipldBridge: ipldBridge, ipldBridge: ipldBridge,
......
...@@ -3,63 +3,48 @@ package peermanager ...@@ -3,63 +3,48 @@ package peermanager
import ( import (
"context" "context"
"sync" "sync"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
) )
const ( // PeerProcess is any process that provides services for a peer
defaultCleanupInterval = time.Minute type PeerProcess interface {
)
var log = logging.Logger("graphsync")
var (
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}
)
// PeerQueue provides a queer of messages to be sent for a single peer.
type PeerQueue interface {
AddRequest(graphSyncRequest gsmsg.GraphSyncRequest)
Startup() Startup()
Shutdown() Shutdown()
} }
// PeerQueueFactory provides a function that will create a PeerQueue. // PeerProcessFactory provides a function that will create a PeerQueue.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue type PeerProcessFactory func(ctx context.Context, p peer.ID) PeerProcess
type peerQueueInstance struct { type peerProcessInstance struct {
refcnt int refcnt int
pq PeerQueue process PeerProcess
} }
// PeerManager manages a pool of peers and sends messages to peers in the pool. // PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct { type PeerManager struct {
peerQueues map[peer.ID]*peerQueueInstance peerProcesses map[peer.ID]*peerProcessInstance
peerQueuesLk sync.RWMutex peerProcessesLk sync.RWMutex
createPeerQueue PeerQueueFactory createPeerProcess PeerProcessFactory
ctx context.Context ctx context.Context
} }
// New creates a new PeerManager, given a context and a peerQueueFactory. // New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager { func New(ctx context.Context, createPeerQueue PeerProcessFactory) *PeerManager {
return &PeerManager{ return &PeerManager{
peerQueues: make(map[peer.ID]*peerQueueInstance), peerProcesses: make(map[peer.ID]*peerProcessInstance),
createPeerQueue: createPeerQueue, createPeerProcess: createPeerQueue,
ctx: ctx, ctx: ctx,
} }
} }
// ConnectedPeers returns a list of peers this PeerManager is managing. // ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID { func (pm *PeerManager) ConnectedPeers() []peer.ID {
pm.peerQueuesLk.RLock() pm.peerProcessesLk.RLock()
defer pm.peerQueuesLk.RUnlock() defer pm.peerProcessesLk.RUnlock()
peers := make([]peer.ID, 0, len(pm.peerQueues)) peers := make([]peer.ID, 0, len(pm.peerProcesses))
for p := range pm.peerQueues { for p := range pm.peerProcesses {
peers = append(peers, p) peers = append(peers, p)
} }
return peers return peers
...@@ -67,51 +52,50 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID { ...@@ -67,51 +52,50 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {
// Connected is called to add a new peer to the pool // Connected is called to add a new peer to the pool
func (pm *PeerManager) Connected(p peer.ID) { func (pm *PeerManager) Connected(p peer.ID) {
pm.peerQueuesLk.Lock() pm.peerProcessesLk.Lock()
pq := pm.getOrCreate(p) pq := pm.getOrCreate(p)
pq.refcnt++ pq.refcnt++
pm.peerQueuesLk.Unlock() pm.peerProcessesLk.Unlock()
} }
// Disconnected is called to remove a peer from the pool. // Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) { func (pm *PeerManager) Disconnected(p peer.ID) {
pm.peerQueuesLk.Lock() pm.peerProcessesLk.Lock()
pq, ok := pm.peerQueues[p] pq, ok := pm.peerProcesses[p]
if !ok { if !ok {
pm.peerQueuesLk.Unlock() pm.peerProcessesLk.Unlock()
return return
} }
pq.refcnt-- pq.refcnt--
if pq.refcnt > 0 { if pq.refcnt > 0 {
pm.peerQueuesLk.Unlock() pm.peerProcessesLk.Unlock()
return return
} }
delete(pm.peerQueues, p) delete(pm.peerProcesses, p)
pm.peerQueuesLk.Unlock() pm.peerProcessesLk.Unlock()
pq.pq.Shutdown() pq.process.Shutdown()
} }
// SendRequest sends the given request to the given peer. // GetProcess returns the process for the given peer
func (pm *PeerManager) SendRequest( func (pm *PeerManager) GetProcess(
p peer.ID, p peer.ID) PeerProcess {
graphSyncRequest gsmsg.GraphSyncRequest) { pm.peerProcessesLk.Lock()
pm.peerQueuesLk.Lock()
pqi := pm.getOrCreate(p) pqi := pm.getOrCreate(p)
pm.peerQueuesLk.Unlock() pm.peerProcessesLk.Unlock()
pqi.pq.AddRequest(graphSyncRequest) return pqi.process
} }
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance { func (pm *PeerManager) getOrCreate(p peer.ID) *peerProcessInstance {
pqi, ok := pm.peerQueues[p] pqi, ok := pm.peerProcesses[p]
if !ok { if !ok {
pq := pm.createPeerQueue(pm.ctx, p) pq := pm.createPeerProcess(pm.ctx, p)
pq.Startup() pq.Startup()
pqi = &peerQueueInstance{0, pq} pqi = &peerProcessInstance{0, pq}
pm.peerQueues[p] = pqi pm.peerProcesses[p] = pqi
} }
return pqi return pqi
} }
...@@ -2,51 +2,27 @@ package peermanager ...@@ -2,51 +2,27 @@ package peermanager
import ( import (
"context" "context"
"math/rand"
"reflect"
"testing" "testing"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testutil" "github.com/ipfs/go-graphsync/testutil"
"github.com/libp2p/go-libp2p-peer" "github.com/libp2p/go-libp2p-peer"
) )
type messageSent struct { type fakePeerProcess struct {
p peer.ID
message gsmsg.GraphSyncMessage
} }
type fakePeer struct { func (fp *fakePeerProcess) Startup() {}
p peer.ID func (fp *fakePeerProcess) Shutdown() {}
messagesSent chan messageSent
}
func (fp *fakePeer) Startup() {}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) AddRequest(graphSyncRequest gsmsg.GraphSyncRequest) {
message := gsmsg.New()
message.AddRequest(graphSyncRequest)
fp.messagesSent <- messageSent{fp.p, message}
}
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
return func(ctx context.Context, p peer.ID) PeerQueue {
return &fakePeer{
p: p,
messagesSent: messagesSent,
}
}
}
func TestAddingAndRemovingPeers(t *testing.T) { func TestAddingAndRemovingPeers(t *testing.T) {
ctx := context.Background() ctx := context.Background()
peerQueueFactory := makePeerQueueFactory(nil) peerProcessFatory := func(ctx context.Context, p peer.ID) PeerProcess {
return &fakePeerProcess{}
}
tp := testutil.GeneratePeers(5) tp := testutil.GeneratePeers(5)
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4] peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
peerManager := New(ctx, peerQueueFactory) peerManager := New(ctx, peerProcessFatory)
peerManager.Connected(peer1) peerManager.Connected(peer1)
peerManager.Connected(peer2) peerManager.Connected(peer2)
...@@ -82,75 +58,3 @@ func TestAddingAndRemovingPeers(t *testing.T) { ...@@ -82,75 +58,3 @@ func TestAddingAndRemovingPeers(t *testing.T) {
t.Fatal("Peer was disconnected but should not have been") t.Fatal("Peer was disconnected but should not have been")
} }
} }
func TestSendingMessagesToPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
messagesSent := make(chan messageSent, 5)
peerQueueFactory := makePeerQueueFactory(messagesSent)
tp := testutil.GeneratePeers(5)
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
peerManager := New(ctx, peerQueueFactory)
request := gsmsg.NewRequest(id, selector, priority)
peerManager.SendRequest(tp[0], request)
peerManager.SendRequest(tp[1], request)
cancelRequest := gsmsg.CancelRequest(id)
peerManager.SendRequest(tp[0], cancelRequest)
select {
case <-ctx.Done():
t.Fatal("did not send first message")
case firstMessage := <-messagesSent:
if firstMessage.p != tp[0] {
t.Fatal("First message sent to wrong peer")
}
request := firstMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("did not send correct first message")
}
}
select {
case <-ctx.Done():
t.Fatal("did not send second message")
case secondMessage := <-messagesSent:
if secondMessage.p != tp[1] {
t.Fatal("Second message sent to wrong peer")
}
request := secondMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("did not send correct second message")
}
}
select {
case <-ctx.Done():
t.Fatal("did not send third message")
case thirdMessage := <-messagesSent:
if thirdMessage.p != tp[0] {
t.Fatal("Third message sent to wrong peer")
}
request := thirdMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != true {
t.Fatal("third message was not a cancel")
}
}
connectedPeers := peerManager.ConnectedPeers()
if len(connectedPeers) != 2 ||
!testutil.ContainsPeer(connectedPeers, tp[0]) ||
!testutil.ContainsPeer(connectedPeers, tp[1]) {
t.Fatal("did not connect all peers that were sent messages")
}
}
package peermanager
import (
"context"
gsmsg "github.com/ipfs/go-graphsync/message"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeerQueue is a process that sends messages to a peer
type PeerQueue interface {
PeerProcess
AddRequest(graphSyncRequest gsmsg.GraphSyncRequest)
}
// PeerQueueFactory provides a function that will create a PeerQueue.
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
// PeerMessageManager manages message queues for peers
type PeerMessageManager struct {
*PeerManager
}
// NewMessageManager generates a new manger for sending messages
func NewMessageManager(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerMessageManager {
return &PeerMessageManager{
PeerManager: New(ctx, func(ctx context.Context, p peer.ID) PeerProcess {
return createPeerQueue(ctx, p)
}),
}
}
// SendRequest sends the given GraphSyncRequest to the given peer
func (pmm *PeerMessageManager) SendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
pq := pmm.GetProcess(p).(PeerQueue)
pq.AddRequest(request)
}
package peermanager
import (
"context"
"math/rand"
"reflect"
"testing"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testutil"
"github.com/libp2p/go-libp2p-peer"
)
type messageSent struct {
p peer.ID
message gsmsg.GraphSyncMessage
}
type fakePeer struct {
p peer.ID
messagesSent chan messageSent
}
func (fp *fakePeer) Startup() {}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) AddRequest(graphSyncRequest gsmsg.GraphSyncRequest) {
message := gsmsg.New()
message.AddRequest(graphSyncRequest)
fp.messagesSent <- messageSent{fp.p, message}
}
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
return func(ctx context.Context, p peer.ID) PeerQueue {
return &fakePeer{
p: p,
messagesSent: messagesSent,
}
}
}
func TestSendingMessagesToPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
messagesSent := make(chan messageSent, 5)
peerQueueFactory := makePeerQueueFactory(messagesSent)
tp := testutil.GeneratePeers(5)
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
peerManager := NewMessageManager(ctx, peerQueueFactory)
request := gsmsg.NewRequest(id, selector, priority)
peerManager.SendRequest(tp[0], request)
peerManager.SendRequest(tp[1], request)
cancelRequest := gsmsg.CancelRequest(id)
peerManager.SendRequest(tp[0], cancelRequest)
select {
case <-ctx.Done():
t.Fatal("did not send first message")
case firstMessage := <-messagesSent:
if firstMessage.p != tp[0] {
t.Fatal("First message sent to wrong peer")
}
request := firstMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("did not send correct first message")
}
}
select {
case <-ctx.Done():
t.Fatal("did not send second message")
case secondMessage := <-messagesSent:
if secondMessage.p != tp[1] {
t.Fatal("Second message sent to wrong peer")
}
request := secondMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("did not send correct second message")
}
}
select {
case <-ctx.Done():
t.Fatal("did not send third message")
case thirdMessage := <-messagesSent:
if thirdMessage.p != tp[0] {
t.Fatal("Third message sent to wrong peer")
}
request := thirdMessage.message.Requests()[0]
if request.ID() != id ||
request.IsCancel() != true {
t.Fatal("third message was not a cancel")
}
}
connectedPeers := peerManager.ConnectedPeers()
if len(connectedPeers) != 2 ||
!testutil.ContainsPeer(connectedPeers, tp[0]) ||
!testutil.ContainsPeer(connectedPeers, tp[1]) {
t.Fatal("did not connect all peers that were sent messages")
}
}
...@@ -2,160 +2,29 @@ package peerresponsemanager ...@@ -2,160 +2,29 @@ package peerresponsemanager
import ( import (
"context" "context"
"sync"
"github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipfs/go-graphsync/peermanager"
"github.com/ipfs/go-graphsync/ipldbridge"
logging "github.com/ipfs/go-log"
"github.com/ipld/go-ipld-prime"
"github.com/ipfs/go-block-format"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/linktracker"
"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
) )
var log = logging.Logger("graphsync") // PeerSenderFactory provides a function that will create a PeerResponseSender.
type PeerSenderFactory func(ctx context.Context, p peer.ID) PeerResponseSender
// PeerHandler is an interface that can send a response for a given peer across // PeerReponseManager manages message queues for peers
// the network. type PeerReponseManager struct {
type PeerHandler interface { *peermanager.PeerManager
SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block) <-chan struct{}
} }
// PeerResponseManager handles batching, deduping, and sending responses for // New generates a new peer manager for sending responses
// a given peer across multiple requests. func New(ctx context.Context, createPeerSender PeerSenderFactory) *PeerReponseManager {
type PeerResponseManager struct { return &PeerReponseManager{
p peer.ID PeerManager: peermanager.New(ctx, func(ctx context.Context, p peer.ID) peermanager.PeerProcess {
ctx context.Context return createPeerSender(ctx, p)
cancel context.CancelFunc }),
peerHandler PeerHandler
ipldBridge ipldbridge.IPLDBridge
outgoingWork chan struct{}
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
responseBuilderLk sync.RWMutex
responseBuilder *responsebuilder.ResponseBuilder
}
// New generates a new PeerResponse manager for the given context, peer ID,
// using the given peer handler and bridge to IPLD.
func New(ctx context.Context, p peer.ID, peerHandler PeerHandler, ipldBridge ipldbridge.IPLDBridge) *PeerResponseManager {
ctx, cancel := context.WithCancel(ctx)
return &PeerResponseManager{
p: p,
ctx: ctx,
cancel: cancel,
peerHandler: peerHandler,
ipldBridge: ipldBridge,
outgoingWork: make(chan struct{}, 1),
linkTracker: linktracker.New(),
} }
} }
// Startup initiates message sending for a peer // SenderForPeer returns a response sender to use with the given peer
func (prm *PeerResponseManager) Startup() { func (prm *PeerReponseManager) SenderForPeer(p peer.ID) PeerResponseSender {
go prm.run() return prm.GetProcess(p).(PeerResponseSender)
}
// Shutdown stops sending messages for a peer
func (prm *PeerResponseManager) Shutdown() {
prm.cancel()
}
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
func (prm *PeerResponseManager) SendResponse(
requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
data []byte,
) {
hasBlock := data != nil
prm.linkTrackerLk.Lock()
sendBlock := hasBlock && prm.linkTracker.ShouldSendBlockFor(link)
prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prm.linkTrackerLk.Unlock()
if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
if sendBlock {
cidLink := link.(cidlink.Link)
block, err := blocks.NewBlockWithCid(data, cidLink.Cid)
if err != nil {
log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
}
responseBuilder.AddBlock(block)
}
responseBuilder.AddLink(requestID, link, hasBlock)
}) {
prm.signalWork()
}
}
// FinishRequest marks the given requestID as having sent all responses
func (prm *PeerResponseManager) FinishRequest(requestID gsmsg.GraphSyncRequestID) {
prm.linkTrackerLk.Lock()
isComplete := prm.linkTracker.FinishRequest(requestID)
prm.linkTrackerLk.Unlock()
if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
responseBuilder.AddCompletedRequest(requestID, isComplete)
}) {
prm.signalWork()
}
}
func (prm *PeerResponseManager) buildResponse(buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
prm.responseBuilderLk.Lock()
defer prm.responseBuilderLk.Unlock()
if prm.responseBuilder == nil {
prm.responseBuilder = responsebuilder.New()
}
buildResponseFn(prm.responseBuilder)
return !prm.responseBuilder.Empty()
}
func (prm *PeerResponseManager) signalWork() {
select {
case prm.outgoingWork <- struct{}{}:
default:
}
}
func (prm *PeerResponseManager) run() {
for {
select {
case <-prm.ctx.Done():
return
case <-prm.outgoingWork:
prm.sendResponseMessage()
}
}
}
func (prm *PeerResponseManager) sendResponseMessage() {
prm.responseBuilderLk.Lock()
builder := prm.responseBuilder
prm.responseBuilder = nil
prm.responseBuilderLk.Unlock()
if builder == nil || builder.Empty() {
return
}
responses, blks, err := builder.Build(prm.ipldBridge)
if err != nil {
log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
}
done := prm.peerHandler.SendResponse(prm.p, responses, blks)
// wait for message to be processed
select {
case <-done:
case <-prm.ctx.Done():
}
} }
package peerresponsemanager
import (
"context"
"sync"
"github.com/ipfs/go-graphsync/peermanager"
"github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipfs/go-graphsync/ipldbridge"
logging "github.com/ipfs/go-log"
"github.com/ipld/go-ipld-prime"
"github.com/ipfs/go-block-format"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/linktracker"
"github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
peer "github.com/libp2p/go-libp2p-peer"
)
var log = logging.Logger("graphsync")
// PeerMessageHandler is an interface that can send a response for a given peer across
// the network.
type PeerMessageHandler interface {
SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block) <-chan struct{}
}
type peerResponseSender struct {
p peer.ID
ctx context.Context
cancel context.CancelFunc
peerHandler PeerMessageHandler
ipldBridge ipldbridge.IPLDBridge
outgoingWork chan struct{}
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
responseBuilderLk sync.RWMutex
responseBuilder *responsebuilder.ResponseBuilder
}
// PeerResponseSender handles batching, deduping, and sending responses for
// a given peer across multiple requests.
type PeerResponseSender interface {
peermanager.PeerProcess
SendResponse(
requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
data []byte,
)
FinishRequest(requestID gsmsg.GraphSyncRequestID)
}
// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
// using the given peer message handler and bridge to IPLD.
func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler, ipldBridge ipldbridge.IPLDBridge) PeerResponseSender {
ctx, cancel := context.WithCancel(ctx)
return &peerResponseSender{
p: p,
ctx: ctx,
cancel: cancel,
peerHandler: peerHandler,
ipldBridge: ipldBridge,
outgoingWork: make(chan struct{}, 1),
linkTracker: linktracker.New(),
}
}
// Startup initiates message sending for a peer
func (prm *peerResponseSender) Startup() {
go prm.run()
}
// Shutdown stops sending messages for a peer
func (prm *peerResponseSender) Shutdown() {
prm.cancel()
}
// SendResponse sends a given link for a given
// requestID across the wire, as well as its corresponding
// block if the block is present and has not already been sent
func (prm *peerResponseSender) SendResponse(
requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
data []byte,
) {
hasBlock := data != nil
prm.linkTrackerLk.Lock()
sendBlock := hasBlock && prm.linkTracker.ShouldSendBlockFor(link)
prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prm.linkTrackerLk.Unlock()
if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
if sendBlock {
cidLink := link.(cidlink.Link)
block, err := blocks.NewBlockWithCid(data, cidLink.Cid)
if err != nil {
log.Errorf("Data did not match cid when sending link for %s", cidLink.String())
}
responseBuilder.AddBlock(block)
}
responseBuilder.AddLink(requestID, link, hasBlock)
}) {
prm.signalWork()
}
}
// FinishRequest marks the given requestID as having sent all responses
func (prm *peerResponseSender) FinishRequest(requestID gsmsg.GraphSyncRequestID) {
prm.linkTrackerLk.Lock()
isComplete := prm.linkTracker.FinishRequest(requestID)
prm.linkTrackerLk.Unlock()
if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
responseBuilder.AddCompletedRequest(requestID, isComplete)
}) {
prm.signalWork()
}
}
func (prm *peerResponseSender) buildResponse(buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
prm.responseBuilderLk.Lock()
defer prm.responseBuilderLk.Unlock()
if prm.responseBuilder == nil {
prm.responseBuilder = responsebuilder.New()
}
buildResponseFn(prm.responseBuilder)
return !prm.responseBuilder.Empty()
}
func (prm *peerResponseSender) signalWork() {
select {
case prm.outgoingWork <- struct{}{}:
default:
}
}
func (prm *peerResponseSender) run() {
for {
select {
case <-prm.ctx.Done():
return
case <-prm.outgoingWork:
prm.sendResponseMessage()
}
}
}
func (prm *peerResponseSender) sendResponseMessage() {
prm.responseBuilderLk.Lock()
builder := prm.responseBuilder
prm.responseBuilder = nil
prm.responseBuilderLk.Unlock()
if builder == nil || builder.Empty() {
return
}
responses, blks, err := builder.Build(prm.ipldBridge)
if err != nil {
log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
}
done := prm.peerHandler.SendResponse(prm.p, responses, blks)
// wait for message to be processed
select {
case <-done:
case <-prm.ctx.Done():
}
}
...@@ -51,7 +51,7 @@ func TestPeerResponseManagerSendsResponses(t *testing.T) { ...@@ -51,7 +51,7 @@ func TestPeerResponseManagerSendsResponses(t *testing.T) {
sent: sent, sent: sent,
} }
ipldBridge := testbridge.NewMockIPLDBridge() ipldBridge := testbridge.NewMockIPLDBridge()
peerResponseManager := New(ctx, p, fph, ipldBridge) peerResponseManager := NewResponseSender(ctx, p, fph, ipldBridge)
peerResponseManager.Startup() peerResponseManager.Startup()
peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData()) peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment