requestmanager.go 14.1 KB
Newer Older
1 2 3 4 5
package requestmanager

import (
	"context"
	"fmt"
Hannah Howard's avatar
Hannah Howard committed
6
	"sync/atomic"
7

8
	"github.com/ipfs/go-graphsync/requestmanager/executor"
Hannah Howard's avatar
Hannah Howard committed
9 10
	"github.com/ipfs/go-graphsync/requestmanager/hooks"

11
	blocks "github.com/ipfs/go-block-format"
12
	"github.com/ipfs/go-graphsync"
13
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
14
	gsmsg "github.com/ipfs/go-graphsync/message"
15 16 17 18
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
19
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
20
	"github.com/libp2p/go-libp2p-core/peer"
21 22
)

23
var log = logging.Logger("graphsync")
24 25

const (
26 27
	// defaultPriority is the default priority for requests sent by graphsync
	defaultPriority = graphsync.Priority(0)
28 29 30
)

type inProgressRequestStatus struct {
31 32 33 34
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
Hannah Howard's avatar
Hannah Howard committed
35
	lastResponse atomic.Value
36 37 38 39
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
40
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
41 42
}

43 44 45
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
46
	StartRequest(graphsync.RequestID, string) error
47
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
48
		blks []blocks.Block)
49 50 51
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
52 53
}

54 55 56 57 58 59 60 61
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
62
	asyncLoader AsyncLoader
63
	// dont touch out side of run loop
64 65
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
Hannah Howard's avatar
Hannah Howard committed
66 67 68
	requestHooks              RequestHooks
	responseHooks             ResponseHooks
	blockHooks                BlockHooks
69 70 71 72 73 74
}

// requestManagerMessage is a unit of work for the run loop. Messages are
// queued on RequestManager.messages and their handle method is invoked, one
// at a time, from the run goroutine.
type requestManagerMessage interface {
	// handle applies the message to the manager's state.
	handle(rm *RequestManager)
}

Hannah Howard's avatar
Hannah Howard committed
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
// RequestHooks run for new requests. The manager invokes them once per
// outgoing request, after the request message has been built and before the
// async loader is started, and uses the result's PersistenceOption and
// CustomChooser.
type RequestHooks interface {
	ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
}

// ResponseHooks run for new responses. Any Extensions in the result are sent
// back to the peer as an update request; a non-nil Err fails and cancels the
// request.
type ResponseHooks interface {
	ProcessResponseHooks(p peer.ID, response graphsync.ResponseData) hooks.UpdateResult
}

// BlockHooks run for each block loaded. Any Extensions in the result are sent
// back to the peer as an update request; a non-nil Err queues a cancel for
// the request and is returned to the executor.
type BlockHooks interface {
	ProcessBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) hooks.UpdateResult
}

90
// New generates a new request manager from a context, network, and selectorQuerier
Hannah Howard's avatar
Hannah Howard committed
91 92
func New(ctx context.Context,
	asyncLoader AsyncLoader,
Hannah Howard's avatar
Hannah Howard committed
93 94 95
	requestHooks RequestHooks,
	responseHooks ResponseHooks,
	blockHooks BlockHooks) *RequestManager {
96 97 98 99
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
100
		asyncLoader:               asyncLoader,
101 102
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
103
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
Hannah Howard's avatar
Hannah Howard committed
104 105
		requestHooks:              requestHooks,
		responseHooks:             responseHooks,
Hannah Howard's avatar
Hannah Howard committed
106
		blockHooks:                blockHooks,
107 108 109 110 111 112 113 114 115
	}
}

// SetDelegate specifies who will send messages out to the internet.
//
// The handler is stored without synchronization and is read from the run
// loop and from executor callbacks.
// NOTE(review): presumably this must be called before Startup — confirm.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
116 117
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
118
	incomingError chan error
119 120 121 122
}

type newRequestMessage struct {
	p                     peer.ID
123
	root                  ipld.Link
124
	selector              ipld.Node
125
	extensions            []graphsync.ExtensionData
126 127 128 129
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
130 131
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
132
	root ipld.Link,
133 134
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
135
	if _, err := ipldutil.ParseSelector(selector); err != nil {
136
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
137 138 139 140 141
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
142
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
143
	case <-rm.ctx.Done():
144
		return rm.emptyResponse()
145
	case <-ctx.Done():
146
		return rm.emptyResponse()
147 148 149 150
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
151
		return rm.emptyResponse()
152 153 154
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

155 156 157 158 159 160 161 162 163 164
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

165 166
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
167
	close(ch)
168
	errCh := make(chan error)
169 170 171 172
	close(errCh)
	return ch, errCh
}

173 174
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
175
	close(ch)
176
	errCh := make(chan error, 1)
177
	errCh <- err
178 179
	close(errCh)
	return ch, errCh
180 181 182
}

type cancelRequestMessage struct {
183
	requestID graphsync.RequestID
184 185
}

186 187
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
188
	incomingErrors chan error) {
189
	cancelMessageChannel := rm.messages
190
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
191 192 193 194 195 196 197
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
198 199 200 201 202
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
203 204 205 206 207 208 209 210
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
211
	p         peer.ID
212 213
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
214 215 216 217
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
218
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
219
	blks []blocks.Block) {
220
	select {
221
	case rm.messages <- &processResponseMessage{p, responses, blks}:
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
	case <-rm.ctx.Done():
	}
}

// Startup starts processing for the RequestManager by launching its run loop
// in a new goroutine.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the RequestManager by cancelling its context,
// which makes the run loop return.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

// run is the manager's event loop: it handles queued messages one at a time
// until the manager's context is done, then cancels every request that is
// still in progress.
func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	stopped := rm.ctx.Done()
	for {
		select {
		case <-stopped:
			return
		case next := <-rm.messages:
			next.handle(rm)
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
253
		requestStatus.cancelFn()
254 255 256
	}
}

257
type terminateRequestMessage struct {
258
	requestID graphsync.RequestID
259
}
260

261
func (nrm *newRequestMessage) handle(rm *RequestManager) {
Hannah Howard's avatar
Hannah Howard committed
262 263
	var ipr inProgressRequest
	ipr.requestID = rm.nextRequestID
264
	rm.nextRequestID++
265
	request, hooksResult, err := rm.validateRequest(ipr.requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
Hannah Howard's avatar
Hannah Howard committed
266 267 268
	if err != nil {
		ipr.incoming, ipr.incomingError = rm.singleErrorResponse(err)
	} else {
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
		ctx, cancel := context.WithCancel(rm.ctx)
		p := nrm.p
		requestStatus := &inProgressRequestStatus{
			ctx: ctx, cancelFn: cancel, p: p,
		}
		lastResponse := &requestStatus.lastResponse
		lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged))
		rm.inProgressRequestStatuses[request.ID()] = requestStatus
		ipr.incoming, ipr.incomingError = executor.RequestExecution{
			Request:     request,
			SendRequest: func(gsRequest gsmsg.GraphSyncRequest) { rm.peerHandler.SendRequest(p, gsRequest) },
			Loader:      rm.asyncLoader.AsyncLoad,
			RunBlockHooks: func(bd graphsync.BlockData) error {
				response := lastResponse.Load().(gsmsg.GraphSyncResponse)
				return rm.processBlockHooks(p, response, bd)
			},
			TerminateRequest: func() {
				select {
				case <-ctx.Done():
				case rm.messages <- &terminateRequestMessage{request.ID()}:
				}
			},
			NodeStyleChooser: hooksResult.CustomChooser,
		}.Start(ctx)
		requestStatus.networkError = ipr.incomingError
Hannah Howard's avatar
Hannah Howard committed
294
	}
295 296

	select {
Hannah Howard's avatar
Hannah Howard committed
297
	case nrm.inProgressRequestChan <- ipr:
298 299 300 301
	case <-rm.ctx.Done():
	}
}

302 303 304 305 306
// handle forgets the terminated request and tells the async loader to clean
// up after it. Deleting an ID that is no longer tracked is a no-op.
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

307 308 309 310 311 312
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

313
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
314
	delete(rm.inProgressRequestStatuses, crm.requestID)
315
	inProgressRequestStatus.cancelFn()
316 317 318
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
319
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
320
	filteredResponses = rm.processExtensions(filteredResponses, prm.p)
Hannah Howard's avatar
Hannah Howard committed
321
	rm.updateLastResponses(filteredResponses)
322
	responseMetadata := metadataForResponses(filteredResponses)
323 324 325 326 327 328 329 330 331 332
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
333
		}
334
		responsesForPeer = append(responsesForPeer, response)
335
	}
336 337
	return responsesForPeer
}
338

339 340 341 342 343 344 345 346 347 348 349
// processExtensions runs the response hooks for every response and returns,
// in order, only those whose hooks did not fail.
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	passed := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, candidate := range responses {
		if !rm.processExtensionsForResponse(p, candidate) {
			continue
		}
		passed = append(passed, candidate)
	}
	return passed
}

Hannah Howard's avatar
Hannah Howard committed
350 351 352 353 354 355
// updateLastResponses records each response as the most recent one for its
// request. Every response must belong to a tracked request; the caller
// guarantees this by filtering first.
func (rm *RequestManager) updateLastResponses(responses []gsmsg.GraphSyncResponse) {
	for i := range responses {
		latest := responses[i]
		status := rm.inProgressRequestStatuses[latest.RequestID()]
		status.lastResponse.Store(latest)
	}
}

356
func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
Hannah Howard's avatar
Hannah Howard committed
357 358 359 360 361 362 363 364 365 366 367
	result := rm.responseHooks.ProcessResponseHooks(p, response)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
		rm.peerHandler.SendRequest(p, updateRequest)
	}
	if result.Err != nil {
		requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
		responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		select {
		case requestStatus.networkError <- responseError:
		case <-requestStatus.ctx.Done():
368
		}
Hannah Howard's avatar
Hannah Howard committed
369
		rm.peerHandler.SendRequest(p, gsmsg.CancelRequest(response.RequestID()))
Hannah Howard's avatar
Hannah Howard committed
370 371
		requestStatus.cancelFn()
		return false
372 373 374 375
	}
	return true
}

376 377
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
378
		if gsmsg.IsTerminalResponseCode(response.Status()) {
379 380 381 382 383 384
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
385
				}
386
				requestStatus.cancelFn()
387
			}
388 389
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
390 391 392
		}
	}
}
393

394
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
395
	switch status {
396
	case graphsync.RequestFailedBusy:
397
		return fmt.Errorf("Request Failed - Peer Is Busy")
398
	case graphsync.RequestFailedContentNotFound:
399
		return fmt.Errorf("Request Failed - Content Not Found")
400
	case graphsync.RequestFailedLegal:
401
		return fmt.Errorf("Request Failed - For Legal Reasons")
402
	case graphsync.RequestFailedUnknown:
403
		return fmt.Errorf("Request Failed - Unknown Reason")
404
	default:
405
		return fmt.Errorf("Unknown")
406 407
	}
}
408

Hannah Howard's avatar
Hannah Howard committed
409 410 411 412 413
func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.ResponseData, block graphsync.BlockData) error {
	result := rm.blockHooks.ProcessBlockHooks(p, response, block)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
		rm.peerHandler.SendRequest(p, updateRequest)
414
	}
Hannah Howard's avatar
Hannah Howard committed
415 416 417 418 419
	if result.Err != nil {
		select {
		case <-rm.ctx.Done():
		case rm.messages <- &cancelRequestMessage{response.RequestID()}:
		}
420
	}
Hannah Howard's avatar
Hannah Howard committed
421 422 423
	return result.Err
}

424
func (rm *RequestManager) validateRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, hooks.RequestResult, error) {
Hannah Howard's avatar
Hannah Howard committed
425 426
	_, err := ipldutil.EncodeNode(selectorSpec)
	if err != nil {
427
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
428
	}
429
	_, err = ipldutil.ParseSelector(selectorSpec)
Hannah Howard's avatar
Hannah Howard committed
430
	if err != nil {
431
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
432 433 434
	}
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
435
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, fmt.Errorf("request failed: link has no cid")
Hannah Howard's avatar
Hannah Howard committed
436 437 438 439 440
	}
	request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
	hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
	err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
	if err != nil {
441
		return gsmsg.GraphSyncRequest{}, hooks.RequestResult{}, err
Hannah Howard's avatar
Hannah Howard committed
442
	}
443
	return request, hooksResult, nil
Hannah Howard's avatar
Hannah Howard committed
444
}