Commit fb8f4bb3 authored by hannahhoward's avatar hannahhoward

Revert "add extensions on complete (#76)"

This reverts commit 31cc0d5e.

Needs further exploration
parent 31cc0d5e
......@@ -236,12 +236,6 @@ type RequestUpdatedHookActions interface {
UnpauseResponse()
}
// ResponseCompletedHookActions are actions that can be taken in response completed hook to add a
// final extension on a response
type ResponseCompletedHookActions interface {
SendExtensionData(ExtensionData)
}
// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
......@@ -278,8 +272,8 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h
// It receives an interface to taking further action on the response
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)
// OnResponseCompletedHook provides a way to listen for when responder has finished serving a response
type OnResponseCompletedHook func(p peer.ID, request RequestData, status ResponseStatusCode, hookActions ResponseCompletedHookActions)
// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)
// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)
......@@ -313,8 +307,8 @@ type GraphExchange interface {
// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc
// RegisterCompletedResponseHook adds a hook on the responder for completed responses
RegisterCompletedResponseHook(hook OnResponseCompletedHook) UnregisterHookFunc
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc
// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
......
......@@ -41,7 +41,7 @@ type GraphSync struct {
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseHooks *responderhooks.CompletedResponseHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
......@@ -88,9 +88,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseHooks := responderhooks.NewCompletedResponseHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseHooks, requestorCancelledListeners)
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
......@@ -103,7 +103,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseHooks: completedResponseHooks,
completedResponseListeners: completedResponseListeners,
requestorCancelledListeners: requestorCancelledListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
......@@ -170,9 +170,9 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH
return gs.requestUpdatedHooks.Register(hook)
}
// RegisterCompletedResponseHook adds a hook on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseHook(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc {
return gs.completedResponseHooks.Register(hook)
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return gs.completedResponseListeners.Register(listener)
}
// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
......
......@@ -191,14 +191,14 @@ func TestGraphsyncRoundTrip(t *testing.T) {
// initialize graphsync on second node to response to requests
responder := td.GraphSyncHost2()
var receivedResponseData [][]byte
var receivedResponseData []byte
var receivedRequestData []byte
requestor.RegisterIncomingResponseHook(
func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) {
data, has := responseData.Extension(td.extensionName)
if has {
receivedResponseData = append(receivedResponseData, data)
receivedResponseData = data
}
})
......@@ -213,8 +213,7 @@ func TestGraphsyncRoundTrip(t *testing.T) {
})
finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
hookActions.SendExtensionData(td.extensionFinal)
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
......@@ -228,11 +227,9 @@ func TestGraphsyncRoundTrip(t *testing.T) {
// verify extension roundtrip
require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data")
require.Len(t, receivedResponseData, 2)
require.Equal(t, td.extensionResponseData, receivedResponseData[0], "did not receive correct extension response data")
require.Equal(t, td.extensionFinalData, receivedResponseData[1], "did not receive correct extension response data")
require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data")
// verify completed hook
// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)
......@@ -259,7 +256,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
responder := td.GraphSyncHost2()
finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1)
responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case finalResponseStatusChan <- status:
default:
......@@ -281,7 +278,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
require.Equal(t, tree.MiddleMapBlock.RawData(), td.blockStore1[tree.MiddleMapNodeLnk])
require.Equal(t, tree.RootBlock.RawData(), td.blockStore1[tree.RootNodeLnk])
// verify completed hook
// verify listener
var finalResponseStatus graphsync.ResponseStatusCode
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus)
......@@ -823,8 +820,6 @@ type gsTestData struct {
extensionResponse graphsync.ExtensionData
extensionUpdateData []byte
extensionUpdate graphsync.ExtensionData
extensionFinalData []byte
extensionFinal graphsync.ExtensionData
}
func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
......@@ -862,11 +857,7 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData {
Name: td.extensionName,
Data: td.extensionUpdateData,
}
td.extensionFinalData = testutil.RandomBytes(100)
td.extensionFinal = graphsync.ExtensionData{
Name: td.extensionName,
Data: td.extensionFinalData,
}
return td
}
......
......@@ -385,60 +385,3 @@ func TestUpdateHookProcessing(t *testing.T) {
})
}
}
func TestCompleteHookProcessing(t *testing.T) {
extensionData := testutil.RandomBytes(100)
extensionName := graphsync.ExtensionName("AppleSauce/McGee")
extension := graphsync.ExtensionData{
Name: extensionName,
Data: extensionData,
}
extensionResponseData := testutil.RandomBytes(100)
extensionResponse := graphsync.ExtensionData{
Name: extensionName,
Data: extensionResponseData,
}
root := testutil.GenerateCids(1)[0]
requestID := graphsync.RequestID(rand.Int31())
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension)
status := graphsync.RequestCompletedFull
p := testutil.GeneratePeers(1)[0]
testCases := map[string]struct {
configure func(t *testing.T, completedHooks *hooks.CompletedResponseHooks)
assert func(t *testing.T, result hooks.CompleteResult)
}{
"no hooks": {
assert: func(t *testing.T, result hooks.CompleteResult) {
require.Empty(t, result.Extensions)
},
},
"send extension data": {
configure: func(t *testing.T, completedHooks *hooks.CompletedResponseHooks) {
completedHooks.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
_, found := requestData.Extension(extensionName)
if found {
hookActions.SendExtensionData(extensionResponse)
}
})
},
assert: func(t *testing.T, result hooks.CompleteResult) {
require.Len(t, result.Extensions, 1)
require.Contains(t, result.Extensions, extensionResponse)
},
},
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
completedHooks := hooks.NewCompletedResponseHooks()
if data.configure != nil {
data.configure(t, completedHooks)
}
result := completedHooks.ProcessCompleteHooks(p, request, status)
if data.assert != nil {
data.assert(t, result)
}
})
}
}
......@@ -6,8 +6,8 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)
// CompletedResponseHooks is a set of hooks for completed responses
type CompletedResponseHooks struct {
// CompletedResponseListeners is a set of listeners for completed responses
type CompletedResponseListeners struct {
pubSub *pubsub.PubSub
}
......@@ -15,50 +15,28 @@ type internalCompletedResponseEvent struct {
p peer.ID
request graphsync.RequestData
status graphsync.ResponseStatusCode
cha *completeHookActions
}
func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalCompletedResponseEvent)
hook := subscriberFn.(graphsync.OnResponseCompletedHook)
hook(ie.p, ie.request, ie.status, ie.cha)
listener := subscriberFn.(graphsync.OnResponseCompletedListener)
listener(ie.p, ie.request, ie.status)
return nil
}
// NewCompletedResponseHooks returns a new list of completed response hooks
func NewCompletedResponseHooks() *CompletedResponseHooks {
return &CompletedResponseHooks{pubSub: pubsub.New(completedResponseDispatcher)}
// NewCompletedResponseListeners returns a new list of completed response listeners
func NewCompletedResponseListeners() *CompletedResponseListeners {
return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)}
}
// Register registers an hook for completed responses
func (crl *CompletedResponseHooks) Register(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(hook))
}
// ProcessCompleteHooks runs notifies all completed hooks that a response has completed
func (crl *CompletedResponseHooks) ProcessCompleteHooks(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) CompleteResult {
ha := &completeHookActions{}
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status, ha})
return ha.result()
}
// CompleteResult is the outcome of running complete response hooks
type CompleteResult struct {
Extensions []graphsync.ExtensionData
}
type completeHookActions struct {
extensions []graphsync.ExtensionData
}
func (ha *completeHookActions) result() CompleteResult {
return CompleteResult{
Extensions: ha.extensions,
}
// Register registers an listener for completed responses
func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener))
}
func (ha *completeHookActions) SendExtensionData(ext graphsync.ExtensionData) {
ha.extensions = append(ha.extensions, ext)
// NotifyCompletedListeners runs notifies all completed listeners that a response has completed
func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) {
_ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status})
}
// RequestorCancelledListeners is a set of listeners for when requestors cancel
......
......@@ -26,7 +26,7 @@ type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
completedHooks CompletedHooks
completedListeners CompletedListeners
cancelledListeners CancelledListeners
peerManager PeerManager
loader ipld.Loader
......@@ -71,6 +71,13 @@ func (qe *queryExecutor) processQueriesWorker() {
continue
}
status, err := qe.executeTask(key, taskData)
_, isPaused := err.(hooks.ErrPaused)
isCancelled := err != nil && isContextErr(err)
if isCancelled {
qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
} else if !isPaused {
qe.completedListeners.NotifyCompletedListeners(key.p, taskData.request, status)
}
select {
case qe.messages <- &finishTaskRequest{key, status, err}:
case <-qe.ctx.Done():
......@@ -200,41 +207,23 @@ func (qe *queryExecutor) executeQuery(
})
return err
})
var status graphsync.ResponseStatusCode
_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
status = qe.closeRequest(transaction, err)
if err != nil {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
return graphsync.RequestPaused, err
}
if isContextErr(err) {
qe.cancelledListeners.NotifyCancelledListeners(p, request)
} else if status != graphsync.RequestPaused {
result := qe.completedHooks.ProcessCompleteHooks(p, request, status)
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
peerResponseSender.FinishWithCancel(request.ID())
return graphsync.RequestCancelled, err
}
return nil
})
return status, err
}
func (qe *queryExecutor) closeRequest(peerResponseSender peerresponsemanager.PeerResponseTransactionSender, err error) graphsync.ResponseStatusCode {
_, isPaused := err.(hooks.ErrPaused)
if isPaused {
return graphsync.RequestPaused
}
if isContextErr(err) {
peerResponseSender.FinishWithCancel()
return graphsync.RequestCancelled
}
if err == errCancelledByCommand {
peerResponseSender.FinishWithError(graphsync.RequestCancelled)
return graphsync.RequestCancelled
}
if err != nil {
peerResponseSender.FinishWithError(graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown
if err == errCancelledByCommand {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestCancelled)
return graphsync.RequestCancelled, err
}
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown, err
}
return peerResponseSender.FinishRequest()
return peerResponseSender.FinishRequest(request.ID()), nil
}
func (qe *queryExecutor) checkForUpdates(
......@@ -279,5 +268,5 @@ func (qe *queryExecutor) checkForUpdates(
func isContextErr(err error) bool {
// TODO: Match with errors.Is when https://github.com/ipld/go-ipld-prime/issues/58 is resolved
return err != nil && strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
return strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error())
}
......@@ -81,9 +81,9 @@ type UpdateHooks interface {
ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult
}
// CompletedHooks is an interface for processing complete hooks
type CompletedHooks interface {
ProcessCompleteHooks(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) hooks.CompleteResult
// CompletedListeners is an interface for notifying listeners that responses are complete
type CompletedListeners interface {
NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode)
}
// CancelledListeners is an interface for notifying listeners that requestor cancelled
......@@ -109,7 +109,7 @@ type ResponseManager struct {
queryQueue QueryQueue
updateHooks UpdateHooks
cancelledListeners CancelledListeners
completedHooks CompletedHooks
completedListeners CompletedListeners
messages chan responseManagerMessage
workSignal chan struct{}
qe *queryExecutor
......@@ -125,7 +125,7 @@ func New(ctx context.Context,
requestHooks RequestHooks,
blockHooks BlockHooks,
updateHooks UpdateHooks,
completedHooks CompletedHooks,
completedListeners CompletedListeners,
cancelledListeners CancelledListeners,
) *ResponseManager {
ctx, cancelFn := context.WithCancel(ctx)
......@@ -135,7 +135,7 @@ func New(ctx context.Context,
requestHooks: requestHooks,
blockHooks: blockHooks,
updateHooks: updateHooks,
completedHooks: completedHooks,
completedListeners: completedListeners,
cancelledListeners: cancelledListeners,
peerManager: peerManager,
loader: loader,
......@@ -151,7 +151,7 @@ func New(ctx context.Context,
peerManager: peerManager,
queryQueue: queryQueue,
updateHooks: updateHooks,
completedHooks: completedHooks,
completedListeners: completedListeners,
cancelledListeners: cancelledListeners,
messages: messages,
workSignal: workSignal,
......@@ -368,19 +368,13 @@ func (rm *ResponseManager) cancelRequest(p peer.ID, requestID graphsync.RequestI
if response.isPaused {
peerResponseSender := rm.peerManager.SenderForPeer(key.p)
_ = peerResponseSender.Transaction(requestID, func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
if selfCancel {
result := rm.completedHooks.ProcessCompleteHooks(p, response.request, graphsync.RequestCancelled)
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
transaction.FinishWithError(graphsync.RequestCancelled)
} else {
rm.cancelledListeners.NotifyCancelledListeners(p, response.request)
transaction.FinishWithCancel()
}
return nil
})
if selfCancel {
rm.completedListeners.NotifyCompletedListeners(p, response.request, graphsync.RequestCancelled)
peerResponseSender.FinishWithError(requestID, graphsync.RequestCancelled)
} else {
rm.cancelledListeners.NotifyCancelledListeners(p, response.request)
peerResponseSender.FinishWithCancel(requestID)
}
delete(rm.inProgressResponses, key)
response.cancelFn()
return nil
......
......@@ -210,7 +210,7 @@ func TestIncomingQuery(t *testing.T) {
defer td.cancel()
blks := td.blockChain.AllBlocks()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
responseManager.Startup()
......@@ -231,7 +231,7 @@ func TestCancellationQueryInProgress(t *testing.T) {
td := newTestData(t)
defer td.cancel()
blks := td.blockChain.AllBlocks()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
cancelledListenerCalled := make(chan struct{}, 1)
td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) {
......@@ -284,7 +284,7 @@ func TestCancellationViaCommand(t *testing.T) {
td := newTestData(t)
defer td.cancel()
blks := td.blockChain.AllBlocks()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
td.requestHooks.Register(selectorvalidator.SelectorValidator(100))
responseManager.Startup()
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
......@@ -328,7 +328,7 @@ func TestEarlyCancellation(t *testing.T) {
td := newTestData(t)
defer td.cancel()
td.queryQueue.popWait.Add(1)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
......@@ -352,7 +352,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("on its own, should fail validation", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
var lastRequest completedRequest
......@@ -363,7 +363,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.SendExtensionData(td.extensionResponse)
......@@ -380,7 +380,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -398,7 +398,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("if any hook fails, should fail", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -419,7 +419,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("hooks can be unregistered", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
unregister := td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -449,7 +449,7 @@ func TestValidationAndExtensions(t *testing.T) {
defer td.cancel()
obs := make(map[ipld.Link][]byte)
oloader, _ := testutil.NewTestStore(obs)
responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
// add validating hook -- so the request SHOULD succeed
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
......@@ -483,7 +483,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("hooks can alter the node builder chooser", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
customChooserCallCount := 0
......@@ -525,7 +525,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("do-not-send-cids extension", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -558,7 +558,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("test pause/resume", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -580,7 +580,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("can send extension data", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -602,7 +602,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("can send errors", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -619,7 +619,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("can pause/unpause", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -654,7 +654,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("can pause/unpause externally", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -693,7 +693,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("can pause/unpause", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -732,7 +732,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("when unpaused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -769,7 +769,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("when paused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -814,7 +814,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("when unpaused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -848,7 +848,7 @@ func TestValidationAndExtensions(t *testing.T) {
t.Run("when paused", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
......@@ -891,26 +891,22 @@ func TestValidationAndExtensions(t *testing.T) {
})
})
t.Run("final response status hook", func(t *testing.T) {
t.Run("final response status listeners", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners)
responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners)
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
statusChan := make(chan graphsync.ResponseStatusCode, 1)
td.completedHooks.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) {
hookActions.SendExtensionData(td.extensionResponse)
td.completedListeners.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode) {
select {
case statusChan <- status:
default:
}
})
responseManager.ProcessRequests(td.ctx, td.p, td.requests)
var receivedExtension sentExtension
testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response")
require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent")
var lastRequest completedRequest
testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request")
require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed")
......@@ -951,7 +947,7 @@ type testData struct {
requestHooks *hooks.IncomingRequestHooks
blockHooks *hooks.OutgoingBlockHooks
updateHooks *hooks.RequestUpdatedHooks
completedHooks *hooks.CompletedResponseHooks
completedListeners *hooks.CompletedResponseListeners
cancelledListeners *hooks.RequestorCancelledListeners
}
......@@ -1010,7 +1006,7 @@ func newTestData(t *testing.T) testData {
td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions)
td.blockHooks = hooks.NewBlockHooks()
td.updateHooks = hooks.NewUpdateHooks()
td.completedHooks = hooks.NewCompletedResponseHooks()
td.completedListeners = hooks.NewCompletedResponseListeners()
td.cancelledListeners = hooks.NewRequestorCancelledListeners()
return td
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment