Commit 1448d3bb authored by hannahhoward's avatar hannahhoward

feat(responsebuilder): add response builder

add a response builder that collects components of a response and then assembles a completed
response message
parent 007e60f2
package responsebuilder
import (
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipld/go-ipld-prime"
)
// ResponseBuilder captures componenst of a response message across multiple
// requests for a given peer and then generates the corresponding
// GraphSync message components once responses are ready to send.
type ResponseBuilder struct {
outgoingBlocks []blocks.Block
completedResponses map[gsmsg.GraphSyncRequestID]bool
outgoingResponses map[gsmsg.GraphSyncRequestID]map[ipld.Link]bool
}
// New generates a new ResponseBuilder.
func New() *ResponseBuilder {
return &ResponseBuilder{
completedResponses: make(map[gsmsg.GraphSyncRequestID]bool),
outgoingResponses: make(map[gsmsg.GraphSyncRequestID]map[ipld.Link]bool),
}
}
// AddBlock adds the given block to the response.
func (rb *ResponseBuilder) AddBlock(block blocks.Block) {
rb.outgoingBlocks = append(rb.outgoingBlocks, block)
}
// AddLink adds the given link and whether its block is present
// to the response for the given request ID.
func (rb *ResponseBuilder) AddLink(requestID gsmsg.GraphSyncRequestID, link ipld.Link, blockPresent bool) {
linksForRequest, ok := rb.outgoingResponses[requestID]
if !ok {
linksForRequest = make(map[ipld.Link]bool)
rb.outgoingResponses[requestID] = linksForRequest
}
linksForRequest[link] = blockPresent
}
// AddCompletedRequest marks the given request as completed in the response,
// as well as whether the graphsync request responded with complete or partial
// data.
func (rb *ResponseBuilder) AddCompletedRequest(requestID gsmsg.GraphSyncRequestID, isComplete bool) {
rb.completedResponses[requestID] = isComplete
// make sure this completion goes out in next response even if no links are sent
_, ok := rb.outgoingResponses[requestID]
if !ok {
rb.outgoingResponses[requestID] = make(map[ipld.Link]bool)
}
}
// Empty returns true if there is no content to send
func (rb *ResponseBuilder) Empty() bool {
return len(rb.outgoingBlocks) == 0 && len(rb.outgoingResponses) == 0
}
// Build assembles and encodes response data from the added requests, links, and blocks.
func (rb *ResponseBuilder) Build(ipldBridge ipldbridge.IPLDBridge) ([]gsmsg.GraphSyncResponse, []blocks.Block, error) {
responses := make([]gsmsg.GraphSyncResponse, 0, len(rb.outgoingResponses))
for requestID, linkMap := range rb.outgoingResponses {
extra, err := makeEncodedData(linkMap, ipldBridge)
if err != nil {
return nil, nil, err
}
isFull, isComplete := rb.completedResponses[requestID]
responses = append(responses, gsmsg.NewResponse(requestID, responseCode(isFull, isComplete), extra))
}
return responses, rb.outgoingBlocks, nil
}
func makeEncodedData(entries map[ipld.Link]bool, ipldBridge ipldbridge.IPLDBridge) ([]byte, error) {
node, err := ipldBridge.BuildNode(func(nb ipldbridge.NodeBuilder) ipld.Node {
return nb.CreateList(func(lb ipldbridge.ListBuilder, nb ipldbridge.NodeBuilder) {
for link, blockPresent := range entries {
lb.Append(
nb.CreateMap(func(mb ipldbridge.MapBuilder, knb ipldbridge.NodeBuilder, vnb ipldbridge.NodeBuilder) {
mb.Insert(knb.CreateString("link"), vnb.CreateLink(link))
mb.Insert(knb.CreateString("blockPresent"), vnb.CreateBool(blockPresent))
}),
)
}
})
})
if err != nil {
return nil, err
}
return ipldBridge.EncodeNode(node)
}
func responseCode(isFull bool, isComplete bool) gsmsg.GraphSyncResponseStatusCode {
if !isComplete {
return gsmsg.PartialResponse
}
if isFull {
return gsmsg.RequestCompletedFull
}
return gsmsg.RequestCompletedPartial
}
package responsebuilder
import (
"fmt"
"math/rand"
"testing"
"github.com/ipld/go-ipld-prime/fluent"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking/cid"
)
func TestMessageBuilding(t *testing.T) {
ipldBridge := testbridge.NewMockIPLDBridge()
rb := New()
blocks := testutil.GenerateBlocksOfSize(3, 100)
links := make([]ipld.Link, 0, len(blocks))
for _, block := range blocks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
requestID1 := gsmsg.GraphSyncRequestID(rand.Int31())
requestID2 := gsmsg.GraphSyncRequestID(rand.Int31())
requestID3 := gsmsg.GraphSyncRequestID(rand.Int31())
requestID4 := gsmsg.GraphSyncRequestID(rand.Int31())
rb.AddLink(requestID1, links[0], true)
rb.AddLink(requestID1, links[1], false)
rb.AddLink(requestID1, links[2], true)
rb.AddCompletedRequest(requestID1, false)
rb.AddLink(requestID2, links[1], true)
rb.AddLink(requestID2, links[2], true)
rb.AddCompletedRequest(requestID2, true)
rb.AddLink(requestID3, links[0], true)
rb.AddLink(requestID3, links[1], true)
rb.AddCompletedRequest(requestID4, true)
for _, block := range blocks {
rb.AddBlock(block)
}
responses, sentBlocks, err := rb.Build(ipldBridge)
if err != nil {
t.Fatal("Error building responses")
}
if len(responses) != 4 {
t.Fatal("Assembled wrong number of responses")
}
response1, err := findResponseForRequestID(responses, requestID1)
if err != nil || response1.Status() != gsmsg.RequestCompletedPartial {
t.Fatal("did not generate completed partial response")
}
response1Metadata, err := ipldBridge.DecodeNode(response1.Extra())
if err != nil {
t.Fatal("unable to read metadata from response")
}
analyzeMetadata(t, response1Metadata, map[ipld.Link]bool{
links[0]: true,
links[1]: false,
links[2]: true,
})
response2, err := findResponseForRequestID(responses, requestID2)
if err != nil || response2.Status() != gsmsg.RequestCompletedFull {
t.Fatal("did not generate completed partial response")
}
response2Metadata, err := ipldBridge.DecodeNode(response2.Extra())
if err != nil {
t.Fatal("unable to read metadata from response")
}
analyzeMetadata(t, response2Metadata, map[ipld.Link]bool{
links[1]: true,
links[2]: true,
})
response3, err := findResponseForRequestID(responses, requestID3)
if err != nil || response3.Status() != gsmsg.PartialResponse {
t.Fatal("did not generate completed partial response")
}
response3Metadata, err := ipldBridge.DecodeNode(response3.Extra())
if err != nil {
t.Fatal("unable to read metadata from response")
}
analyzeMetadata(t, response3Metadata, map[ipld.Link]bool{
links[0]: true,
links[1]: true,
})
response4, err := findResponseForRequestID(responses, requestID4)
if err != nil || response4.Status() != gsmsg.RequestCompletedFull {
t.Fatal("did not generate completed partial response")
}
if len(sentBlocks) != len(blocks) {
t.Fatal("Did not send all blocks")
}
for _, block := range sentBlocks {
if !testutil.ContainsBlock(blocks, block) {
t.Fatal("Sent incorrect block")
}
}
}
func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID gsmsg.GraphSyncRequestID) (gsmsg.GraphSyncResponse, error) {
for _, response := range responses {
if response.RequestID() == requestID {
return response, nil
}
}
return gsmsg.GraphSyncResponse{}, fmt.Errorf("Response Not Found")
}
func analyzeMetadata(t *testing.T, metadata ipld.Node, expectedMetadata map[ipld.Link]bool) {
if metadata.Length() != len(expectedMetadata) {
t.Fatal("Wrong amount of metadata on first response")
}
err := fluent.Recover(func() {
safeMetadata := fluent.WrapNode(metadata)
for i := 0; i < len(expectedMetadata); i++ {
metadatum := safeMetadata.TraverseIndex(i)
link := metadatum.TraverseField("link").AsLink()
blockPresent := metadatum.TraverseField("blockPresent").AsBool()
expectedBlockPresent, ok := expectedMetadata[link]
if !ok || expectedBlockPresent != blockPresent {
t.Fatal("Metadata did not match expected")
}
}
})
if err != nil {
t.Fatal(err)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment