// Package requestmanager tracks outgoing graphsync requests and processes
// the incoming responses to them.
package requestmanager

import (
	"context"
	"fmt"

7
	blocks "github.com/ipfs/go-block-format"
8
	"github.com/ipfs/go-graphsync"
9
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
10
	gsmsg "github.com/ipfs/go-graphsync/message"
11 12 13 14 15
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
16
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
17
	"github.com/ipld/go-ipld-prime/traversal"
18
	"github.com/ipld/go-ipld-prime/traversal/selector"
19
	"github.com/libp2p/go-libp2p-core/peer"
20 21
)

22
// log is the package-level logger, created under the name "graphsync".
var log = logging.Logger("graphsync")
23 24

const (
25 26
	// defaultPriority is the default priority for requests sent by graphsync
	defaultPriority = graphsync.Priority(0)
27 28 29
)

type inProgressRequestStatus struct {
30 31 32 33
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
34 35
}

36
type responseHook struct {
37
	key  uint64
38 39 40 41 42 43
	hook graphsync.OnIncomingResponseHook
}

type requestHook struct {
	key  uint64
	hook graphsync.OnOutgoingRequestHook
44 45
}

46 47
// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
48
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
49 50
}

51 52 53
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
54
	StartRequest(graphsync.RequestID, string) error
55
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
56
		blks []blocks.Block)
57 58 59
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
60 61
}

62 63 64 65 66 67 68 69
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
70
	asyncLoader AsyncLoader
71
	// dont touch out side of run loop
72 73
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
74
	hooksNextKey              uint64
75
	responseHooks             []responseHook
76
	requestHooks              []requestHook
77 78 79 80 81 82 83
}

// requestManagerMessage is a unit of work for the run loop. The run loop
// receives each message from the messages channel and calls handle on it.
type requestManagerMessage interface {
	// handle applies the message to rm.
	handle(rm *RequestManager)
}

// New generates a new request manager from a context, network, and selectorQuerier
84
func New(ctx context.Context, asyncLoader AsyncLoader) *RequestManager {
85 86 87 88
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
89
		asyncLoader:               asyncLoader,
90 91
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
92
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
93 94 95 96 97 98 99 100 101
	}
}

// SetDelegate specifies who will send messages out to the internet.
// It assigns the field directly rather than going through the message
// channel.
// NOTE(review): presumably this must be called before Startup — confirm
// against callers.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
102 103
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
104
	incomingError chan error
105 106 107 108
}

type newRequestMessage struct {
	p                     peer.ID
109
	root                  ipld.Link
110
	selector              ipld.Node
111
	extensions            []graphsync.ExtensionData
112 113 114 115
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
116 117
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
118
	root ipld.Link,
119 120
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
121
	if _, err := ipldutil.ParseSelector(selector); err != nil {
122
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
123 124 125 126 127
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
128
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
129
	case <-rm.ctx.Done():
130
		return rm.emptyResponse()
131
	case <-ctx.Done():
132
		return rm.emptyResponse()
133 134 135 136
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
137
		return rm.emptyResponse()
138 139 140
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

141 142 143 144 145 146 147 148 149 150
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

151 152
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
153
	close(ch)
154
	errCh := make(chan error)
155 156 157 158
	close(errCh)
	return ch, errCh
}

159 160
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
161
	close(ch)
162
	errCh := make(chan error, 1)
163
	errCh <- err
164 165
	close(errCh)
	return ch, errCh
166 167 168
}

type cancelRequestMessage struct {
169
	requestID graphsync.RequestID
170 171
}

172 173
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
174
	incomingErrors chan error) {
175
	cancelMessageChannel := rm.messages
176
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
177 178 179 180 181 182 183
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
184 185 186 187 188
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
189 190 191 192 193 194 195 196
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
197
	p         peer.ID
198 199
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
200 201 202 203
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
204
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
205
	blks []blocks.Block) {
206
	select {
207
	case rm.messages <- &processResponseMessage{p, responses, blks}:
208 209 210 211
	case <-rm.ctx.Done():
	}
}

212 213 214 215 216 217 218
// registerRequestHookMessage asks the run loop to register an outgoing
// request hook. The function that unregisters it is sent back on
// unregisterHookChan.
type registerRequestHookMessage struct {
	hook               graphsync.OnOutgoingRequestHook
	unregisterHookChan chan graphsync.UnregisterHookFunc
}

type registerResponseHookMessage struct {
	hook               graphsync.OnIncomingResponseHook
219 220 221
	unregisterHookChan chan graphsync.UnregisterHookFunc
}

