Commit 492e1fd4 authored by Hannah Howard's avatar Hannah Howard Committed by hannahhoward

refactor(responsemanager): breakup function

Breakup super long and complicated executeQuery function in two
parent 6d98af58
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
"github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipfs/go-peertaskqueue/peertask"
ipld "github.com/ipld/go-ipld-prime" ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid" cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
) )
...@@ -114,7 +115,7 @@ func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, reque ...@@ -114,7 +115,7 @@ func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, reque
} }
} }
// RegisterExtension registers an extension to process new incoming requests // RegisterHook registers an extension to process new incoming requests
func (rm *ResponseManager) RegisterHook( func (rm *ResponseManager) RegisterHook(
overrideDefaultValidation bool, overrideDefaultValidation bool,
hook graphsync.OnRequestReceivedHook) { hook graphsync.OnRequestReceivedHook) {
...@@ -198,20 +199,36 @@ func (rm *ResponseManager) executeQuery(ctx context.Context, ...@@ -198,20 +199,36 @@ func (rm *ResponseManager) executeQuery(ctx context.Context,
p peer.ID, p peer.ID,
request gsmsg.GraphSyncRequest) { request gsmsg.GraphSyncRequest) {
peerResponseSender := rm.peerManager.SenderForPeer(p) peerResponseSender := rm.peerManager.SenderForPeer(p)
selectorSpec, err := rm.ipldBridge.DecodeNode(request.Selector()) extensionData, selector, err := rm.validateRequest(p, request)
for _, datum := range extensionData {
peerResponseSender.SendExtensionData(request.ID(), datum)
}
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return
}
rootLink := cidlink.Link{Cid: request.Root()}
wrappedLoader := loader.WrapLoader(rm.loader, request.ID(), peerResponseSender)
err = rm.ipldBridge.Traverse(ctx, wrappedLoader, rootLink, selector, noopVisitor)
if err != nil { if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown) peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return return
} }
peerResponseSender.FinishRequest(request.ID())
}
func (rm *ResponseManager) validateRequest(p peer.ID, request graphsync.RequestData) ([]graphsync.ExtensionData, selector.Selector, error) {
selectorSpec, err := rm.ipldBridge.DecodeNode(request.Selector())
if err != nil {
return nil, nil, err
}
var isValidated bool var isValidated bool
var allExtensionData []graphsync.ExtensionData
for _, requestHook := range rm.requestHooks { for _, requestHook := range rm.requestHooks {
extensionData, err := requestHook.hook(p, request) extensionData, err := requestHook.hook(p, request)
for _, datum := range extensionData { allExtensionData = append(allExtensionData, extensionData...)
peerResponseSender.SendExtensionData(request.ID(), datum)
}
if err != nil { if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown) return allExtensionData, nil, err
return
} }
if requestHook.overrideDefaultValidation { if requestHook.overrideDefaultValidation {
isValidated = true isValidated = true
...@@ -220,24 +237,11 @@ func (rm *ResponseManager) executeQuery(ctx context.Context, ...@@ -220,24 +237,11 @@ func (rm *ResponseManager) executeQuery(ctx context.Context,
if !isValidated { if !isValidated {
err = selectorvalidator.ValidateSelector(rm.ipldBridge, selectorSpec, maxRecursionDepth) err = selectorvalidator.ValidateSelector(rm.ipldBridge, selectorSpec, maxRecursionDepth)
if err != nil { if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown) return allExtensionData, nil, err
return
} }
} }
rootLink := cidlink.Link{Cid: request.Root()}
selector, err := rm.ipldBridge.ParseSelector(selectorSpec) selector, err := rm.ipldBridge.ParseSelector(selectorSpec)
return allExtensionData, selector, err
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return
}
wrappedLoader := loader.WrapLoader(rm.loader, request.ID(), peerResponseSender)
err = rm.ipldBridge.Traverse(ctx, wrappedLoader, rootLink, selector, noopVisitor)
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return
}
peerResponseSender.FinishRequest(request.ID())
} }
// Startup starts processing for the WantManager. // Startup starts processing for the WantManager.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment