requestmanager.go 20.8 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
7 8
	"github.com/hannahhoward/go-pubsub"
	"golang.org/x/xerrors"
Hannah Howard's avatar
Hannah Howard committed
9
	"sync/atomic"
10

Hannah Howard's avatar
Hannah Howard committed
11
	blocks "github.com/ipfs/go-block-format"
12
	"github.com/ipfs/go-cid"
Hannah Howard's avatar
Hannah Howard committed
13 14 15 16
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/libp2p/go-libp2p-core/peer"
Hannah Howard's avatar
Hannah Howard committed
17

18
	"github.com/ipfs/go-graphsync"
Hannah Howard's avatar
Hannah Howard committed
19
	"github.com/ipfs/go-graphsync/cidset"
Hannah Howard's avatar
Hannah Howard committed
20
	"github.com/ipfs/go-graphsync/dedupkey"
21
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
22
	"github.com/ipfs/go-graphsync/listeners"
23
	gsmsg "github.com/ipfs/go-graphsync/message"
24
	"github.com/ipfs/go-graphsync/messagequeue"
25
	"github.com/ipfs/go-graphsync/metadata"
26
	"github.com/ipfs/go-graphsync/notifications"
Hannah Howard's avatar
Hannah Howard committed
27 28
	"github.com/ipfs/go-graphsync/requestmanager/executor"
	"github.com/ipfs/go-graphsync/requestmanager/hooks"
29
	"github.com/ipfs/go-graphsync/requestmanager/types"
30 31
)

32
var log = logging.Logger("graphsync")
33 34

const (
35 36
	// defaultPriority is the default priority for requests sent by graphsync
	defaultPriority = graphsync.Priority(0)
37 38 39
)

type inProgressRequestStatus struct {
40 41 42 43 44 45 46 47
	ctx            context.Context
	cancelFn       func()
	p              peer.ID
	networkError   chan error
	resumeMessages chan []graphsync.ExtensionData
	pauseMessages  chan struct{}
	paused         bool
	lastResponse   atomic.Value
48 49 50 51
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
52
	AllocateAndBuildMessage(p peer.ID, blkSize uint64, buildMessageFn func(*gsmsg.Builder), notifees []notifications.Notifee)
53 54
}

55 56 57
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
58
	StartRequest(graphsync.RequestID, string) error
59
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
60
		blks []blocks.Block)
61 62 63
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
64 65
}

66 67 68 69 70 71 72 73
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
74
	asyncLoader AsyncLoader
75
	disconnectNotif *pubsub.PubSub
76
	// dont touch out side of run loop
77 78
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
Hannah Howard's avatar
Hannah Howard committed
79 80 81
	requestHooks              RequestHooks
	responseHooks             ResponseHooks
	blockHooks                BlockHooks
82
	networkErrorListeners     *listeners.NetworkErrorListeners
83 84 85 86 87 88
}

type requestManagerMessage interface {
	handle(rm *RequestManager)
}

Hannah Howard's avatar
Hannah Howard committed
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
// RequestHooks run for new requests
type RequestHooks interface {
	ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
}

// ResponseHooks run for new responses
type ResponseHooks interface {
	ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult
}

// BlockHooks run for each block loaded
type BlockHooks interface {
	ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult
}

104
// New generates a new request manager from a context, network, and selectorQuerier
Hannah Howard's avatar
Hannah Howard committed
105 106
func New(ctx context.Context,
	asyncLoader AsyncLoader,
Hannah Howard's avatar
Hannah Howard committed
107 108
	requestHooks RequestHooks,
	responseHooks ResponseHooks,
109 110 111
	blockHooks BlockHooks,
	networkErrorListeners *listeners.NetworkErrorListeners,
) *RequestManager {
112 113 114 115
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
116
		asyncLoader:               asyncLoader,
117
		disconnectNotif:           pubsub.New(disconnectDispatcher),
118 119
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
120
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
Hannah Howard's avatar
Hannah Howard committed
121 122
		requestHooks:              requestHooks,
		responseHooks:             responseHooks,
Hannah Howard's avatar
Hannah Howard committed
123
		blockHooks:                blockHooks,
124
		networkErrorListeners:     networkErrorListeners,
125 126 127 128 129 130 131 132 133
	}
}

// SetDelegate specifies who will send messages out to the internet.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
134
	requestID     graphsync.RequestID
135
	request       gsmsg.GraphSyncRequest
136
	incoming      chan graphsync.ResponseProgress
137
	incomingError chan error
138 139 140 141
}

type newRequestMessage struct {
	p                     peer.ID
142
	root                  ipld.Link
143
	selector              ipld.Node
144
	extensions            []graphsync.ExtensionData
145 146 147 148
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
149 150
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
151
	root ipld.Link,
152 153
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
154
	if _, err := ipldutil.ParseSelector(selector); err != nil {
155
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
156 157 158 159 160
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
161
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
162
	case <-rm.ctx.Done():
163
		return rm.emptyResponse()
164
	case <-ctx.Done():
165
		return rm.emptyResponse()
166 167 168 169
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
170
		return rm.emptyResponse()
171 172 173
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

174 175 176 177 178
	// If the connection to the peer is disconnected, fire an error
	unsub := rm.listenForDisconnect(p, func(neterr error) {
		rm.networkErrorListeners.NotifyNetworkErrorListeners(p, receivedInProgressRequest.request, neterr)
	})

179 180 181 182 183 184 185
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
		},
		// Once the request has completed, stop listening for disconnect events
		unsub,
	)
}

// Dispatch the Disconnect event to subscribers
func disconnectDispatcher(p pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
	listener := subscriberFn.(func(peer.ID))
	listener(p.(peer.ID))
	return nil
}

// Listen for the Disconnect event for the given peer
func (rm *RequestManager) listenForDisconnect(p peer.ID, onDisconnect func(neterr error)) func() {
	// Subscribe to Disconnect notifications
	return rm.disconnectNotif.Subscribe(func(evtPeer peer.ID) {
		// If the peer is the one we're interested in, call the listener
		if evtPeer == p {
			onDisconnect(xerrors.Errorf("disconnected from peer %s", p))
		}
	})
}

// Disconnected is called when a peer disconnects
func (rm *RequestManager) Disconnected(p peer.ID) {
	// Notify any listeners that a peer has disconnected
	rm.disconnectNotif.Publish(p)
214 215
}

216 217
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
218
	close(ch)
219
	errCh := make(chan error)
220 221 222 223
	close(errCh)
	return ch, errCh
}

224 225
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
226
	close(ch)
227
	errCh := make(chan error, 1)
228
	errCh <- err
229 230
	close(errCh)
	return ch, errCh
231 232 233
}

type cancelRequestMessage struct {
234
	requestID graphsync.RequestID
235
	isPause   bool
236 237
}

238 239
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
240
	incomingErrors chan error) {
241
	cancelMessageChannel := rm.messages
242
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
243
		select {
244
		case cancelMessageChannel <- &cancelRequestMessage{requestID, false}:
245 246 247 248 249
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
250 251 252 253 254
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
255 256 257 258 259 260 261 262
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
263
	p         peer.ID
264 265
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
266 267 268 269
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
270
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
271
	blks []blocks.Block) {
272
	select {
273
	case rm.messages <- &processResponseMessage{p, responses, blks}:
274 275 276 277
	case <-rm.ctx.Done():
	}
}

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
type unpauseRequestMessage struct {
	id         graphsync.RequestID
	extensions []graphsync.ExtensionData
	response   chan error
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response)
}

type pauseRequestMessage struct {
	id       graphsync.RequestID
	response chan error
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response)
}

func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error) error {
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case rm.messages <- message:
	}
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case err := <-response:
		return err
	}
}

316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
// Startup starts processing for the WantManager.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the want manager.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case message := <-rm.messages:
			message.handle(rm)
		case <-rm.ctx.Done():
			return
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
343
		requestStatus.cancelFn()
344 345 346
	}
}

347
type terminateRequestMessage struct {
348
	requestID graphsync.RequestID
349
}
350

351
func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *RequestManager) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) {
352 353
	request, hooksResult, err := rm.validateRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
	if err != nil {
354 355
		rp, err := rm.singleErrorResponse(err)
		return request, rp, err
356 357 358 359 360 361
	}
	doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
	var doNotSendCids *cid.Set
	if has {
		doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData)
		if err != nil {
362 363
			rp, err := rm.singleErrorResponse(err)
			return request, rp, err
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
		}
	} else {
		doNotSendCids = cid.NewSet()
	}
	ctx, cancel := context.WithCancel(rm.ctx)
	p := nrm.p
	resumeMessages := make(chan []graphsync.ExtensionData, 1)
	pauseMessages := make(chan struct{}, 1)
	networkError := make(chan error, 1)
	requestStatus := &inProgressRequestStatus{
		ctx: ctx, cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, networkError: networkError,
	}
	lastResponse := &requestStatus.lastResponse
	lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
	rm.inProgressRequestStatuses[request.ID()] = requestStatus
	incoming, incomingError := executor.ExecutionEnv{
		Ctx:              rm.ctx,
381
		SendRequest:      rm.sendRequest,
382 383 384 385 386
		TerminateRequest: rm.terminateRequest,
		RunBlockHooks:    rm.processBlockHooks,
		Loader:           rm.asyncLoader.AsyncLoad,
	}.Start(
		executor.RequestExecution{
hannahhoward's avatar
hannahhoward committed
387 388 389 390 391 392
			Ctx:                  ctx,
			P:                    p,
			Request:              request,
			NetworkError:         networkError,
			LastResponse:         lastResponse,
			DoNotSendCids:        doNotSendCids,
Eric Myhre's avatar
Eric Myhre committed
393
			NodePrototypeChooser: hooksResult.CustomChooser,
hannahhoward's avatar
hannahhoward committed
394 395
			ResumeMessages:       resumeMessages,
			PauseMessages:        pauseMessages,
396
		})
397
	return request, incoming, incomingError
398 399
}

400
func (nrm *newRequestMessage) handle(rm *RequestManager) {
Hannah Howard's avatar
Hannah Howard committed
401 402
	var ipr inProgressRequest
	ipr.requestID = rm.nextRequestID
403
	rm.nextRequestID++
404
	ipr.request, ipr.incoming, ipr.incomingError = nrm.setupRequest(ipr.requestID, rm)
405 406

	select {
Hannah Howard's avatar
Hannah Howard committed
407
	case nrm.inProgressRequestChan <- ipr:
408 409 410 411
	case <-rm.ctx.Done():
	}
}

412 413 414 415 416
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

417 418 419 420 421 422
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

423
	rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
424 425 426 427 428
	if crm.isPause {
		inProgressRequestStatus.paused = true
	} else {
		inProgressRequestStatus.cancelFn()
	}
429 430 431
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
432 433
	filteredResponses := rm.processExtensions(prm.responses, prm.p)
	filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p)
Hannah Howard's avatar
Hannah Howard committed
434
	rm.updateLastResponses(filteredResponses)
435
	responseMetadata := metadataForResponses(filteredResponses)
436 437 438 439 440 441 442 443 444 445
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
446
		}
447
		responsesForPeer = append(responsesForPeer, response)
448
	}
449 450
	return responsesForPeer
}
451

452 453 454 455 456 457 458 459 460 461 462
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	remainingResponses := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		success := rm.processExtensionsForResponse(p, response)
		if success {
			remainingResponses = append(remainingResponses, response)
		}
	}
	return remainingResponses
}

Hannah Howard's avatar
Hannah Howard committed
463 464 465 466 467 468
func (rm *RequestManager) updateLastResponses(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
		rm.inProgressRequestStatuses[response.RequestID()].lastResponse.Store(response)
	}
}

469
func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
Hannah Howard's avatar
Hannah Howard committed
470 471 472
	result := rm.responseHooks.ProcessResponseHooks(p, response)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
473
		rm.sendRequest(p, updateRequest)
Hannah Howard's avatar
Hannah Howard committed
474 475
	}
	if result.Err != nil {
476 477 478 479
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok {
			return false
		}
Hannah Howard's avatar
Hannah Howard committed
480 481 482 483
		responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		select {
		case requestStatus.networkError <- responseError:
		case <-requestStatus.ctx.Done():
484
		}
485
		rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID()))
Hannah Howard's avatar
Hannah Howard committed
486 487
		requestStatus.cancelFn()
		return false
488 489 490 491
	}
	return true
}

492 493
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
494
		if gsmsg.IsTerminalResponseCode(response.Status()) {
495 496 497 498 499 500
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
501
				}
502
				requestStatus.cancelFn()
503
			}
504
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
505 506 507
		}
	}
}
508

509
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
510
	switch status {
511
	case graphsync.RequestFailedBusy:
512
		return graphsync.RequestFailedBusyErr{}
513
	case graphsync.RequestFailedContentNotFound:
514
		return graphsync.RequestFailedContentNotFoundErr{}
515
	case graphsync.RequestFailedLegal:
516
		return graphsync.RequestFailedLegalErr{}
517
	case graphsync.RequestFailedUnknown:
518 519 520
		return graphsync.RequestFailedUnknownErr{}
	case graphsync.RequestCancelled:
		return graphsync.RequestCancelledErr{}
521
	default:
522
		return fmt.Errorf("Unknown")
523 524
	}
}
525

Hannah Howard's avatar
Hannah Howard committed
526 527 528 529
func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error {
	result := rm.blockHooks.ProcessBlockHooks(p, response, block)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
530
		rm.sendRequest(p, updateRequest)
531
	}
Hannah Howard's avatar
Hannah Howard committed
532
	if result.Err != nil {
533
		_, isPause := result.Err.(hooks.ErrPaused)
Hannah Howard's avatar
Hannah Howard committed
534 535
		select {
		case <-rm.ctx.Done():
536
		case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause}:
Hannah Howard's avatar
Hannah Howard committed
537
		}
538
	}
Hannah Howard's avatar
Hannah Howard committed
539 540 541
	return result.Err
}

542 543 544 545 546 547 548
func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID) {
	select {
	case <-rm.ctx.Done():
	case rm.messages <- &terminateRequestMessage{requestID}:
	}
}

549
func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) {
Hannah Howard's avatar
Hannah Howard committed
550 551
	_, err := ipldutil.EncodeNode(selectorSpec)
	if err != nil {
552
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
553
	}
554
	_, err = ipldutil.ParseSelector(selectorSpec)
Hannah Howard's avatar
Hannah Howard committed
555
	if err != nil {
556
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
557 558 559
	}
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
560
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid")
Hannah Howard's avatar
Hannah Howard committed
561 562 563
	}
	request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
	hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
Hannah Howard's avatar
Hannah Howard committed
564 565 566 567 568 569 570 571 572 573 574 575
	if hooksResult.PersistenceOption != "" {
		dedupData, err := dedupkey.EncodeDedupKey(hooksResult.PersistenceOption)
		if err != nil {
			return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
		}
		request = request.ReplaceExtensions([]graphsync.ExtensionData{
			{
				Name: graphsync.ExtensionDeDupByKey,
				Data: dedupData,
			},
		})
	}
Hannah Howard's avatar
Hannah Howard committed
576 577
	err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
	if err != nil {
578
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
579
	}
580
	return request, hooksResult, nil
Hannah Howard's avatar
Hannah Howard committed
581
}
582

583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607
type reqSubscriber struct {
	p                     peer.ID
	request               gsmsg.GraphSyncRequest
	networkErrorListeners *listeners.NetworkErrorListeners
}

func (r *reqSubscriber) OnNext(topic notifications.Topic, event notifications.Event) {
	mqEvt, isMQEvt := event.(messagequeue.Event)
	if !isMQEvt || mqEvt.Name != messagequeue.Error {
		return
	}

	r.networkErrorListeners.NotifyNetworkErrorListeners(r.p, r.request, mqEvt.Err)
	//r.re.networkError <- mqEvt.Err
	//r.re.terminateRequest()
}

func (r reqSubscriber) OnClose(topic notifications.Topic) {
}

const requestNetworkError = "request_network_error"

func (rm *RequestManager) sendRequest(p peer.ID, request gsmsg.GraphSyncRequest) {
	sub := notifications.NewTopicDataSubscriber(&reqSubscriber{p, request, rm.networkErrorListeners})
	failNotifee := notifications.Notifee{Data: requestNetworkError, Subscriber: sub}
608
	rm.peerHandler.AllocateAndBuildMessage(p, 0, func(builder *gsmsg.Builder) {
609 610
		builder.AddRequest(request)
	}, []notifications.Notifee{failNotifee})
611 612
}

613 614 615 616 617 618 619 620 621 622 623
func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id]
	if !ok {
		return errors.New("request not found")
	}
	if !inProgressRequestStatus.paused {
		return errors.New("request is not paused")
	}
	inProgressRequestStatus.paused = false
	select {
	case <-inProgressRequestStatus.pauseMessages:
624
		rm.sendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661
		return nil
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case inProgressRequestStatus.resumeMessages <- urm.extensions:
		return nil
	}
}
func (urm *unpauseRequestMessage) handle(rm *RequestManager) {
	err := urm.unpause(rm)
	select {
	case <-rm.ctx.Done():
	case urm.response <- err:
	}
}
func (prm *pauseRequestMessage) pause(rm *RequestManager) error {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[prm.id]
	if !ok {
		return errors.New("request not found")
	}
	if inProgressRequestStatus.paused {
		return errors.New("request is already paused")
	}
	inProgressRequestStatus.paused = true
	select {
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case inProgressRequestStatus.pauseMessages <- struct{}{}:
		return nil
	}
}
func (prm *pauseRequestMessage) handle(rm *RequestManager) {
	err := prm.pause(rm)
	select {
	case <-rm.ctx.Done():
	case prm.response <- err:
	}
}