222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
// RegisterRequestHook installs hook so that it runs against every outgoing
// request. The registration is performed by the run loop; this method waits
// for the loop to hand back the matching unregister function and returns it.
// It returns nil if the manager's context ends before that happens.
func (rm *RequestManager) RegisterRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
	unregisterCh := make(chan graphsync.UnregisterHookFunc)
	registration := &registerRequestHookMessage{hook, unregisterCh}

	// Step one: get the registration onto the run loop's queue.
	select {
	case <-rm.ctx.Done():
		return nil
	case rm.messages <- registration:
	}

	// Step two: wait for the run loop's reply.
	select {
	case <-rm.ctx.Done():
		return nil
	case unregisterFn := <-unregisterCh:
		return unregisterFn
	}
}

// RegisterResponseHook registers an extension to process incoming responses
func (rm *RequestManager) RegisterResponseHook(
	hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
241 242
	response := make(chan graphsync.UnregisterHookFunc)
	select {
243
	case rm.messages <- &registerResponseHookMessage{hook, response}:
244 245 246
	case <-rm.ctx.Done():
		return nil
	}
247
	select {
248 249
	case unregister := <-response:
		return unregister
250
	case <-rm.ctx.Done():
251
		return nil
252 253 254
	}
}

255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
// Startup starts processing for the RequestManager by launching its run
// loop in a new goroutine.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the RequestManager by cancelling its
// context, which makes the run loop return.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

// run is the manager's event loop. It handles queued messages one at a time
// until the manager's context is cancelled, then cancels every request that
// is still in progress on the way out.
//
// NOTE: Do not open any streams or connections from anywhere in this
// event loop. Really, just don't do anything likely to block.
func (rm *RequestManager) run() {
	defer rm.cleanupInProcessRequests()

	shutdown := rm.ctx.Done()
	for {
		select {
		case <-shutdown:
			return
		case next := <-rm.messages:
			next.handle(rm)
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
282
		requestStatus.cancelFn()
283 284 285
	}
}

286
type terminateRequestMessage struct {
287
	requestID graphsync.RequestID
288
}
289

290
func (nrm *newRequestMessage) handle(rm *RequestManager) {
291 292 293
	requestID := rm.nextRequestID
	rm.nextRequestID++

294
	inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
295 296 297

	select {
	case nrm.inProgressRequestChan <- inProgressRequest{
298 299 300
		requestID:     requestID,
		incoming:      inProgressChan,
		incomingError: inProgressErr,
301 302 303 304 305
	}:
	case <-rm.ctx.Done():
	}
}

306 307 308 309 310
// handle drops the request's status entry and tells the async loader to
// clean up the request.
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

311 312 313 314 315 316
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

317
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
318
	delete(rm.inProgressRequestStatuses, crm.requestID)
319
	inProgressRequestStatus.cancelFn()
320 321 322
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
323
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
324
	filteredResponses = rm.processExtensions(filteredResponses, prm.p)
325
	responseMetadata := metadataForResponses(filteredResponses)
326 327 328 329
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
// handle records the hook under a freshly allocated key and replies with a
// function that removes the hook with that key again.
//
// NOTE(review): the returned function edits rm.requestHooks from whichever
// goroutine calls it, while the run loop reads that slice in setupRequest.
// This looks like an unsynchronized access — confirm.
func (rhm *registerRequestHookMessage) handle(rm *RequestManager) {
	registered := requestHook{rm.hooksNextKey, rhm.hook}
	rm.hooksNextKey++
	rm.requestHooks = append(rm.requestHooks, registered)

	unregister := func() {
		for idx := range rm.requestHooks {
			if rm.requestHooks[idx].key != registered.key {
				continue
			}
			rm.requestHooks = append(rm.requestHooks[:idx], rm.requestHooks[idx+1:]...)
			return
		}
	}

	select {
	case <-rm.ctx.Done():
	case rhm.unregisterHookChan <- unregister:
	}
}

func (rhm *registerResponseHookMessage) handle(rm *RequestManager) {
	rh := responseHook{rm.hooksNextKey, rhm.hook}
	rm.hooksNextKey++
350 351 352 353 354 355 356 357 358 359 360 361
	rm.responseHooks = append(rm.responseHooks, rh)
	select {
	case rhm.unregisterHookChan <- func() {
		for i, matchHook := range rm.responseHooks {
			if rh.key == matchHook.key {
				rm.responseHooks = append(rm.responseHooks[:i], rm.responseHooks[i+1:]...)
				return
			}
		}
	}:
	case <-rm.ctx.Done():
	}
362 363
}

364 365 366 367 368 369
func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
370
		}
371
		responsesForPeer = append(responsesForPeer, response)
372
	}
373 374
	return responsesForPeer
}
375

376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
// processExtensions runs the registered response hooks over each response
// from peer p and returns, in the original order, only the responses that
// every hook accepted.
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	accepted := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for idx := range responses {
		if !rm.processExtensionsForResponse(p, responses[idx]) {
			continue
		}
		accepted = append(accepted, responses[idx])
	}
	return accepted
}

// processExtensionsForResponse applies each registered response hook to
// response in registration order and reports whether all of them accepted
// it. On the first hook error the request is failed: a "Request Failed -
// Unknown Reason" error is offered on the request's networkError channel,
// the request's context is cancelled, and false is returned without running
// the remaining hooks.
func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
	for _, registered := range rm.responseHooks {
		if hookErr := registered.hook(p, response); hookErr == nil {
			continue
		}

		status := rm.inProgressRequestStatuses[response.RequestID()]
		failure := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		// Give up on delivery if the request's context is already done.
		select {
		case status.networkError <- failure:
		case <-status.ctx.Done():
		}
		status.cancelFn()
		return false
	}
	return true
}

404 405
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
406
		if gsmsg.IsTerminalResponseCode(response.Status()) {
407 408 409 410 411 412
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
413
				}
414
				requestStatus.cancelFn()
415
			}
416 417
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
418 419 420
		}
	}
}
421

422
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
423
	switch status {
424
	case graphsync.RequestFailedBusy:
425
		return fmt.Errorf("Request Failed - Peer Is Busy")
426
	case graphsync.RequestFailedContentNotFound:
427
		return fmt.Errorf("Request Failed - Content Not Found")
428
	case graphsync.RequestFailedLegal:
429
		return fmt.Errorf("Request Failed - For Legal Reasons")
430
	case graphsync.RequestFailedUnknown:
431
		return fmt.Errorf("Request Failed - Unknown Reason")
432
	default:
433
		return fmt.Errorf("Unknown")
434 435
	}
}
436

437 438 439 440 441 442 443 444 445 446 447 448 449
// hookActions collects the choices made by outgoing request hooks.
// setupRequest passes one instance to every request hook and then reads the
// recorded values.
type hookActions struct {
	// persistenceOption is passed to AsyncLoader.StartRequest.
	persistenceOption  string
	// nodeBuilderChooser is passed on to the traversal.
	nodeBuilderChooser traversal.NodeBuilderChooser
}

// UsePersistenceOption records name as the persistence option for the
// request, replacing any previously recorded value.
func (ha *hookActions) UsePersistenceOption(name string) {
	ha.persistenceOption = name
}

// UseNodeBuilderChooser records nodeBuilderChooser as the chooser for the
// request's traversal, replacing any previously recorded value.
func (ha *hookActions) UseNodeBuilderChooser(nodeBuilderChooser traversal.NodeBuilderChooser) {
	ha.nodeBuilderChooser = nodeBuilderChooser
}

450
func (rm *RequestManager) setupRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (chan graphsync.ResponseProgress, chan error) {
451
	_, err := ipldutil.EncodeNode(selectorSpec)
452 453 454
	if err != nil {
		return rm.singleErrorResponse(err)
	}
455
	selector, err := ipldutil.ParseSelector(selectorSpec)
456 457 458
	if err != nil {
		return rm.singleErrorResponse(err)
	}
459 460 461 462
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
		return rm.singleErrorResponse(fmt.Errorf("request failed: link has no cid"))
	}
463 464 465 466 467
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
		ctx, cancel, p, networkErrorChan,
	}
468
	request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
469 470 471 472 473 474 475 476 477 478
	ha := &hookActions{}
	for _, hook := range rm.requestHooks {
		hook.hook(p, request, ha)
	}
	err = rm.asyncLoader.StartRequest(requestID, ha.persistenceOption)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
	rm.peerHandler.SendRequest(p, request)
	return rm.executeTraversal(ctx, requestID, root, selector, ha.nodeBuilderChooser, networkErrorChan)
479 480 481 482
}

func (rm *RequestManager) executeTraversal(
	ctx context.Context,
483
	requestID graphsync.RequestID,
484
	root ipld.Link,
485
	selector selector.Selector,
486
	nodeBuilderChooser traversal.NodeBuilderChooser,
487
	networkErrorChan chan error,
488 489
) (chan graphsync.ResponseProgress, chan error) {
	inProgressChan := make(chan graphsync.ResponseProgress)
490 491 492 493
	inProgressErr := make(chan error)
	loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
	visitor := visitToChannel(ctx, inProgressChan)
	go func() {
494
		_ = ipldutil.Traverse(ctx, loaderFn, nodeBuilderChooser, root, selector, visitor)
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{requestID}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}