requestmanager.go 19.3 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
Hannah Howard's avatar
Hannah Howard committed
7
	"sync/atomic"
8

9 10 11
	"github.com/ipfs/go-graphsync/listeners"
	"github.com/ipfs/go-graphsync/messagequeue"

Hannah Howard's avatar
Hannah Howard committed
12
	blocks "github.com/ipfs/go-block-format"
13
	"github.com/ipfs/go-cid"
Hannah Howard's avatar
Hannah Howard committed
14 15 16 17
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
18

19
	"github.com/ipfs/go-graphsync"
Hannah Howard's avatar
Hannah Howard committed
20
	"github.com/ipfs/go-graphsync/cidset"
Hannah Howard's avatar
Hannah Howard committed
21
	"github.com/ipfs/go-graphsync/dedupkey"
22
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
23
	gsmsg "github.com/ipfs/go-graphsync/message"
24
	"github.com/ipfs/go-graphsync/metadata"
25
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
26 27
	"github.com/ipfs/go-graphsync/requestmanager/executor"
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
28
	"github.com/ipfs/go-graphsync/requestmanager/types"
29 30
)

31
var log = logging.Logger("graphsync")
32 33

const (
34 35
	// defaultPriority is the default priority for requests sent by graphsync
	defaultPriority = graphsync.Priority(0)
36 37 38
)

type inProgressRequestStatus struct {
39 40 41 42 43 44 45 46
	ctx            context.Context
	cancelFn       func()
	p              peer.ID
	networkError   chan error
	resumeMessages chan []graphsync.ExtensionData
	pauseMessages  chan struct{}
	paused         bool
	lastResponse   atomic.Value
47 48 49 50
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
51
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest, notifees ...notifications.Notifee)
52 53
}

54 55 56
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
57
	StartRequest(graphsync.RequestID, string) error
58
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
59
		blks []blocks.Block)
60 61 62
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
63 64
}

65 66 67 68 69 70 71 72
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
73
	asyncLoader AsyncLoader
74
	// dont touch out side of run loop
75 76
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
Hannah Howard's avatar
Hannah Howard committed
77 78 79
	requestHooks              RequestHooks
	responseHooks             ResponseHooks
	blockHooks                BlockHooks
80
	networkErrorListeners     *listeners.NetworkErrorListeners
81 82 83 84 85 86
}

type requestManagerMessage interface {
	handle(rm *RequestManager)
}

Hannah Howard's avatar
Hannah Howard committed
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
// RequestHooks run for new requests
type RequestHooks interface {
	ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
}

// ResponseHooks run for new responses
type ResponseHooks interface {
	ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult
}

// BlockHooks run for each block loaded
type BlockHooks interface {
	ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult
}

102
// New generates a new request manager from a context, network, and selectorQuerier
Hannah Howard's avatar
Hannah Howard committed
103 104
func New(ctx context.Context,
	asyncLoader AsyncLoader,
Hannah Howard's avatar
Hannah Howard committed
105 106
	requestHooks RequestHooks,
	responseHooks ResponseHooks,
107 108 109
	blockHooks BlockHooks,
	networkErrorListeners *listeners.NetworkErrorListeners,
) *RequestManager {
110 111 112 113
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
114
		asyncLoader:               asyncLoader,
115 116
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
117
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
Hannah Howard's avatar
Hannah Howard committed
118 119
		requestHooks:              requestHooks,
		responseHooks:             responseHooks,
Hannah Howard's avatar
Hannah Howard committed
120
		blockHooks:                blockHooks,
121
		networkErrorListeners:     networkErrorListeners,
122 123 124 125 126 127 128 129 130
	}
}

// SetDelegate specifies who will send messages out to the internet.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
131 132
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
133
	incomingError chan error
134 135 136 137
}

type newRequestMessage struct {
	p                     peer.ID
138
	root                  ipld.Link
139
	selector              ipld.Node
140
	extensions            []graphsync.ExtensionData
141 142 143 144
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
145 146
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
147
	root ipld.Link,
148 149
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
150
	if _, err := ipldutil.ParseSelector(selector); err != nil {
151
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
152 153 154 155 156
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
157
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
158
	case <-rm.ctx.Done():
159
		return rm.emptyResponse()
160
	case <-ctx.Done():
161
		return rm.emptyResponse()
162 163 164 165
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
166
		return rm.emptyResponse()
167 168 169
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

170 171 172 173 174 175 176 177 178 179
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

180 181
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
182
	close(ch)
183
	errCh := make(chan error)
184 185 186 187
	close(errCh)
	return ch, errCh
}

188 189
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
190
	close(ch)
191
	errCh := make(chan error, 1)
192
	errCh <- err
193 194
	close(errCh)
	return ch, errCh
195 196 197
}

type cancelRequestMessage struct {
198
	requestID graphsync.RequestID
199
	isPause   bool
200 201
}

202 203
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
204
	incomingErrors chan error) {
205
	cancelMessageChannel := rm.messages
206
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
207
		select {
208
		case cancelMessageChannel <- &cancelRequestMessage{requestID, false}:
209 210 211 212 213
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
214 215 216 217 218
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
219 220 221 222 223 224 225 226
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
227
	p         peer.ID
228 229
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
230 231 232 233
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
234
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
235
	blks []blocks.Block) {
236
	select {
237
	case rm.messages <- &processResponseMessage{p, responses, blks}:
238 239 240 241
	case <-rm.ctx.Done():
	}
}

242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
type unpauseRequestMessage struct {
	id         graphsync.RequestID
	extensions []graphsync.ExtensionData
	response   chan error
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response)
}

type pauseRequestMessage struct {
	id       graphsync.RequestID
	response chan error
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response)
}

func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error) error {
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case rm.messages <- message:
	}
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case err := <-response:
		return err
	}
}

280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
// Startup starts processing for the WantManager.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the want manager.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case message := <-rm.messages:
			message.handle(rm)
		case <-rm.ctx.Done():
			return
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
307
		requestStatus.cancelFn()
308 309 310
	}
}

311
type terminateRequestMessage struct {
312
	requestID graphsync.RequestID
313
}
314

315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *RequestManager) (chan graphsync.ResponseProgress, chan error) {
	request, hooksResult, err := rm.validateRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
	doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
	var doNotSendCids *cid.Set
	if has {
		doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData)
		if err != nil {
			return rm.singleErrorResponse(err)
		}
	} else {
		doNotSendCids = cid.NewSet()
	}
	ctx, cancel := context.WithCancel(rm.ctx)
	p := nrm.p
	resumeMessages := make(chan []graphsync.ExtensionData, 1)
	pauseMessages := make(chan struct{}, 1)
	networkError := make(chan error, 1)
	requestStatus := &inProgressRequestStatus{
		ctx: ctx, cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, networkError: networkError,
	}
	lastResponse := &requestStatus.lastResponse
	lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
	rm.inProgressRequestStatuses[request.ID()] = requestStatus
	incoming, incomingError := executor.ExecutionEnv{
		Ctx:              rm.ctx,
343
		SendRequest:      rm.sendRequest,
344 345 346 347 348
		TerminateRequest: rm.terminateRequest,
		RunBlockHooks:    rm.processBlockHooks,
		Loader:           rm.asyncLoader.AsyncLoad,
	}.Start(
		executor.RequestExecution{
hannahhoward's avatar
hannahhoward committed
349 350 351 352 353 354
			Ctx:                  ctx,
			P:                    p,
			Request:              request,
			NetworkError:         networkError,
			LastResponse:         lastResponse,
			DoNotSendCids:        doNotSendCids,
Eric Myhre's avatar
Eric Myhre committed
355
			NodePrototypeChooser: hooksResult.CustomChooser,
hannahhoward's avatar
hannahhoward committed
356 357
			ResumeMessages:       resumeMessages,
			PauseMessages:        pauseMessages,
358 359 360 361
		})
	return incoming, incomingError
}

362
func (nrm *newRequestMessage) handle(rm *RequestManager) {
Hannah Howard's avatar
Hannah Howard committed
363 364
	var ipr inProgressRequest
	ipr.requestID = rm.nextRequestID
365
	rm.nextRequestID++
366
	ipr.incoming, ipr.incomingError = nrm.setupRequest(ipr.requestID, rm)
367 368

	select {
Hannah Howard's avatar
Hannah Howard committed
369
	case nrm.inProgressRequestChan <- ipr:
370 371 372 373
	case <-rm.ctx.Done():
	}
}

374 375 376 377 378
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

379 380 381 382 383 384
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

385
	rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
386 387 388 389 390
	if crm.isPause {
		inProgressRequestStatus.paused = true
	} else {
		inProgressRequestStatus.cancelFn()
	}
391 392 393
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
394 395
	filteredResponses := rm.processExtensions(prm.responses, prm.p)
	filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p)
Hannah Howard's avatar
Hannah Howard committed
396
	rm.updateLastResponses(filteredResponses)
397
	responseMetadata := metadataForResponses(filteredResponses)
398 399 400 401 402 403 404 405 406 407
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
408
		}
409
		responsesForPeer = append(responsesForPeer, response)
410
	}
411 412
	return responsesForPeer
}
413

414 415 416 417 418 419 420 421 422 423 424
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	remainingResponses := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		success := rm.processExtensionsForResponse(p, response)
		if success {
			remainingResponses = append(remainingResponses, response)
		}
	}
	return remainingResponses
}

Hannah Howard's avatar
Hannah Howard committed
425 426 427 428 429 430
func (rm *RequestManager) updateLastResponses(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
		rm.inProgressRequestStatuses[response.RequestID()].lastResponse.Store(response)
	}
}

431
func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
Hannah Howard's avatar
Hannah Howard committed
432 433 434
	result := rm.responseHooks.ProcessResponseHooks(p, response)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
435
		rm.sendRequest(p, updateRequest)
Hannah Howard's avatar
Hannah Howard committed
436 437
	}
	if result.Err != nil {
438 439 440 441
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok {
			return false
		}
Hannah Howard's avatar
Hannah Howard committed
442 443 444 445
		responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		select {
		case requestStatus.networkError <- responseError:
		case <-requestStatus.ctx.Done():
446
		}
447
		rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID()))
Hannah Howard's avatar
Hannah Howard committed
448 449
		requestStatus.cancelFn()
		return false
450 451 452 453
	}
	return true
}

454 455
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
456
		if gsmsg.IsTerminalResponseCode(response.Status()) {
457 458 459 460 461 462
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
463
				}
464
				requestStatus.cancelFn()
465
			}
466
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
467 468 469
		}
	}
}
470

471
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
472
	switch status {
473
	case graphsync.RequestFailedBusy:
474
		return graphsync.RequestFailedBusyErr{}
475
	case graphsync.RequestFailedContentNotFound:
476
		return graphsync.RequestFailedContentNotFoundErr{}
477
	case graphsync.RequestFailedLegal:
478
		return graphsync.RequestFailedLegalErr{}
479
	case graphsync.RequestFailedUnknown:
480 481 482
		return graphsync.RequestFailedUnknownErr{}
	case graphsync.RequestCancelled:
		return graphsync.RequestCancelledErr{}
483
	default:
484
		return fmt.Errorf("Unknown")
485 486
	}
}
487

Hannah Howard's avatar
Hannah Howard committed
488 489 490 491
func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error {
	result := rm.blockHooks.ProcessBlockHooks(p, response, block)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
492
		rm.sendRequest(p, updateRequest)
493
	}
Hannah Howard's avatar
Hannah Howard committed
494
	if result.Err != nil {
495
		_, isPause := result.Err.(hooks.ErrPaused)
Hannah Howard's avatar
Hannah Howard committed
496 497
		select {
		case <-rm.ctx.Done():
498
		case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause}:
Hannah Howard's avatar
Hannah Howard committed
499
		}
500
	}
Hannah Howard's avatar
Hannah Howard committed
501 502 503
	return result.Err
}

504 505 506 507 508 509 510
func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID) {
	select {
	case <-rm.ctx.Done():
	case rm.messages <- &terminateRequestMessage{requestID}:
	}
}

511
func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) {
Hannah Howard's avatar
Hannah Howard committed
512 513
	_, err := ipldutil.EncodeNode(selectorSpec)
	if err != nil {
514
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
515
	}
516
	_, err = ipldutil.ParseSelector(selectorSpec)
Hannah Howard's avatar
Hannah Howard committed
517
	if err != nil {
518
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
519 520 521
	}
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
522
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid")
Hannah Howard's avatar
Hannah Howard committed
523 524 525
	}
	request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
	hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
Hannah Howard's avatar
Hannah Howard committed
526 527 528 529 530 531 532 533 534 535 536 537
	if hooksResult.PersistenceOption != "" {
		dedupData, err := dedupkey.EncodeDedupKey(hooksResult.PersistenceOption)
		if err != nil {
			return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
		}
		request = request.ReplaceExtensions([]graphsync.ExtensionData{
			{
				Name: graphsync.ExtensionDeDupByKey,
				Data: dedupData,
			},
		})
	}
Hannah Howard's avatar
Hannah Howard committed
538 539
	err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
	if err != nil {
540
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
541
	}
542
	return request, hooksResult, nil
Hannah Howard's avatar
Hannah Howard committed
543
}
544

545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
type reqSubscriber struct {
	p                     peer.ID
	request               gsmsg.GraphSyncRequest
	networkErrorListeners *listeners.NetworkErrorListeners
}

func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
	mqEvt, isMQEvt := event.(messagequeue.Event)
	if !isMQEvt || mqEvt.Name != messagequeue.Error {
		return
	}

	r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err)
	//r.re.networkError <- mqEvt.Err
	//r.re.terminateRequest()
}

func (r reqSubscriber) OnClose(topic notifications.Topic) {
}

const requestNetworkError = "request_network_error"

func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
	sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
	failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub}
	rm.peerHandler.SendRequest(p, request, failNotifee)
}

573 574 575 576 577 578 579 580 581 582 583
func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id]
	if !ok {
		return errors.New("request not found")
	}
	if !inProgressRequestStatus.paused {
		return errors.New("request is not paused")
	}
	inProgressRequestStatus.paused = false
	select {
	case <-inProgressRequestStatus.pauseMessages:
584
		rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621
		return nil
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case inProgressRequestStatus.resumeMessages <- urm.extensions:
		return nil
	}
}
func (urm *unpauseRequestMessage) handle(rm *RequestManager) {
	err := urm.unpause(rm)
	select {
	case <-rm.ctx.Done():
	case urm.response <- err:
	}
}
func (prm *pauseRequestMessage) pause(rm *RequestManager) error {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[prm.id]
	if !ok {
		return errors.New("request not found")
	}
	if inProgressRequestStatus.paused {
		return errors.New("request is already paused")
	}
	inProgressRequestStatus.paused = true
	select {
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case inProgressRequestStatus.pauseMessages <- struct{}{}:
		return nil
	}
}
func (prm *pauseRequestMessage) handle(rm *RequestManager) {
	err := prm.pause(rm)
	select {
	case <-rm.ctx.Done():
	case prm.response <- err:
	}
}