Commit d1ef5e81 authored by hannahhoward's avatar hannahhoward

feat(requestmanager): perform traversals

Make the request manager actually inject responses from the network, and perform a selector
verification
parent 2adb6306
......@@ -3,6 +3,8 @@ package graphsync
import (
"context"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/messagequeue"
......@@ -23,17 +25,16 @@ var log = logging.Logger("graphsync")
// Graphsync.
type ResponseProgress = requestmanager.ResponseProgress
// ResponseError is an error that occurred during a traversal.
type ResponseError = requestmanager.ResponseError
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
ipldBridge ipldbridge.IPLDBridge
network gsnet.GraphSyncNetwork
loader ipldbridge.Loader
storer ipldbridge.Storer
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
......@@ -44,14 +45,16 @@ type GraphSync struct {
// New creates a new GraphSync Exchange on the given network,
// using the given bridge to IPLD and the given link loader.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
ipldBridge ipldbridge.IPLDBridge, loader ipldbridge.Loader) *GraphSync {
ipldBridge ipldbridge.IPLDBridge, loader ipldbridge.Loader,
storer ipldbridge.Storer) *GraphSync {
ctx, cancel := context.WithCancel(parent)
createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
return messagequeue.New(ctx, p, network)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
requestManager := requestmanager.New(ctx, ipldBridge)
asyncLoader := asyncloader.New(ctx, loader, storer)
requestManager := requestmanager.New(ctx, asyncLoader, ipldBridge)
peerTaskQueue := peertaskqueue.New()
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager, ipldBridge)
......@@ -62,6 +65,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
ipldBridge: ipldBridge,
network: network,
loader: loader,
storer: storer,
asyncLoader: asyncLoader,
requestManager: requestManager,
peerManager: peerManager,
peerTaskQueue: peerTaskQueue,
......@@ -71,6 +76,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
cancel: cancel,
}
asyncLoader.Startup()
requestManager.SetDelegate(peerManager)
requestManager.Startup()
responseManager.Startup()
......@@ -79,7 +85,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, rootedSelector ipld.Node) (<-chan ResponseProgress, <-chan ResponseError) {
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, rootedSelector ipld.Node) (<-chan ResponseProgress, <-chan error) {
return gs.requestManager.SendRequest(ctx, p, rootedSelector)
}
......
......@@ -2,18 +2,17 @@ package graphsync
import (
"context"
"fmt"
"io"
"math"
"math/rand"
"reflect"
"testing"
"time"
"github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/testbridge"
......@@ -77,11 +76,10 @@ func TestMakeRequestToNetwork(t *testing.T) {
}
gsnet2.SetDelegate(r)
loader := func(ipldLink ipld.Link, lnkCtx ipldbridge.LinkContext) (io.Reader, error) {
return nil, fmt.Errorf("unable to load block")
}
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
bridge := testbridge.NewMockIPLDBridge()
graphSync := New(ctx, gsnet1, bridge, loader)
graphSync := New(ctx, gsnet1, bridge, loader, storer)
cids := testutil.GenerateCids(5)
spec := testbridge.NewMockSelectorSpec(cids)
......@@ -148,11 +146,15 @@ func TestSendResponseToIncomingRequest(t *testing.T) {
blks := testutil.GenerateBlocksOfSize(5, 100)
loader := testbridge.NewMockLoader(blks)
blockStore := make(map[ipld.Link][]byte)
for _, block := range blks {
blockStore[cidlink.Link{Cid: block.Cid()}] = block.RawData()
}
loader, storer := testbridge.NewMockStore(blockStore)
bridge := testbridge.NewMockIPLDBridge()
// initialize graphsync on second node to response to requests
New(ctx, gsnet2, bridge, loader)
New(ctx, gsnet2, bridge, loader, storer)
cids := make([]cid.Cid, 0, 7)
for _, block := range blks {
......
......@@ -6,21 +6,28 @@ import (
"math"
"github.com/ipfs/go-block-format"
"github.com/ipld/go-ipld-prime"
ipldbridge "github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/loader"
"github.com/ipfs/go-graphsync/requestmanager/types"
logging "github.com/ipfs/go-log"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-peer"
)
// ResponseProgress is the fundamental unit of responses making progress in
// the RequestManager. Still not sure about this one? Nodes? Blocks? Struct w/ error? more info?
// for now, it's just a block.
type ResponseProgress = blocks.Block
var log = logging.Logger("graphsync")
// ResponseError is an error that occurred during a traversal.
type ResponseError error
// ResponseProgress is the fundamental unit of responses making progress in
// the RequestManager.
type ResponseProgress struct {
Node ipld.Node // a node which matched the graphsync query
Path ipld.Path // the path of that node relative to the traversal start
LastBlock struct { // LastBlock stores the Path and Link of the last block edge we had to load.
ipld.Path
ipld.Link
}
}
const (
// maxPriority is the max priority as defined by the bitswap protocol
......@@ -28,17 +35,10 @@ const (
)
type inProgressRequestStatus struct {
ctx context.Context
cancelFn func()
p peer.ID
responseChannel chan ResponseProgress
errorChannel chan ResponseError
}
func (ipr *inProgressRequestStatus) shutdown() {
close(ipr.responseChannel)
close(ipr.errorChannel)
ipr.cancelFn()
ctx context.Context
cancelFn func()
p peer.ID
networkError chan error
}
// PeerHandler is an interface that can send requests to peers
......@@ -46,6 +46,17 @@ type PeerHandler interface {
SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
}
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
StartRequest(requestID gsmsg.GraphSyncRequestID)
ProcessResponse(responses map[gsmsg.GraphSyncRequestID]metadata.Metadata,
blks []blocks.Block)
AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult
CompleteResponsesFor(requestID gsmsg.GraphSyncRequestID)
CleanupRequest(requestID gsmsg.GraphSyncRequestID)
}
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
......@@ -55,7 +66,7 @@ type RequestManager struct {
ipldBridge ipldbridge.IPLDBridge
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
// dont touch out side of run loop
nextRequestID gsmsg.GraphSyncRequestID
inProgressRequestStatuses map[gsmsg.GraphSyncRequestID]*inProgressRequestStatus
......@@ -66,12 +77,13 @@ type requestManagerMessage interface {
}
// New generates a new request manager from a context, network, and selectorQuerier
func New(ctx context.Context, ipldBridge ipldbridge.IPLDBridge) *RequestManager {
func New(ctx context.Context, asyncLoader AsyncLoader, ipldBridge ipldbridge.IPLDBridge) *RequestManager {
ctx, cancel := context.WithCancel(ctx)
return &RequestManager{
ctx: ctx,
cancel: cancel,
ipldBridge: ipldBridge,
asyncLoader: asyncLoader,
rc: newResponseCollector(ctx),
messages: make(chan requestManagerMessage, 16),
inProgressRequestStatuses: make(map[gsmsg.GraphSyncRequestID]*inProgressRequestStatus),
......@@ -86,7 +98,7 @@ func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
type inProgressRequest struct {
requestID gsmsg.GraphSyncRequestID
incoming chan ResponseProgress
incomingError chan ResponseError
incomingError chan error
}
type newRequestMessage struct {
......@@ -98,7 +110,7 @@ type newRequestMessage struct {
// SendRequest initiates a new GraphSync request to the given peer.
func (rm *RequestManager) SendRequest(ctx context.Context,
p peer.ID,
cidRootedSelector ipld.Node) (<-chan ResponseProgress, <-chan ResponseError) {
cidRootedSelector ipld.Node) (<-chan ResponseProgress, <-chan error) {
if len(rm.ipldBridge.ValidateSelectorSpec(cidRootedSelector)) != 0 {
return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
}
......@@ -129,18 +141,18 @@ func (rm *RequestManager) SendRequest(ctx context.Context,
})
}
func (rm *RequestManager) emptyResponse() (chan ResponseProgress, chan ResponseError) {
func (rm *RequestManager) emptyResponse() (chan ResponseProgress, chan error) {
ch := make(chan ResponseProgress)
close(ch)
errCh := make(chan ResponseError)
errCh := make(chan error)
close(errCh)
return ch, errCh
}
func (rm *RequestManager) singleErrorResponse(err error) (chan ResponseProgress, chan ResponseError) {
func (rm *RequestManager) singleErrorResponse(err error) (chan ResponseProgress, chan error) {
ch := make(chan ResponseProgress)
close(ch)
errCh := make(chan ResponseError, 1)
errCh := make(chan error, 1)
errCh <- err
close(errCh)
return ch, errCh
......@@ -152,7 +164,7 @@ type cancelRequestMessage struct {
func (rm *RequestManager) cancelRequest(requestID gsmsg.GraphSyncRequestID,
incomingResponses chan ResponseProgress,
incomingErrors chan ResponseError) {
incomingErrors chan error) {
cancelMessageChannel := rm.messages
for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
select {
......@@ -175,16 +187,17 @@ func (rm *RequestManager) cancelRequest(requestID gsmsg.GraphSyncRequestID,
}
type processResponseMessage struct {
p peer.ID
responses []gsmsg.GraphSyncResponse
blks []blocks.Block
}
// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
func (rm *RequestManager) ProcessResponses(responses []gsmsg.GraphSyncResponse,
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
blks []blocks.Block) {
select {
case rm.messages <- &processResponseMessage{responses, blks}:
case rm.messages <- &processResponseMessage{p, responses, blks}:
case <-rm.ctx.Done():
}
}
......@@ -216,31 +229,19 @@ func (rm *RequestManager) run() {
func (rm *RequestManager) cleanupInProcessRequests() {
for _, requestStatus := range rm.inProgressRequestStatuses {
requestStatus.shutdown()
requestStatus.cancelFn()
}
}
func (nrm *newRequestMessage) handle(rm *RequestManager) {
var inProgressChan chan ResponseProgress
var inProgressErr chan ResponseError
type terminateRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
}
func (nrm *newRequestMessage) handle(rm *RequestManager) {
requestID := rm.nextRequestID
rm.nextRequestID++
selectorBytes, err := rm.ipldBridge.EncodeNode(nrm.selector)
if err != nil {
inProgressChan, inProgressErr = rm.singleErrorResponse(err)
} else {
inProgressChan = make(chan ResponseProgress)
inProgressErr = make(chan ResponseError)
ctx, cancel := context.WithCancel(rm.ctx)
rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
ctx, cancel, nrm.p, inProgressChan, inProgressErr,
}
rm.peerHandler.SendRequest(nrm.p, gsmsg.NewRequest(requestID, selectorBytes, maxPriority))
// not starting a traversal atm
}
inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.selector)
select {
case nrm.inProgressRequestChan <- inProgressRequest{
......@@ -252,6 +253,11 @@ func (nrm *newRequestMessage) handle(rm *RequestManager) {
}
}
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
delete(rm.inProgressRequestStatuses, trm.requestID)
rm.asyncLoader.CleanupRequest(trm.requestID)
}
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
if !ok {
......@@ -260,49 +266,47 @@ func (crm *cancelRequestMessage) handle(rm *RequestManager) {
rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
delete(rm.inProgressRequestStatuses, crm.requestID)
inProgressRequestStatus.shutdown()
inProgressRequestStatus.cancelFn()
}
func (prm *processResponseMessage) handle(rm *RequestManager) {
for _, block := range prm.blks {
// dispatch every received block to every in flight request
// this is completely a temporary implementation
// meant to demonstrate we can produce a round trip of blocks
// the future implementation will actual have a temporary block store
// and will only dispatch to those requests whose selection transversal
// actually requires them
for _, requestStatus := range rm.inProgressRequestStatuses {
select {
case requestStatus.responseChannel <- block:
case <-rm.ctx.Done():
case <-requestStatus.ctx.Done():
}
filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
responseMetadata := metadataForResponses(filteredResponses, rm.ipldBridge)
rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
rm.processTerminations(filteredResponses)
}
func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
for _, response := range responses {
requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
if !ok || requestStatus.p != p {
continue
}
responsesForPeer = append(responsesForPeer, response)
}
return responsesForPeer
}
for _, response := range prm.responses {
// we're keeping it super light for now -- basically just ignoring
// reason for termination and closing the channel
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
for _, response := range responses {
if gsmsg.IsTerminalResponseCode(response.Status()) {
requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
if ok {
if gsmsg.IsTerminalFailureCode(response.Status()) {
responseError := rm.generateResponseErrorFromStatus(response.Status())
select {
case requestStatus.errorChannel <- responseError:
case <-rm.ctx.Done():
case <-requestStatus.ctx.Done():
}
if gsmsg.IsTerminalFailureCode(response.Status()) {
requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
responseError := rm.generateResponseErrorFromStatus(response.Status())
select {
case requestStatus.networkError <- responseError:
case <-requestStatus.ctx.Done():
}
delete(rm.inProgressRequestStatuses, response.RequestID())
requestStatus.shutdown()
requestStatus.cancelFn()
}
rm.asyncLoader.CompleteResponsesFor(response.RequestID())
delete(rm.inProgressRequestStatuses, response.RequestID())
}
}
}
func (rm *RequestManager) generateResponseErrorFromStatus(status gsmsg.GraphSyncResponseStatusCode) ResponseError {
func (rm *RequestManager) generateResponseErrorFromStatus(status gsmsg.GraphSyncResponseStatusCode) error {
switch status {
case gsmsg.RequestFailedBusy:
return fmt.Errorf("Request Failed - Peer Is Busy")
......@@ -316,3 +320,53 @@ func (rm *RequestManager) generateResponseErrorFromStatus(status gsmsg.GraphSync
return fmt.Errorf("Unknown")
}
}
func (rm *RequestManager) setupRequest(requestID gsmsg.GraphSyncRequestID, p peer.ID, selectorSpec ipld.Node) (chan ResponseProgress, chan error) {
selectorBytes, err := rm.ipldBridge.EncodeNode(selectorSpec)
if err != nil {
return rm.singleErrorResponse(err)
}
root, selector, err := rm.ipldBridge.DecodeSelectorSpec(selectorSpec)
if err != nil {
return rm.singleErrorResponse(err)
}
networkErrorChan := make(chan error, 1)
ctx, cancel := context.WithCancel(rm.ctx)
rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
ctx, cancel, p, networkErrorChan,
}
rm.asyncLoader.StartRequest(requestID)
rm.peerHandler.SendRequest(p, gsmsg.NewRequest(requestID, selectorBytes, maxPriority))
return rm.executeTraversal(ctx, requestID, root, selector, networkErrorChan)
}
func (rm *RequestManager) executeTraversal(
ctx context.Context,
requestID gsmsg.GraphSyncRequestID,
root ipld.Node,
selector ipldbridge.Selector,
networkErrorChan chan error,
) (chan ResponseProgress, chan error) {
inProgressChan := make(chan ResponseProgress)
inProgressErr := make(chan error)
loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
visitor := visitToChannel(ctx, inProgressChan)
go func() {
rm.ipldBridge.Traverse(ctx, loaderFn, root, selector, visitor)
select {
case networkError := <-networkErrorChan:
select {
case <-rm.ctx.Done():
case inProgressErr <- networkError:
}
default:
}
select {
case <-ctx.Done():
case rm.messages <- &terminateRequestMessage{requestID}:
}
close(inProgressChan)
close(inProgressErr)
}()
return inProgressChan, inProgressErr
}
This diff is collapsed.
package requestmanager
import "context"
import (
"context"
)
type responseCollector struct {
ctx context.Context
......@@ -13,15 +15,15 @@ func newResponseCollector(ctx context.Context) *responseCollector {
func (rc *responseCollector) collectResponses(
requestCtx context.Context,
incomingResponses <-chan ResponseProgress,
incomingErrors <-chan ResponseError,
cancelRequest func()) (<-chan ResponseProgress, <-chan ResponseError) {
incomingErrors <-chan error,
cancelRequest func()) (<-chan ResponseProgress, <-chan error) {
returnedResponses := make(chan ResponseProgress)
returnedErrors := make(chan ResponseError)
returnedErrors := make(chan error)
go func() {
var receivedResponses []ResponseProgress
var receivedErrors []ResponseError
var receivedErrors []error
defer close(returnedResponses)
defer close(returnedErrors)
outgoingResponses := func() chan<- ResponseProgress {
......@@ -32,17 +34,17 @@ func (rc *responseCollector) collectResponses(
}
nextResponse := func() ResponseProgress {
if len(receivedResponses) == 0 {
return nil
return ResponseProgress{}
}
return receivedResponses[0]
}
outgoingErrors := func() chan<- ResponseError {
outgoingErrors := func() chan<- error {
if len(receivedErrors) == 0 {
return nil
}
return returnedErrors
}
nextError := func() ResponseError {
nextError := func() error {
if len(receivedErrors) == 0 {
return nil
}
......
......@@ -7,6 +7,10 @@ import (
"testing"
"time"
"github.com/ipfs/go-graphsync/testbridge"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipfs/go-graphsync/testutil"
)
......@@ -18,7 +22,7 @@ func TestBufferingResponseProgress(t *testing.T) {
requestCtx, requestCancel := context.WithCancel(backgroundCtx)
defer requestCancel()
incomingResponses := make(chan ResponseProgress)
incomingErrors := make(chan ResponseError)
incomingErrors := make(chan error)
cancelRequest := func() {}
outgoingResponses, outgoingErrors := rc.collectResponses(
......@@ -30,7 +34,13 @@ func TestBufferingResponseProgress(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("should have written to channel but couldn't")
case incomingResponses <- block:
case incomingResponses <- ResponseProgress{
Node: testbridge.NewMockBlockNode(block.RawData()),
LastBlock: struct {
ipld.Path
ipld.Link
}{ipld.Path{}, cidlink.Link{Cid: block.Cid()}},
}:
}
}
......@@ -51,8 +61,8 @@ func TestBufferingResponseProgress(t *testing.T) {
select {
case <-ctx.Done():
t.Fatal("should have read from channel but couldn't")
case testBlock := <-outgoingResponses:
if testBlock.Cid() != block.Cid() {
case testResponse := <-outgoingResponses:
if testResponse.LastBlock.Link.(cidlink.Link).Cid != block.Cid() {
t.Fatal("stored blocks incorrectly")
}
}
......
package requestmanager
import (
"context"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
ipld "github.com/ipld/go-ipld-prime"
)
func visitToChannel(ctx context.Context, inProgressChan chan ResponseProgress) ipldbridge.AdvVisitFn {
return func(tp ipldbridge.TraversalProgress, node ipld.Node, tr ipldbridge.TraversalReason) error {
select {
case <-ctx.Done():
case inProgressChan <- ResponseProgress{
Node: node,
Path: tp.Path,
LastBlock: tp.LastBlock,
}:
}
return nil
}
}
func metadataForResponses(responses []gsmsg.GraphSyncResponse, ipldBridge ipldbridge.IPLDBridge) map[gsmsg.GraphSyncRequestID]metadata.Metadata {
responseMetadata := make(map[gsmsg.GraphSyncRequestID]metadata.Metadata, len(responses))
for _, response := range responses {
md, err := metadata.DecodeMetadata(response.Extra(), ipldBridge)
if err != nil {
log.Warningf("Unable to decode metadata in response for request id: %d", response.RequestID())
continue
}
responseMetadata[response.RequestID()] = md
}
return responseMetadata
}
......@@ -106,7 +106,10 @@ func (mb *mockIPLDBridge) Traverse(ctx context.Context, loader ipldbridge.Loader
node, err := loadNode(lnk, loader)
if err == nil {
fn(ipldbridge.TraversalProgress{}, node, 0)
fn(ipldbridge.TraversalProgress{LastBlock: struct {
ipld.Path
ipld.Link
}{ipld.Path{}, cidlink.Link{Cid: lnk}}}, node, 0)
}
select {
case <-ctx.Done():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment