Unverified Commit 072cee0d authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

feat(responsemanager): allow configuration of max requests (#122)

provide a configuration option to change the maximum number of requests processed at once by the
responder
parent 785e0b3b
......@@ -29,6 +29,7 @@ var log = logging.Logger("graphsync")
const maxRecursionDepth = 100
const defaultTotalMaxMemory = uint64(256 << 20)
const defaultMaxMemoryPerPeer = uint64(16 << 20)
const defaultMaxInProgressRequests = uint64(6)
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
......@@ -59,6 +60,7 @@ type GraphSync struct {
allocator *allocator.Allocator
totalMaxMemory uint64
maxMemoryPerPeer uint64
maxInProgressRequests uint64
}
// Option defines the functional option type that can be used to configure
......@@ -73,18 +75,30 @@ func RejectAllRequestsByDefault() Option {
}
}
// MaxMemoryResponder defines the maximum amount of memory the responder
// may consume queueing up messages for a response in total
func MaxMemoryResponder(totalMaxMemory uint64) Option {
return func(gs *GraphSync) {
gs.totalMaxMemory = totalMaxMemory
}
}
// MaxMemoryPerPeerResponder defines the maximum amount of memory a peer
// may consume queueing up messages for a response
func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
return func(gs *GraphSync) {
gs.maxMemoryPerPeer = maxMemoryPerPeer
}
}
// MaxInProgressRequests changes the maximum number of
// graphsync requests that are processed in parallel (default 6)
func MaxInProgressRequests(maxInProgressRequests uint64) Option {
return func(gs *GraphSync) {
gs.maxInProgressRequests = maxInProgressRequests
}
}
// New creates a new GraphSync Exchange on the given network,
// and the given link loader+storer.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
......@@ -132,6 +146,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
peerTaskQueue: peerTaskQueue,
totalMaxMemory: defaultTotalMaxMemory,
maxMemoryPerPeer: defaultMaxMemoryPerPeer,
maxInProgressRequests: defaultMaxInProgressRequests,
ctx: ctx,
cancel: cancel,
unregisterDefaultValidator: unregisterDefaultValidator,
......@@ -147,7 +162,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
graphSync.peerResponseManager = peerResponseManager
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, graphSync.maxInProgressRequests)
graphSync.responseManager = responseManager
asyncLoader.Startup()
......
......@@ -22,7 +22,6 @@ import (
var log = logging.Logger("graphsync")
const (
maxInProcessRequests = 6
thawSpeed = time.Millisecond * 100
)
......@@ -129,6 +128,7 @@ type ResponseManager struct {
workSignal chan struct{}
qe *queryExecutor
inProgressResponses map[responseKey]*inProgressResponseStatus
maxInProcessRequests uint64
}
// New creates a new response manager from the given context, loader,
......@@ -144,6 +144,7 @@ func New(ctx context.Context,
cancelledListeners CancelledListeners,
blockSentListeners BlockSentListeners,
networkErrorListeners NetworkErrorListeners,
maxInProcessRequests uint64,
) *ResponseManager {
ctx, cancelFn := context.WithCancel(ctx)
messages := make(chan responseManagerMessage, 16)
......@@ -175,6 +176,7 @@ func New(ctx context.Context,
workSignal: workSignal,
qe: qe,
inProgressResponses: make(map[responseKey]*inProgressResponseStatus),
maxInProcessRequests: maxInProcessRequests,
}
}
......@@ -294,7 +296,7 @@ func (rm *ResponseManager) cleanupInProcessResponses() {
func (rm *ResponseManager) run() {
defer rm.cleanupInProcessResponses()
for i := 0; i < maxInProcessRequests; i++ {
for i := uint64(0); i < rm.maxInProcessRequests; i++ {
go rm.qe.processQueriesWorker()
}
......
......@@ -986,13 +986,13 @@ func newTestData(t *testing.T) testData {
}
func (td *testData) newResponseManager() *ResponseManager {
return New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners)
return New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6)
}
func (td *testData) alternateLoaderResponseManager() *ResponseManager {
obs := make(map[ipld.Link][]byte)
oloader, _ := testutil.NewTestStore(obs)
return New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners)
return New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6)
}
func (td *testData) assertPausedRequest() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment