requestmanager.go 17.7 KB
Newer Older
1 2 3 4
package requestmanager

import (
	"context"
5
	"errors"
6
	"fmt"
Hannah Howard's avatar
Hannah Howard committed
7
	"sync/atomic"
8

9 10
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-graphsync/cidset"
11
	"github.com/ipfs/go-graphsync/requestmanager/executor"
Hannah Howard's avatar
Hannah Howard committed
12 13
	"github.com/ipfs/go-graphsync/requestmanager/hooks"

14
	blocks "github.com/ipfs/go-block-format"
15
	"github.com/ipfs/go-graphsync"
16
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
17
	gsmsg "github.com/ipfs/go-graphsync/message"
18 19 20 21
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
22
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
23
	"github.com/libp2p/go-libp2p-core/peer"
24 25
)

26
var log = logging.Logger("graphsync")
27 28

const (
29 30
	// defaultPriority is the default priority for requests sent by graphsync
	defaultPriority = graphsync.Priority(0)
31 32 33
)

type inProgressRequestStatus struct {
34 35 36 37 38 39 40 41
	ctx            context.Context
	cancelFn       func()
	p              peer.ID
	networkError   chan error
	resumeMessages chan []graphsync.ExtensionData
	pauseMessages  chan struct{}
	paused         bool
	lastResponse   atomic.Value
42 43 44 45
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
46
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
47 48
}

49 50 51
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
52
	StartRequest(graphsync.RequestID, string) error
53
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
54
		blks []blocks.Block)
55 56 57
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
58 59
}

60 61 62 63 64 65 66 67
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
68
	asyncLoader AsyncLoader
69
	// dont touch out side of run loop
70 71
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
Hannah Howard's avatar
Hannah Howard committed
72 73 74
	requestHooks              RequestHooks
	responseHooks             ResponseHooks
	blockHooks                BlockHooks
75 76 77 78 79 80
}

type requestManagerMessage interface {
	handle(rm *RequestManager)
}

Hannah Howard's avatar
Hannah Howard committed
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
// RequestHooks run for new requests
type RequestHooks interface {
	ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
}

// ResponseHooks run for new responses
type ResponseHooks interface {
	ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult
}

// BlockHooks run for each block loaded
type BlockHooks interface {
	ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult
}

96
// New generates a new request manager from a context, network, and selectorQuerier
Hannah Howard's avatar
Hannah Howard committed
97 98
func New(ctx context.Context,
	asyncLoader AsyncLoader,
Hannah Howard's avatar
Hannah Howard committed
99 100 101
	requestHooks RequestHooks,
	responseHooks ResponseHooks,
	blockHooks BlockHooks) *RequestManager {
102 103 104 105
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
106
		asyncLoader:               asyncLoader,
107 108
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
109
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
Hannah Howard's avatar
Hannah Howard committed
110 111
		requestHooks:              requestHooks,
		responseHooks:             responseHooks,
Hannah Howard's avatar
Hannah Howard committed
112
		blockHooks:                blockHooks,
113 114 115 116 117 118 119 120 121
	}
}

// SetDelegate specifies who will send messages out to the internet.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
122 123
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
124
	incomingError chan error
125 126 127 128
}

type newRequestMessage struct {
	p                     peer.ID
129
	root                  ipld.Link
130
	selector              ipld.Node
131
	extensions            []graphsync.ExtensionData
132 133 134 135
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
136 137
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
138
	root ipld.Link,
139 140
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
141
	if _, err := ipldutil.ParseSelector(selector); err != nil {
142
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
143 144 145 146 147
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
148
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
149
	case <-rm.ctx.Done():
150
		return rm.emptyResponse()
151
	case <-ctx.Done():
152
		return rm.emptyResponse()
153 154 155 156
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
157
		return rm.emptyResponse()
158 159 160
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

161 162 163 164 165 166 167 168 169 170
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

171 172
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
173
	close(ch)
174
	errCh := make(chan error)
175 176 177 178
	close(errCh)
	return ch, errCh
}

179 180
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
181
	close(ch)
182
	errCh := make(chan error, 1)
183
	errCh <- err
184 185
	close(errCh)
	return ch, errCh
186 187 188
}

type cancelRequestMessage struct {
189
	requestID graphsync.RequestID
190
	isPause   bool
191 192
}

193 194
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
195
	incomingErrors chan error) {
196
	cancelMessageChannel := rm.messages
197
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
198
		select {
199
		case cancelMessageChannel <- &cancelRequestMessage{requestID, false}:
200 201 202 203 204
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
205 206 207 208 209
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
210 211 212 213 214 215 216 217
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
218
	p         peer.ID
219 220
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
221 222 223 224
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
225
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
226
	blks []blocks.Block) {
227
	select {
228
	case rm.messages <- &processResponseMessage{p, responses, blks}:
229 230 231 232
	case <-rm.ctx.Done():
	}
}

233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
type unpauseRequestMessage struct {
	id         graphsync.RequestID
	extensions []graphsync.ExtensionData
	response   chan error
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response)
}

type pauseRequestMessage struct {
	id       graphsync.RequestID
	response chan error
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
	response := make(chan error, 1)
	return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response)
}

func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error) error {
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case rm.messages <- message:
	}
	select {
	case <-rm.ctx.Done():
		return errors.New("Context Cancelled")
	case err := <-response:
		return err
	}
}

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
// Startup starts processing for the WantManager.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the want manager.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case message := <-rm.messages:
			message.handle(rm)
		case <-rm.ctx.Done():
			return
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
298
		requestStatus.cancelFn()
299 300 301
	}
}

302
type terminateRequestMessage struct {
303
	requestID graphsync.RequestID
304
}
305

306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *RequestManager) (chan graphsync.ResponseProgress, chan error) {
	request, hooksResult, err := rm.validateRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
	doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
	var doNotSendCids *cid.Set
	if has {
		doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData)
		if err != nil {
			return rm.singleErrorResponse(err)
		}
	} else {
		doNotSendCids = cid.NewSet()
	}
	ctx, cancel := context.WithCancel(rm.ctx)
	p := nrm.p
	resumeMessages := make(chan []graphsync.ExtensionData, 1)
	pauseMessages := make(chan struct{}, 1)
	networkError := make(chan error, 1)
	requestStatus := &inProgressRequestStatus{
		ctx: ctx, cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, networkError: networkError,
	}
	lastResponse := &requestStatus.lastResponse
	lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
	rm.inProgressRequestStatuses[request.ID()] = requestStatus
	incoming, incomingError := executor.ExecutionEnv{
		Ctx:              rm.ctx,
		SendRequest:      rm.peerHandler.SendRequest,
		TerminateRequest: rm.terminateRequest,
		RunBlockHooks:    rm.processBlockHooks,
		Loader:           rm.asyncLoader.AsyncLoad,
	}.Start(
		executor.RequestExecution{
			Ctx:              ctx,
			P:                p,
			Request:          request,
			NetworkError:     networkError,
			LastResponse:     lastResponse,
			DoNotSendCids:    doNotSendCids,
			NodeStyleChooser: hooksResult.CustomChooser,
			ResumeMessages:   resumeMessages,
			PauseMessages:    pauseMessages,
		})
	return incoming, incomingError
}

353
func (nrm *newRequestMessage) handle(rm *RequestManager) {
Hannah Howard's avatar
Hannah Howard committed
354 355
	var ipr inProgressRequest
	ipr.requestID = rm.nextRequestID
356
	rm.nextRequestID++
357
	ipr.incoming, ipr.incomingError = nrm.setupRequest(ipr.requestID, rm)
358 359

	select {
Hannah Howard's avatar
Hannah Howard committed
360
	case nrm.inProgressRequestChan <- ipr:
361 362 363 364
	case <-rm.ctx.Done():
	}
}

365 366 367 368 369
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

370 371 372 373 374 375
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

376
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
377 378 379 380 381
	if crm.isPause {
		inProgressRequestStatus.paused = true
	} else {
		inProgressRequestStatus.cancelFn()
	}
382 383 384
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
385 386
	filteredResponses := rm.processExtensions(prm.responses, prm.p)
	filteredResponses = rm.filterResponsesForPeer(filteredResponses, prm.p)
Hannah Howard's avatar
Hannah Howard committed
387
	rm.updateLastResponses(filteredResponses)
388
	responseMetadata := metadataForResponses(filteredResponses)
389 390 391 392 393 394 395 396 397 398
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
399
		}
400
		responsesForPeer = append(responsesForPeer, response)
401
	}
402 403
	return responsesForPeer
}
404

405 406 407 408 409 410 411 412 413 414 415
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	remainingResponses := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		success := rm.processExtensionsForResponse(p, response)
		if success {
			remainingResponses = append(remainingResponses, response)
		}
	}
	return remainingResponses
}

Hannah Howard's avatar
Hannah Howard committed
416 417 418 419 420 421
func (rm *RequestManager) updateLastResponses(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
		rm.inProgressRequestStatuses[response.RequestID()].lastResponse.Store(response)
	}
}

422
func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
Hannah Howard's avatar
Hannah Howard committed
423 424 425 426 427 428
	result := rm.responseHooks.ProcessResponseHooks(p, response)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
		rm.peerHandler.SendRequest(p, updateRequest)
	}
	if result.Err != nil {
429 430 431 432
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok {
			return false
		}
Hannah Howard's avatar
Hannah Howard committed
433 434 435 436
		responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		select {
		case requestStatus.networkError <- responseError:
		case <-requestStatus.ctx.Done():
437
		}
Hannah Howard's avatar
Hannah Howard committed
438
		rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID()))
Hannah Howard's avatar
Hannah Howard committed
439 440
		requestStatus.cancelFn()
		return false
441 442 443 444
	}
	return true
}

445 446
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
447
		if gsmsg.IsTerminalResponseCode(response.Status()) {
448 449 450 451 452 453
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
454
				}
455
				requestStatus.cancelFn()
456
			}
457
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
458 459 460
		}
	}
}
461

462
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
463
	switch status {
464
	case graphsync.RequestFailedBusy:
465
		return graphsync.RequestFailedBusyErr{}
466
	case graphsync.RequestFailedContentNotFound:
467
		return graphsync.RequestFailedContentNotFoundErr{}
468
	case graphsync.RequestFailedLegal:
469
		return graphsync.RequestFailedLegalErr{}
470
	case graphsync.RequestFailedUnknown:
471 472 473
		return graphsync.RequestFailedUnknownErr{}
	case graphsync.RequestCancelled:
		return graphsync.RequestCancelledErr{}
474
	default:
475
		return fmt.Errorf("Unknown")
476 477
	}
}
478

Hannah Howard's avatar
Hannah Howard committed
479 480 481 482 483
func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error {
	result := rm.blockHooks.ProcessBlockHooks(p, response, block)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
		rm.peerHandler.SendRequest(p, updateRequest)
484
	}
Hannah Howard's avatar
Hannah Howard committed
485
	if result.Err != nil {
486
		_, isPause := result.Err.(hooks.ErrPaused)
Hannah Howard's avatar
Hannah Howard committed
487 488
		select {
		case <-rm.ctx.Done():
489
		case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause}:
Hannah Howard's avatar
Hannah Howard committed
490
		}
491
	}
Hannah Howard's avatar
Hannah Howard committed
492 493 494
	return result.Err
}

495 496 497 498 499 500 501
func (rm *RequestManager) terminateRequest(requestID graphsync.RequestID) {
	select {
	case <-rm.ctx.Done():
	case rm.messages <- &terminateRequestMessage{requestID}:
	}
}

502
func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) {
Hannah Howard's avatar
Hannah Howard committed
503 504
	_, err := ipldutil.EncodeNode(selectorSpec)
	if err != nil {
505
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
506
	}
507
	_, err = ipldutil.ParseSelector(selectorSpec)
Hannah Howard's avatar
Hannah Howard committed
508
	if err != nil {
509
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
510 511 512
	}
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
513
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid")
Hannah Howard's avatar
Hannah Howard committed
514 515 516 517 518
	}
	request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
	hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
	err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
	if err != nil {
519
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
520
	}
521
	return request, hooksResult, nil
Hannah Howard's avatar
Hannah Howard committed
522
}
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572

func (urm *unpauseRequestMessage) unpause(rm *RequestManager) error {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[urm.id]
	if !ok {
		return errors.New("request not found")
	}
	if !inProgressRequestStatus.paused {
		return errors.New("request is not paused")
	}
	inProgressRequestStatus.paused = false
	select {
	case <-inProgressRequestStatus.pauseMessages:
		rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.UpdateRequest(urm.id, urm.extensions...))
		return nil
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case inProgressRequestStatus.resumeMessages <- urm.extensions:
		return nil
	}
}
func (urm *unpauseRequestMessage) handle(rm *RequestManager) {
	err := urm.unpause(rm)
	select {
	case <-rm.ctx.Done():
	case urm.response <- err:
	}
}
func (prm *pauseRequestMessage) pause(rm *RequestManager) error {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[prm.id]
	if !ok {
		return errors.New("request not found")
	}
	if inProgressRequestStatus.paused {
		return errors.New("request is already paused")
	}
	inProgressRequestStatus.paused = true
	select {
	case <-rm.ctx.Done():
		return errors.New("context cancelled")
	case inProgressRequestStatus.pauseMessages <- struct{}{}:
		return nil
	}
}
func (prm *pauseRequestMessage) handle(rm *RequestManager) {
	err := prm.pause(rm)
	select {
	case <-rm.ctx.Done():
	case prm.response <- err:
	}
}