requestmanager.go 14.8 KB
Newer Older
1 2 3 4 5
package requestmanager

import (
	"context"
	"fmt"
Hannah Howard's avatar
Hannah Howard committed
6
	"sync/atomic"
7

Hannah Howard's avatar
Hannah Howard committed
8 9
	"github.com/ipfs/go-graphsync/requestmanager/hooks"

10
	blocks "github.com/ipfs/go-block-format"
11
	"github.com/ipfs/go-graphsync"
12
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
13
	gsmsg "github.com/ipfs/go-graphsync/message"
14 15 16 17 18
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
19
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
20
	"github.com/ipld/go-ipld-prime/traversal/selector"
21
	"github.com/libp2p/go-libp2p-core/peer"
22 23
)

24
// log is the package-level logger, registered under the "graphsync" name.
var log = logging.Logger("graphsync")
25 26

const (
27 28
	// defaultPriority is the default priority for requests sent by graphsync
	defaultPriority = graphsync.Priority(0)
29 30 31
)

type inProgressRequestStatus struct {
32 33 34 35
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
Hannah Howard's avatar
Hannah Howard committed
36
	lastResponse atomic.Value
37 38 39 40
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
41
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
42 43
}

44 45 46
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
47
	StartRequest(graphsync.RequestID, string) error
48
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
49
		blks []blocks.Block)
50 51 52
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
53 54
}

55 56 57 58 59 60 61 62
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
63
	asyncLoader AsyncLoader
64
	// dont touch out side of run loop
65 66
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
Hannah Howard's avatar
Hannah Howard committed
67 68 69
	requestHooks              RequestHooks
	responseHooks             ResponseHooks
	blockHooks                BlockHooks
70 71 72 73 74 75
}

// requestManagerMessage is a unit of work posted to the manager's messages
// channel; the run loop calls handle on each one it receives.
type requestManagerMessage interface {
	handle(rm *RequestManager)
}

Hannah Howard's avatar
Hannah Howard committed
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// RequestHooks run for new requests. The manager invokes them once per
// request, after the request message has been built and before the
// async loader is told to start the request.
type RequestHooks interface {
	ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
}

// ResponseHooks run for new responses. Any extensions in the result are
// sent back to the peer as an update request; a non-nil Err in the result
// fails and cancels the request.
type ResponseHooks interface {
	ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult
}

// BlockHooks run for each block loaded. Any extensions in the result are
// sent back to the peer as an update request; a non-nil Err in the result
// causes a cancel message to be queued for the request.
type BlockHooks interface {
	ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult
}

91
// New generates a new request manager from a context, network, and selectorQuerier
Hannah Howard's avatar
Hannah Howard committed
92 93
func New(ctx context.Context,
	asyncLoader AsyncLoader,
Hannah Howard's avatar
Hannah Howard committed
94 95 96
	requestHooks RequestHooks,
	responseHooks ResponseHooks,
	blockHooks BlockHooks) *RequestManager {
97 98 99 100
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
101
		asyncLoader:               asyncLoader,
102 103
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
104
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
Hannah Howard's avatar
Hannah Howard committed
105 106
		requestHooks:              requestHooks,
		responseHooks:             responseHooks,
Hannah Howard's avatar
Hannah Howard committed
107
		blockHooks:                blockHooks,
108 109 110 111 112 113 114 115 116
	}
}

// SetDelegate specifies who will send messages out to the internet.
// The handler is stored without synchronization and is used by the run
// loop and by request goroutines whenever a request, update, or cancel
// must be sent to a peer.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
117 118
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
119
	incomingError chan error
120 121 122 123
}

type newRequestMessage struct {
	p                     peer.ID
124
	root                  ipld.Link
125
	selector              ipld.Node
126
	extensions            []graphsync.ExtensionData
127 128 129 130
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
131 132
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
133
	root ipld.Link,
134 135
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
136
	if _, err := ipldutil.ParseSelector(selector); err != nil {
137
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
138 139 140 141 142
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
143
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
144
	case <-rm.ctx.Done():
145
		return rm.emptyResponse()
146
	case <-ctx.Done():
147
		return rm.emptyResponse()
148 149 150 151
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
152
		return rm.emptyResponse()
153 154 155
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

156 157 158 159 160 161 162 163 164 165
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

166 167
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
168
	close(ch)
169
	errCh := make(chan error)
170 171 172 173
	close(errCh)
	return ch, errCh
}

174 175
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
176
	close(ch)
177
	errCh := make(chan error, 1)
178
	errCh <- err
179 180
	close(errCh)
	return ch, errCh
181 182 183
}

type cancelRequestMessage struct {
184
	requestID graphsync.RequestID
185 186
}

187 188
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
189
	incomingErrors chan error) {
190
	cancelMessageChannel := rm.messages
191
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
192 193 194 195 196 197 198
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
199 200 201 202 203
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
204 205 206 207 208 209 210 211
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
212
	p         peer.ID
213 214
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
215 216 217 218
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
219
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
220
	blks []blocks.Block) {
221
	select {
222
	case rm.messages <- &processResponseMessage{p, responses, blks}:
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
	case <-rm.ctx.Done():
	}
}

// Startup starts the RequestManager's run loop in a new goroutine.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the RequestManager by cancelling its context,
// which stops the run loop.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

// run is the manager's event loop: it handles queued messages one at a time
// until the manager's context ends, then cancels every request that is
// still in progress.
func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case <-rm.ctx.Done():
			return
		case msg := <-rm.messages:
			msg.handle(rm)
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
254
		requestStatus.cancelFn()
255 256 257
	}
}

258
type terminateRequestMessage struct {
259
	requestID graphsync.RequestID
260
}
261

262
func (nrm *newRequestMessage) handle(rm *RequestManager) {
Hannah Howard's avatar
Hannah Howard committed
263 264
	var ipr inProgressRequest
	ipr.requestID = rm.nextRequestID
265
	rm.nextRequestID++
Hannah Howard's avatar
Hannah Howard committed
266 267 268 269 270 271
	request, hooksResult, selector, err := rm.validateRequest(ipr.requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
	if err != nil {
		ipr.incoming, ipr.incomingError = rm.singleErrorResponse(err)
	} else {
		ipr.incoming, ipr.incomingError = rm.setupRequest(nrm.p, request, hooksResult, selector)
	}
272 273

	select {
Hannah Howard's avatar
Hannah Howard committed
274
	case nrm.inProgressRequestChan <- ipr:
275 276 277 278
	case <-rm.ctx.Done():
	}
}

279 280 281 282 283
// handle forgets the finished request and lets the async loader release
// whatever it holds for it. Deleting an ID that is no longer tracked is a
// no-op.
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

284 285 286 287 288 289
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

290
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
291
	delete(rm.inProgressRequestStatuses, crm.requestID)
292
	inProgressRequestStatus.cancelFn()
293 294 295
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
296
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
297
	filteredResponses = rm.processExtensions(filteredResponses, prm.p)
Hannah Howard's avatar
Hannah Howard committed
298
	rm.updateLastResponses(filteredResponses)
299
	responseMetadata := metadataForResponses(filteredResponses)
300 301 302 303 304 305 306 307 308 309
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
310
		}
311
		responsesForPeer = append(responsesForPeer, response)
312
	}
313 314
	return responsesForPeer
}
315

316 317 318 319 320 321 322 323 324 325 326
// processExtensions runs the response hooks on every response and returns,
// in order, only those for which the hooks reported no error.
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	kept := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		if rm.processExtensionsForResponse(p, response) {
			kept = append(kept, response)
		}
	}
	return kept
}

Hannah Howard's avatar
Hannah Howard committed
327 328 329 330 331 332
// updateLastResponses records each response as the latest one seen for its
// request. Every response must belong to a tracked request; callers pass
// the output of filterResponsesForPeer.
func (rm *RequestManager) updateLastResponses(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
		status := rm.inProgressRequestStatuses[response.RequestID()]
		status.lastResponse.Store(response)
	}
}

333
func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
Hannah Howard's avatar
Hannah Howard committed
334 335 336 337 338 339 340 341 342 343 344
	result := rm.responseHooks.ProcessResponseHooks(p, response)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
		rm.peerHandler.SendRequest(p, updateRequest)
	}
	if result.Err != nil {
		requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
		responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		select {
		case requestStatus.networkError <- responseError:
		case <-requestStatus.ctx.Done():
345
		}
Hannah Howard's avatar
Hannah Howard committed
346
		rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID()))
Hannah Howard's avatar
Hannah Howard committed
347 348
		requestStatus.cancelFn()
		return false
349 350 351 352
	}
	return true
}

353 354
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
355
		if gsmsg.IsTerminalResponseCode(response.Status()) {
356 357 358 359 360 361
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
362
				}
363
				requestStatus.cancelFn()
364
			}
365 366
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
367 368 369
		}
	}
}
370

371
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
372
	switch status {
373
	case graphsync.RequestFailedBusy:
374
		return fmt.Errorf("Request Failed - Peer Is Busy")
375
	case graphsync.RequestFailedContentNotFound:
376
		return fmt.Errorf("Request Failed - Content Not Found")
377
	case graphsync.RequestFailedLegal:
378
		return fmt.Errorf("Request Failed - For Legal Reasons")
379
	case graphsync.RequestFailedUnknown:
380
		return fmt.Errorf("Request Failed - Unknown Reason")
381
	default:
382
		return fmt.Errorf("Unknown")
383 384
	}
}
385

Hannah Howard's avatar
Hannah Howard committed
386 387 388 389 390
func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error {
	result := rm.blockHooks.ProcessBlockHooks(p, response, block)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
		rm.peerHandler.SendRequest(p, updateRequest)
391
	}
Hannah Howard's avatar
Hannah Howard committed
392 393 394 395 396
	if result.Err != nil {
		select {
		case <-rm.ctx.Done():
		case rm.messages <- &cancelRequestMessage{response.RequestID()}:
		}
397
	}
Hannah Howard's avatar
Hannah Howard committed
398 399 400 401
	return result.Err
}

func (rm *RequestManager) setupRequest(p peer.ID, request gsmsg.GraphSyncRequest, hooksResult hooks.RequestResult, selector selector.Selector) (chan graphsync.ResponseProgress, chan error) {
402 403
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
Hannah Howard's avatar
Hannah Howard committed
404 405
	requestStatus := &inProgressRequestStatus{
		ctx: ctx, cancelFn: cancel, p: p, networkError: networkErrorChan,
406
	}
Hannah Howard's avatar
Hannah Howard committed
407 408 409
	lastResponse := &requestStatus.lastResponse
	rm.inProgressRequestStatuses[request.ID()] = requestStatus
	lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
410
	rm.peerHandler.SendRequest(p, request)
411
	inProgressChan := make(chan graphsync.ResponseProgress)
412 413
	inProgressErr := make(chan error)
	go func() {
Hannah Howard's avatar
Hannah Howard committed
414 415 416 417 418 419
		loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, request.ID(), inProgressErr, func(bd graphsync.BlockData) error {
			response := lastResponse.Load().(gsmsg.GraphSyncResponse)
			return rm.processBlockHooks(p, response, bd)
		})
		visitor := visitToChannel(ctx, inProgressChan)
		_ = ipldutil.Traverse(ctx, loaderFn, hooksResult.CustomChooser, cidlink.Link{Cid: request.Root()}, selector, visitor)
420 421 422 423 424 425 426 427 428 429
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
Hannah Howard's avatar
Hannah Howard committed
430
		case rm.messages <- &terminateRequestMessage{request.ID()}:
431 432 433 434 435 436
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}
Hannah Howard's avatar
Hannah Howard committed
437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458

// validateRequest checks the inputs of a new request and prepares it for
// sending.
//
// It verifies that the selector spec can be encoded and parsed and that root
// is a CID link, builds the request message, runs the request hooks, and
// asks the async loader to start the request using the persistence option
// chosen by the hooks. On success it returns the request message, the hooks
// result and the parsed selector; on failure only the error is meaningful.
func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, selector.Selector, error) {
	reject := func(err error) (gsmsg.GraphSyncRequest, hooks.RequestResult, selector.Selector, error) {
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, nil, err
	}

	// The encoded form is not needed here; this only checks encodability.
	if _, err := ipldutil.EncodeNode(selectorSpec); err != nil {
		return reject(err)
	}
	parsed, err := ipldutil.ParseSelector(selectorSpec)
	if err != nil {
		return reject(err)
	}
	rootLink, isCidLink := root.(cidlink.Link)
	if !isCidLink {
		return reject(fmt.Errorf("request failed: link has no cid"))
	}

	request := gsmsg.NewRequest(requestID, rootLink.Cid, selectorSpec, defaultPriority, extensions...)
	hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
	if err := rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption); err != nil {
		return reject(err)
	}
	return request, hooksResult, parsed, nil
}