Commit acd1bf4d authored by hannahhoward's avatar hannahhoward

feat(responsemanager): create response manager

Creates the Response Manager that processes incoming requests, initiates selector traversal, and
generates responses
parent 105e8cd2
......@@ -52,6 +52,7 @@ type PeerResponseSender interface {
data []byte,
)
FinishRequest(requestID gsmsg.GraphSyncRequestID)
FinishWithError(requestID gsmsg.GraphSyncRequestID, status gsmsg.GraphSyncResponseStatusCode)
}
// NewResponseSender generates a new PeerResponseSender for the given context, peer ID,
......@@ -113,14 +114,31 @@ func (prm *peerResponseSender) FinishRequest(requestID gsmsg.GraphSyncRequestID)
prm.linkTrackerLk.Lock()
isComplete := prm.linkTracker.FinishRequest(requestID)
prm.linkTrackerLk.Unlock()
var status gsmsg.GraphSyncResponseStatusCode
if isComplete {
status = gsmsg.RequestCompletedFull
} else {
status = gsmsg.RequestCompletedPartial
}
prm.finish(requestID, status)
}
// FinishWithError marks the given requestID as having terminated with an error
func (prm *peerResponseSender) FinishWithError(requestID gsmsg.GraphSyncRequestID, status gsmsg.GraphSyncResponseStatusCode) {
prm.linkTrackerLk.Lock()
prm.linkTracker.FinishRequest(requestID)
prm.linkTrackerLk.Unlock()
prm.finish(requestID, status)
}
func (prm *peerResponseSender) finish(requestID gsmsg.GraphSyncRequestID, status gsmsg.GraphSyncResponseStatusCode) {
if prm.buildResponse(func(responseBuilder *responsebuilder.ResponseBuilder) {
responseBuilder.AddCompletedRequest(requestID, isComplete)
responseBuilder.AddCompletedRequest(requestID, status)
}) {
prm.signalWork()
}
}
func (prm *peerResponseSender) buildResponse(buildResponseFn func(*responsebuilder.ResponseBuilder)) bool {
prm.responseBuilderLk.Lock()
defer prm.responseBuilderLk.Unlock()
......
......@@ -12,14 +12,14 @@ import (
// GraphSync message components once responses are ready to send.
type ResponseBuilder struct {
outgoingBlocks []blocks.Block
completedResponses map[gsmsg.GraphSyncRequestID]bool
completedResponses map[gsmsg.GraphSyncRequestID]gsmsg.GraphSyncResponseStatusCode
outgoingResponses map[gsmsg.GraphSyncRequestID]map[ipld.Link]bool
}
// New generates a new ResponseBuilder.
func New() *ResponseBuilder {
return &ResponseBuilder{
completedResponses: make(map[gsmsg.GraphSyncRequestID]bool),
completedResponses: make(map[gsmsg.GraphSyncRequestID]gsmsg.GraphSyncResponseStatusCode),
outgoingResponses: make(map[gsmsg.GraphSyncRequestID]map[ipld.Link]bool),
}
}
......@@ -43,8 +43,8 @@ func (rb *ResponseBuilder) AddLink(requestID gsmsg.GraphSyncRequestID, link ipld
// AddCompletedRequest marks the given request as completed in the response,
// as well as whether the graphsync request responded with complete or partial
// data.
func (rb *ResponseBuilder) AddCompletedRequest(requestID gsmsg.GraphSyncRequestID, isComplete bool) {
rb.completedResponses[requestID] = isComplete
func (rb *ResponseBuilder) AddCompletedRequest(requestID gsmsg.GraphSyncRequestID, status gsmsg.GraphSyncResponseStatusCode) {
rb.completedResponses[requestID] = status
// make sure this completion goes out in next response even if no links are sent
_, ok := rb.outgoingResponses[requestID]
if !ok {
......@@ -65,8 +65,8 @@ func (rb *ResponseBuilder) Build(ipldBridge ipldbridge.IPLDBridge) ([]gsmsg.Grap
if err != nil {
return nil, nil, err
}
isFull, isComplete := rb.completedResponses[requestID]
responses = append(responses, gsmsg.NewResponse(requestID, responseCode(isFull, isComplete), extra))
status, isComplete := rb.completedResponses[requestID]
responses = append(responses, gsmsg.NewResponse(requestID, responseCode(status, isComplete), extra))
}
return responses, rb.outgoingBlocks, nil
}
......@@ -90,12 +90,9 @@ func makeEncodedData(entries map[ipld.Link]bool, ipldBridge ipldbridge.IPLDBridg
return ipldBridge.EncodeNode(node)
}
func responseCode(isFull bool, isComplete bool) gsmsg.GraphSyncResponseStatusCode {
func responseCode(status gsmsg.GraphSyncResponseStatusCode, isComplete bool) gsmsg.GraphSyncResponseStatusCode {
if !isComplete {
return gsmsg.PartialResponse
}
if isFull {
return gsmsg.RequestCompletedFull
}
return gsmsg.RequestCompletedPartial
return status
}
......@@ -31,17 +31,17 @@ func TestMessageBuilding(t *testing.T) {
rb.AddLink(requestID1, links[1], false)
rb.AddLink(requestID1, links[2], true)
rb.AddCompletedRequest(requestID1, false)
rb.AddCompletedRequest(requestID1, gsmsg.RequestCompletedPartial)
rb.AddLink(requestID2, links[1], true)
rb.AddLink(requestID2, links[2], true)
rb.AddCompletedRequest(requestID2, true)
rb.AddCompletedRequest(requestID2, gsmsg.RequestCompletedFull)
rb.AddLink(requestID3, links[0], true)
rb.AddLink(requestID3, links[1], true)
rb.AddCompletedRequest(requestID4, true)
rb.AddCompletedRequest(requestID4, gsmsg.RequestCompletedFull)
for _, block := range blocks {
rb.AddBlock(block)
......
package responsemanager
import (
"context"
"time"
"github.com/ipfs/go-graphsync/responsemanager/loader"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
peer "github.com/libp2p/go-libp2p-peer"
)
const (
maxInProcessRequests = 6
thawSpeed = time.Millisecond * 100
)
type inProgressResponseStatus struct {
ctx context.Context
cancelFn func()
selector []byte
}
type responseKey struct {
p peer.ID
requestID gsmsg.GraphSyncRequestID
}
type responseTaskData struct {
ctx context.Context
selector []byte
}
// QueryQueue is an interface that can receive new selector query tasks
// and prioritize them as needed, and pop them off later
type QueryQueue interface {
PushBlock(to peer.ID, tasks ...peertask.Task)
PopBlock() *peertask.TaskBlock
Remove(identifier peertask.Identifier, p peer.ID)
ThawRound()
}
// PeerManager is an interface that returns sender interfaces for peer responses.
type PeerManager interface {
SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender
}
type responseManagerMessage interface {
handle(rm *ResponseManager)
}
// ResponseManager handles incoming requests from the network, initiates selector
// traversals, and transmits responses
type ResponseManager struct {
ctx context.Context
cancelFn context.CancelFunc
loader ipldbridge.Loader
ipldBridge ipldbridge.IPLDBridge
peerManager PeerManager
queryQueue QueryQueue
messages chan responseManagerMessage
workSignal chan struct{}
ticker *time.Ticker
inProgressResponses map[responseKey]inProgressResponseStatus
}
// New creates a new response manager from the given context, loader,
// bridge to IPLD interface, peerManager, and queryQueue.
func New(ctx context.Context,
loader ipldbridge.Loader,
ipldBridge ipldbridge.IPLDBridge,
peerManager PeerManager,
queryQueue QueryQueue) *ResponseManager {
ctx, cancelFn := context.WithCancel(ctx)
return &ResponseManager{
ctx: ctx,
cancelFn: cancelFn,
loader: loader,
ipldBridge: ipldBridge,
peerManager: peerManager,
queryQueue: queryQueue,
messages: make(chan responseManagerMessage, 16),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(thawSpeed),
inProgressResponses: make(map[responseKey]inProgressResponseStatus),
}
}
type processRequestMessage struct {
p peer.ID
requests []gsmsg.GraphSyncRequest
}
// ProcessRequests processes incoming requests for the given peer
func (rm *ResponseManager) ProcessRequests(p peer.ID, requests []gsmsg.GraphSyncRequest) {
select {
case rm.messages <- &processRequestMessage{p, requests}:
case <-rm.ctx.Done():
}
}
type synchronizeMessage struct {
sync chan struct{}
}
// this is a test utility method to force all messages to get processed
func (rm *ResponseManager) synchronize() {
sync := make(chan struct{})
select {
case rm.messages <- &synchronizeMessage{sync}:
case <-rm.ctx.Done():
}
select {
case <-sync:
case <-rm.ctx.Done():
}
}
type responseDataRequest struct {
key responseKey
taskDataChan chan *responseTaskData
}
type finishResponseRequest struct {
key responseKey
}
func (rm *ResponseManager) processQueriesWorker() {
taskDataChan := make(chan *responseTaskData)
var taskData *responseTaskData
for {
nextTaskBlock := rm.queryQueue.PopBlock()
for nextTaskBlock == nil {
select {
case <-rm.ctx.Done():
return
case <-rm.workSignal:
nextTaskBlock = rm.queryQueue.PopBlock()
case <-rm.ticker.C:
rm.queryQueue.ThawRound()
nextTaskBlock = rm.queryQueue.PopBlock()
}
}
for _, task := range nextTaskBlock.Tasks {
key := task.Identifier.(responseKey)
select {
case rm.messages <- &responseDataRequest{key, taskDataChan}:
case <-rm.ctx.Done():
return
}
select {
case taskData = <-taskDataChan:
case <-rm.ctx.Done():
return
}
rm.executeQuery(taskData.ctx, key.p, key.requestID, taskData.selector)
select {
case rm.messages <- &finishResponseRequest{key}:
case <-rm.ctx.Done():
}
}
nextTaskBlock.Done(nextTaskBlock.Tasks)
}
}
func noopVisitor(ipldbridge.TraversalProgress, ipld.Node, ipldbridge.TraversalReason) error {
return nil
}
func (rm *ResponseManager) executeQuery(ctx context.Context,
p peer.ID,
requestID gsmsg.GraphSyncRequestID,
selector []byte) {
peerResponseSender := rm.peerManager.SenderForPeer(p)
selectorSpec, err := rm.ipldBridge.DecodeNode(selector)
if err != nil {
peerResponseSender.FinishWithError(requestID, gsmsg.RequestFailedUnknown)
return
}
root, reifiedSelector, err := rm.ipldBridge.DecodeSelectorSpec(selectorSpec)
if err != nil {
peerResponseSender.FinishWithError(requestID, gsmsg.RequestFailedUnknown)
return
}
wrappedLoader := loader.WrapLoader(rm.loader, requestID, peerResponseSender)
err = rm.ipldBridge.Traverse(ctx, wrappedLoader, root, reifiedSelector, noopVisitor)
if err != nil {
peerResponseSender.FinishWithError(requestID, gsmsg.RequestFailedUnknown)
return
}
peerResponseSender.FinishRequest(requestID)
}
// Startup starts processing for the WantManager.
func (rm *ResponseManager) Startup() {
go rm.run()
}
// Shutdown ends processing for the want manager.
func (rm *ResponseManager) Shutdown() {
rm.cancelFn()
}
func (rm *ResponseManager) cleanupInProcessResponses() {
for _, response := range rm.inProgressResponses {
response.cancelFn()
}
}
func (rm *ResponseManager) run() {
defer rm.cleanupInProcessResponses()
for i := 0; i < maxInProcessRequests; i++ {
go rm.processQueriesWorker()
}
for {
select {
case <-rm.ctx.Done():
return
case message := <-rm.messages:
message.handle(rm)
}
}
}
func (prm *processRequestMessage) handle(rm *ResponseManager) {
for _, request := range prm.requests {
key := responseKey{p: prm.p, requestID: request.ID()}
if !request.IsCancel() {
ctx, cancelFn := context.WithCancel(rm.ctx)
rm.inProgressResponses[key] =
inProgressResponseStatus{
ctx: ctx,
cancelFn: cancelFn,
selector: request.Selector(),
}
rm.queryQueue.PushBlock(prm.p, peertask.Task{Identifier: key, Priority: int(request.Priority())})
select {
case rm.workSignal <- struct{}{}:
default:
}
} else {
rm.queryQueue.Remove(key, key.p)
response, ok := rm.inProgressResponses[key]
if ok {
response.cancelFn()
}
}
}
}
func (rdr *responseDataRequest) handle(rm *ResponseManager) {
response, ok := rm.inProgressResponses[rdr.key]
var taskData *responseTaskData
if ok {
taskData = &responseTaskData{response.ctx, response.selector}
} else {
taskData = nil
}
select {
case <-rm.ctx.Done():
case rdr.taskDataChan <- taskData:
}
}
func (frr *finishResponseRequest) handle(rm *ResponseManager) {
response, ok := rm.inProgressResponses[frr.key]
if !ok {
return
}
delete(rm.inProgressResponses, frr.key)
response.cancelFn()
}
func (sm *synchronizeMessage) handle(rm *ResponseManager) {
select {
case <-rm.ctx.Done():
case sm.sync <- struct{}{}:
}
}
package responsemanager
import (
"bytes"
"context"
"fmt"
"io"
"math"
"math/rand"
"reflect"
"sync"
"testing"
"time"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue/peertask"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
peer "github.com/libp2p/go-libp2p-peer"
)
type fakeQueryQueue struct {
popWait sync.WaitGroup
queriesLk sync.RWMutex
queries []*peertask.TaskBlock
}
func (fqq *fakeQueryQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
fqq.queriesLk.Lock()
fqq.queries = append(fqq.queries, &peertask.TaskBlock{
Tasks: tasks,
Priority: tasks[0].Priority,
Target: to,
Done: func([]peertask.Task) {},
})
fqq.queriesLk.Unlock()
}
func (fqq *fakeQueryQueue) PopBlock() *peertask.TaskBlock {
fqq.popWait.Wait()
fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock()
if len(fqq.queries) == 0 {
return nil
}
block := fqq.queries[0]
fqq.queries = fqq.queries[1:]
return block
}
func (fqq *fakeQueryQueue) Remove(identifier peertask.Identifier, p peer.ID) {
fqq.queriesLk.Lock()
defer fqq.queriesLk.Unlock()
for i, query := range fqq.queries {
if query.Target == p {
for j, task := range query.Tasks {
if task.Identifier == identifier {
query.Tasks = append(query.Tasks[:j], query.Tasks[j+1:]...)
}
}
if len(query.Tasks) == 0 {
fqq.queries = append(fqq.queries[:i], fqq.queries[i+1:]...)
}
}
}
}
func (fqq *fakeQueryQueue) ThawRound() {
}
type fakePeerManager struct {
lastPeer peer.ID
peerResponseSender peerresponsemanager.PeerResponseSender
}
func (fpm *fakePeerManager) SenderForPeer(p peer.ID) peerresponsemanager.PeerResponseSender {
fpm.lastPeer = p
return fpm.peerResponseSender
}
type sentResponse struct {
requestID gsmsg.GraphSyncRequestID
link ipld.Link
data []byte
}
type fakePeerResponseSender struct {
sentResponses chan sentResponse
lastCompletedRequest chan gsmsg.GraphSyncRequestID
}
func (fprs *fakePeerResponseSender) Startup() {}
func (fprs *fakePeerResponseSender) Shutdown() {}
func (fprs *fakePeerResponseSender) SendResponse(
requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
data []byte,
) {
fprs.sentResponses <- sentResponse{requestID, link, data}
}
func (fprs *fakePeerResponseSender) FinishRequest(requestID gsmsg.GraphSyncRequestID) {
fprs.lastCompletedRequest <- requestID
}
func (fprs *fakePeerResponseSender) FinishWithError(requestID gsmsg.GraphSyncRequestID, status gsmsg.GraphSyncResponseStatusCode) {
fprs.lastCompletedRequest <- requestID
}
func makeLoader(blks []blocks.Block) ipldbridge.Loader {
return func(ipldLink ipld.Link, lnkCtx ipldbridge.LinkContext) (io.Reader, error) {
lnk := ipldLink.(cidlink.Link).Cid
for _, block := range blks {
if block.Cid() == lnk {
return bytes.NewReader(block.RawData()), nil
}
}
return nil, fmt.Errorf("unable to load block")
}
}
func TestIncomingQuery(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 40*time.Millisecond)
defer cancel()
blks := testutil.GenerateBlocksOfSize(5, 20)
loader := makeLoader(blks)
ipldBridge := testbridge.NewMockIPLDBridge()
requestIDChan := make(chan gsmsg.GraphSyncRequestID, 1)
sentResponses := make(chan sentResponse, len(blks))
fprs := &fakePeerResponseSender{lastCompletedRequest: requestIDChan, sentResponses: sentResponses}
peerManager := &fakePeerManager{peerResponseSender: fprs}
queryQueue := &fakeQueryQueue{}
responseManager := New(ctx, loader, ipldBridge, peerManager, queryQueue)
responseManager.Startup()
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
selectorSpec := testbridge.NewMockSelectorSpec(cids)
selector, err := ipldBridge.EncodeNode(selectorSpec)
if err != nil {
t.Fatal("error encoding selector")
}
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, selector, gsmsg.GraphSyncPriority(math.MaxInt32)),
}
p := testutil.GeneratePeers(1)[0]
responseManager.ProcessRequests(p, requests)
select {
case <-ctx.Done():
t.Fatal("Should have completed request but didn't")
case <-requestIDChan:
}
for i := 0; i < len(blks); i++ {
select {
case sentResponse := <-sentResponses:
k := sentResponse.link.(cidlink.Link)
blockIndex := testutil.IndexOf(blks, k.Cid)
if blockIndex == -1 {
t.Fatal("sent incorrect link")
}
if !reflect.DeepEqual(sentResponse.data, blks[blockIndex].RawData()) {
t.Fatal("sent incorrect data")
}
if sentResponse.requestID != requestID {
t.Fatal("incorrect response id")
}
case <-ctx.Done():
t.Fatal("did not send enough responses")
}
}
}
func TestCancellationQueryInProgress(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 40*time.Millisecond)
defer cancel()
blks := testutil.GenerateBlocksOfSize(5, 20)
loader := makeLoader(blks)
ipldBridge := testbridge.NewMockIPLDBridge()
requestIDChan := make(chan gsmsg.GraphSyncRequestID)
sentResponses := make(chan sentResponse)
fprs := &fakePeerResponseSender{lastCompletedRequest: requestIDChan, sentResponses: sentResponses}
peerManager := &fakePeerManager{peerResponseSender: fprs}
queryQueue := &fakeQueryQueue{}
responseManager := New(ctx, loader, ipldBridge, peerManager, queryQueue)
responseManager.Startup()
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
selectorSpec := testbridge.NewMockSelectorSpec(cids)
selector, err := ipldBridge.EncodeNode(selectorSpec)
if err != nil {
t.Fatal("error encoding selector")
}
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, selector, gsmsg.GraphSyncPriority(math.MaxInt32)),
}
p := testutil.GeneratePeers(1)[0]
responseManager.ProcessRequests(p, requests)
// read one block
select {
case sentResponse := <-sentResponses:
k := sentResponse.link.(cidlink.Link)
blockIndex := testutil.IndexOf(blks, k.Cid)
if blockIndex == -1 {
t.Fatal("sent incorrect link")
}
if !reflect.DeepEqual(sentResponse.data, blks[blockIndex].RawData()) {
t.Fatal("sent incorrect data")
}
if sentResponse.requestID != requestID {
t.Fatal("incorrect response id")
}
case <-ctx.Done():
t.Fatal("did not send responses")
}
// send a cancellation
requests = []gsmsg.GraphSyncRequest{
gsmsg.CancelRequest(requestID),
}
responseManager.ProcessRequests(p, requests)
responseManager.synchronize()
// read one block -- to unblock processing
select {
case sentResponse := <-sentResponses:
k := sentResponse.link.(cidlink.Link)
blockIndex := testutil.IndexOf(blks, k.Cid)
if blockIndex == -1 {
t.Fatal("sent incorrect link")
}
if !reflect.DeepEqual(sentResponse.data, blks[blockIndex].RawData()) {
t.Fatal("sent incorrect data")
}
if sentResponse.requestID != requestID {
t.Fatal("incorrect response id")
}
case <-ctx.Done():
t.Fatal("did not send responses")
}
// at this point traversal should abort and we should receive a completion
select {
case <-ctx.Done():
t.Fatal("Should have completed request but didn't")
case <-sentResponses:
t.Fatal("should not send any more responses")
case <-requestIDChan:
}
}
func TestEarlyCancellation(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 40*time.Millisecond)
defer cancel()
blks := testutil.GenerateBlocksOfSize(5, 20)
loader := makeLoader(blks)
ipldBridge := testbridge.NewMockIPLDBridge()
requestIDChan := make(chan gsmsg.GraphSyncRequestID)
sentResponses := make(chan sentResponse)
fprs := &fakePeerResponseSender{lastCompletedRequest: requestIDChan, sentResponses: sentResponses}
peerManager := &fakePeerManager{peerResponseSender: fprs}
queryQueue := &fakeQueryQueue{}
queryQueue.popWait.Add(1)
responseManager := New(ctx, loader, ipldBridge, peerManager, queryQueue)
responseManager.Startup()
cids := make([]cid.Cid, 0, 5)
for _, block := range blks {
cids = append(cids, block.Cid())
}
selectorSpec := testbridge.NewMockSelectorSpec(cids)
selector, err := ipldBridge.EncodeNode(selectorSpec)
if err != nil {
t.Fatal("error encoding selector")
}
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(requestID, selector, gsmsg.GraphSyncPriority(math.MaxInt32)),
}
p := testutil.GeneratePeers(1)[0]
responseManager.ProcessRequests(p, requests)
// send a cancellation
requests = []gsmsg.GraphSyncRequest{
gsmsg.CancelRequest(requestID),
}
responseManager.ProcessRequests(p, requests)
responseManager.synchronize()
// unblock popping from queue
queryQueue.popWait.Done()
// verify no responses processed
select {
case <-ctx.Done():
case <-sentResponses:
t.Fatal("should not send any more responses")
case <-requestIDChan:
t.Fatal("should not send have completed response")
}
}
......@@ -96,6 +96,11 @@ func (mb *mockIPLDBridge) Traverse(ctx context.Context, loader ipldbridge.Loader
} else {
fn(ipldbridge.TraversalProgress{}, node, 0)
}
select {
case <-ctx.Done():
return lastErr
default:
}
}
return lastErr
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment