Commit 796ecae2 authored by hannahhoward

fix(peerresponsemanager): limit response size

limit the size of blocks sent in a response to 512k, à la bitswap
parent a690d47e
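
The rule this commit introduces is simple to state: start a new response builder when none is pending, or when adding the next block would push the pending builder past 512 KiB; bookkeeping-only calls (block size 0, e.g. deduplicated blocks) never force a split. Below is a minimal standalone sketch of that rule. The `builder` type and the driver loop are simplified stand-ins for illustration, not the real responsebuilder.ResponseBuilder API.

    package main

    import "fmt"

    // maxBlockSize mirrors the 512 KiB cap introduced by this commit.
    const maxBlockSize = 512 * 1024

    // builder is a simplified stand-in for responsebuilder.ResponseBuilder that
    // only tracks the accumulated size of the blocks added to it.
    type builder struct{ blkSize int }

    // shouldBeginNewResponse mirrors the decision in the diff below: open a new
    // builder when none exist, or when blkSize would push the newest builder
    // over maxBlockSize; zero-size work never forces a split.
    func shouldBeginNewResponse(builders []*builder, blkSize int) bool {
        if len(builders) == 0 {
            return true
        }
        if blkSize == 0 {
            return false
        }
        return builders[len(builders)-1].blkSize+blkSize > maxBlockSize
    }

    func main() {
        var builders []*builder
        for _, size := range []int{300 * 1024, 300 * 1024, 0, 100 * 1024} {
            if shouldBeginNewResponse(builders, size) {
                builders = append(builders, &builder{})
            }
            builders[len(builders)-1].blkSize += size
        }
        fmt.Println(len(builders)) // 2
    }

Running it prints 2: the second 300 KiB block would exceed the cap and opens a new builder, while the zero-size call and the 100 KiB block fit into the pending one.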
@@ -6,20 +6,25 @@ import (
     "github.com/ipfs/go-graphsync/peermanager"
-    "github.com/ipld/go-ipld-prime/linking/cid"
+    cidlink "github.com/ipld/go-ipld-prime/linking/cid"
     "github.com/ipfs/go-graphsync/ipldbridge"
     logging "github.com/ipfs/go-log"
     "github.com/ipld/go-ipld-prime"
-    "github.com/ipfs/go-block-format"
+    blocks "github.com/ipfs/go-block-format"
     "github.com/ipfs/go-graphsync/linktracker"
     gsmsg "github.com/ipfs/go-graphsync/message"
     "github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
     "github.com/libp2p/go-libp2p-core/peer"
 )
 
+const (
+    // max block size is the maximum size for batching blocks in a single payload
+    maxBlockSize = 512 * 1024
+)
+
 var log = logging.Logger("graphsync")
 
 // PeerMessageHandler is an interface that can send a response for a given peer across
@@ -36,10 +41,10 @@ type peerResponseSender struct {
     ipldBridge   ipldbridge.IPLDBridge
     outgoingWork chan struct{}
 
     linkTrackerLk     sync.RWMutex
     linkTracker       *linktracker.LinkTracker
-    responseBuilderLk sync.RWMutex
-    responseBuilder   *responsebuilder.ResponseBuilder
+    responseBuildersLk sync.RWMutex
+    responseBuilders   []*responsebuilder.ResponseBuilder
 }
 
 // PeerResponseSender handles batching, deduping, and sending responses for
@@ -91,10 +96,14 @@ func (prm *peerResponseSender) SendResponse(
     hasBlock := data != nil
     prm.linkTrackerLk.Lock()
     sendBlock := hasBlock && prm.linkTracker.BlockRefCount(link) == 0
+    blkSize := len(data)
+    if !sendBlock {
+        blkSize = 0
+    }
     prm.linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
     prm.linkTrackerLk.Unlock()
 
-    if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
+    if prm.buildResponse(blkSize, func(responseBuilder *responsebuilder.ResponseBuilder) {
         if sendBlock {
             cidLink := link.(cidlink.Link)
             block, err := blocks.NewBlockWithCid(data, cidLink.Cid)
@@ -133,20 +142,31 @@ func (prm *peerResponseSender) FinishWithError(requestID gsmsg.GraphSyncRequestI
 }
 
 func (prm *peerResponseSender) finish(requestID gsmsg.GraphSyncRequestID, status gsmsg.GraphSyncResponseStatusCode) {
-    if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
+    if prm.buildResponse(0, func(responseBuilder *responsebuilder.ResponseBuilder) {
         responseBuilder.AddCompletedRequest(requestID, status)
     }) {
         prm.signalWork()
     }
 }
 
-func (prm *peerResponseSender) buildResponse(buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
-    prm.responseBuilderLk.Lock()
-    defer prm.responseBuilderLk.Unlock()
-    if prm.responseBuilder == nil {
-        prm.responseBuilder = responsebuilder.New()
+func (prm *peerResponseSender) buildResponse(blkSize int, buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
+    prm.responseBuildersLk.Lock()
+    defer prm.responseBuildersLk.Unlock()
+    if shouldBeginNewResponse(prm.responseBuilders, blkSize) {
+        prm.responseBuilders = append(prm.responseBuilders, responsebuilder.New())
+    }
+    responseBuilder := prm.responseBuilders[len(prm.responseBuilders)-1]
+    buildResponseFn(responseBuilder)
+    return !responseBuilder.Empty()
+}
+
+func shouldBeginNewResponse(responseBuilders []*responsebuilder.ResponseBuilder, blkSize int) bool {
+    if len(responseBuilders) == 0 {
+        return true
     }
-    buildResponseFn(prm.responseBuilder)
-    return !prm.responseBuilder.Empty()
+    if blkSize == 0 {
+        return false
+    }
+    return responseBuilders[len(responseBuilders)-1].BlockSize()+blkSize > maxBlockSize
 }
 
 func (prm *peerResponseSender) signalWork() {
@@ -162,30 +182,33 @@ func (prm *peerResponseSender) run() {
         case <-prm.ctx.Done():
             return
         case <-prm.outgoingWork:
-            prm.sendResponseMessage()
+            prm.sendResponseMessages()
         }
     }
 }
 
-func (prm *peerResponseSender) sendResponseMessage() {
-    prm.responseBuilderLk.Lock()
-    builder := prm.responseBuilder
-    prm.responseBuilder = nil
-    prm.responseBuilderLk.Unlock()
-
-    if builder == nil || builder.Empty() {
-        return
-    }
-    responses, blks, err := builder.Build(prm.ipldBridge)
-    if err != nil {
-        log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
-    }
-
-    done := prm.peerHandler.SendResponse(prm.p, responses, blks)
-
-    // wait for message to be processed
-    select {
-    case <-done:
-    case <-prm.ctx.Done():
+func (prm *peerResponseSender) sendResponseMessages() {
+    prm.responseBuildersLk.Lock()
+    builders := prm.responseBuilders
+    prm.responseBuilders = nil
+    prm.responseBuildersLk.Unlock()
+
+    for _, builder := range builders {
+        if builder.Empty() {
+            continue
+        }
+        responses, blks, err := builder.Build(prm.ipldBridge)
+        if err != nil {
+            log.Errorf("Unable to assemble GraphSync response: %s", err.Error())
+        }
+
+        done := prm.peerHandler.SendResponse(prm.p, responses, blks)
+
+        // wait for message to be processed
+        select {
+        case <-done:
+        case <-prm.ctx.Done():
+        }
     }
 }
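
Taken together, the changes above mean that while a sent message is still waiting on its `done` channel, further SendResponse calls keep queueing work: they append to the newest pending builder, or open a fresh one once the next block would exceed maxBlockSize, and the next pass of the run loop drains every queued builder in order. The test added below drives this path with 1 MB blocks and checks that each one goes out as its own message.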
@@ -9,11 +9,11 @@ import (
     "github.com/ipfs/go-graphsync/testbridge"
 
-    "github.com/ipfs/go-block-format"
+    blocks "github.com/ipfs/go-block-format"
     gsmsg "github.com/ipfs/go-graphsync/message"
     "github.com/ipfs/go-graphsync/testutil"
     "github.com/ipld/go-ipld-prime"
-    "github.com/ipld/go-ipld-prime/linking/cid"
+    cidlink "github.com/ipld/go-ipld-prime/linking/cid"
     "github.com/libp2p/go-libp2p-core/peer"
 )
@@ -166,6 +166,133 @@ func TestPeerResponseManagerSendsResponses(t *testing.T) {
     }
 }
 
+func TestPeerResponseManagerSendsVeryLargeBlocksResponses(t *testing.T) {
+    p := testutil.GeneratePeers(1)[0]
+    requestID1 := gsmsg.GraphSyncRequestID(rand.Int31())
+    // generate large blocks before proceeding
+    blks := testutil.GenerateBlocksOfSize(5, 1000000)
+    ctx := context.Background()
+    ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
+    defer cancel()
+
+    links := make([]ipld.Link, 0, len(blks))
+    for _, block := range blks {
+        links = append(links, cidlink.Link{Cid: block.Cid()})
+    }
+    done := make(chan struct{}, 1)
+    sent := make(chan struct{}, 1)
+    fph := &fakePeerHandler{
+        done: done,
+        sent: sent,
+    }
+    ipldBridge := testbridge.NewMockIPLDBridge()
+    peerResponseManager := NewResponseSender(ctx, p, fph, ipldBridge)
+    peerResponseManager.Startup()
+
+    peerResponseManager.SendResponse(requestID1, links[0], blks[0].RawData())
+
+    select {
+    case <-ctx.Done():
+        t.Fatal("Did not send first message")
+    case <-sent:
+    }
+
+    if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[0].Cid() {
+        t.Fatal("Did not send correct blocks for first message")
+    }
+    if len(fph.lastResponses) != 1 || fph.lastResponses[0].RequestID() != requestID1 ||
+        fph.lastResponses[0].Status() != gsmsg.PartialResponse {
+        t.Fatal("Did not send correct responses for first message")
+    }
+
+    // Send 3 very large blocks
+    peerResponseManager.SendResponse(requestID1, links[1], blks[1].RawData())
+    peerResponseManager.SendResponse(requestID1, links[2], blks[2].RawData())
+    peerResponseManager.SendResponse(requestID1, links[3], blks[3].RawData())
+
+    // let peer response manager know last message was sent so message sending can continue
+    done <- struct{}{}
+
+    select {
+    case <-ctx.Done():
+        t.Fatal("Should have sent second message but didn't")
+    case <-sent:
+    }
+
+    if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[1].Cid() {
+        t.Fatal("Should have broken up message but didn't")
+    }
+    if len(fph.lastResponses) != 1 {
+        t.Fatal("Should have broken up message but didn't")
+    }
+
+    // Send one more block while waiting
+    peerResponseManager.SendResponse(requestID1, links[4], blks[4].RawData())
+    peerResponseManager.FinishRequest(requestID1)
+
+    // let peer response manager know last message was sent so message sending can continue
+    done <- struct{}{}
+
+    select {
+    case <-ctx.Done():
+        t.Fatal("Should have sent third message but didn't")
+    case <-sent:
+    }
+
+    if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[2].Cid() {
+        t.Fatal("Should have broken up message but didn't")
+    }
+    if len(fph.lastResponses) != 1 {
+        t.Fatal("Should have broken up message but didn't")
+    }
+
+    // let peer response manager know last message was sent so message sending can continue
+    done <- struct{}{}
+
+    select {
+    case <-ctx.Done():
+        t.Fatal("Should have sent fourth message but didn't")
+    case <-sent:
+    }
+
+    if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[3].Cid() {
+        t.Fatal("Should have broken up message but didn't")
+    }
+    if len(fph.lastResponses) != 1 {
+        t.Fatal("Should have broken up message but didn't")
+    }
+
+    // let peer response manager know last message was sent so message sending can continue
+    done <- struct{}{}
+
+    select {
+    case <-ctx.Done():
+        t.Fatal("Should have sent fifth message but didn't")
+    case <-sent:
+    }
+
+    if len(fph.lastBlocks) != 1 || fph.lastBlocks[0].Cid() != blks[4].Cid() {
+        t.Fatal("Should have broken up message but didn't")
+    }
+    if len(fph.lastResponses) != 1 {
+        t.Fatal("Should have broken up message but didn't")
+    }
+
+    response, err := findResponseForRequestID(fph.lastResponses, requestID1)
+    if err != nil {
+        t.Fatal("Did not send correct response for fifth message")
+    }
+    if response.Status() != gsmsg.RequestCompletedFull {
+        t.Fatal("Did not send proper response code in fifth message")
+    }
+}
+
 func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID gsmsg.GraphSyncRequestID) (gsmsg.GraphSyncResponse, error) {
     for _, response := range responses {
         if response.RequestID() == requestID {
......
 package responsebuilder
 
 import (
-    "github.com/ipfs/go-block-format"
+    blocks "github.com/ipfs/go-block-format"
     "github.com/ipfs/go-graphsync/ipldbridge"
     gsmsg "github.com/ipfs/go-graphsync/message"
     "github.com/ipfs/go-graphsync/metadata"
@@ -13,6 +13,7 @@ import (
 // GraphSync message components once responses are ready to send.
 type ResponseBuilder struct {
     outgoingBlocks     []blocks.Block
+    blkSize            int
     completedResponses map[gsmsg.GraphSyncRequestID]gsmsg.GraphSyncResponseStatusCode
     outgoingResponses  map[gsmsg.GraphSyncRequestID]metadata.Metadata
 }
@@ -27,9 +28,15 @@ func New() *ResponseBuilder {
 
 // AddBlock adds the given block to the response.
 func (rb *ResponseBuilder) AddBlock(block blocks.Block) {
+    rb.blkSize += len(block.RawData())
     rb.outgoingBlocks = append(rb.outgoingBlocks, block)
 }
 
+// BlockSize returns the total size of all blocks in this response
+func (rb *ResponseBuilder) BlockSize() int {
+    return rb.blkSize
+}
+
 // AddLink adds the given link and whether its block is present
 // to the response for the given request ID.
 func (rb *ResponseBuilder) AddLink(requestID gsmsg.GraphSyncRequestID, link ipld.Link, blockPresent bool) {
......
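
For reference, the new BlockSize accessor just reports the accumulated raw size of the blocks added so far, which is the number the sender compares against the 512 KiB cap. A small sketch of how it behaves at this commit, assuming the import paths shown in the diff above:

    package main

    import (
        "fmt"

        blocks "github.com/ipfs/go-block-format"
        "github.com/ipfs/go-graphsync/responsemanager/responsebuilder"
    )

    func main() {
        rb := responsebuilder.New()
        // BlockSize sums len(block.RawData()) over every block added so far.
        rb.AddBlock(blocks.NewBlock([]byte("hello")))
        rb.AddBlock(blocks.NewBlock([]byte("world")))
        fmt.Println(rb.BlockSize()) // 10
    }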
@@ -11,7 +11,7 @@ import (
     "github.com/ipfs/go-graphsync/testbridge"
     "github.com/ipfs/go-graphsync/testutil"
     "github.com/ipld/go-ipld-prime"
-    "github.com/ipld/go-ipld-prime/linking/cid"
+    cidlink "github.com/ipld/go-ipld-prime/linking/cid"
 )
 
 func TestMessageBuilding(t *testing.T) {
@@ -48,6 +48,9 @@ func TestMessageBuilding(t *testing.T) {
         rb.AddBlock(block)
     }
+    if rb.BlockSize() != 300 {
+        t.Fatal("did not calculate block size correctly")
+    }
     responses, sentBlocks, err := rb.Build(ipldBridge)
     if err != nil {
......