requestmanager.go 12.9 KB
Newer Older
1 2 3 4 5 6
package requestmanager

import (
	"context"
	"fmt"

Hannah Howard's avatar
Hannah Howard committed
7 8
	"github.com/ipfs/go-graphsync/requestmanager/hooks"

9
	blocks "github.com/ipfs/go-block-format"
10
	"github.com/ipfs/go-graphsync"
11
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
12
	gsmsg "github.com/ipfs/go-graphsync/message"
13 14 15 16 17
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
18
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
19
	"github.com/ipld/go-ipld-prime/traversal"
20
	"github.com/ipld/go-ipld-prime/traversal/selector"
21
	"github.com/libp2p/go-libp2p-core/peer"
22 23
)

24
var log = logging.Logger("graphsync")
25 26

const (
27 28
	// defaultPriority is the default priority for requests sent by graphsync
	defaultPriority = graphsync.Priority(0)
29 30 31
)

type inProgressRequestStatus struct {
32 33 34 35
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
36 37 38 39
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
40
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
41 42
}

43 44 45
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
46
	StartRequest(graphsync.RequestID, string) error
47
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
48
		blks []blocks.Block)
49 50 51
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
52 53
}

54 55 56 57 58 59 60 61
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
62
	asyncLoader AsyncLoader
63
	// dont touch out side of run loop
64 65
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
Hannah Howard's avatar
Hannah Howard committed
66 67
	requestHooks              *hooks.OutgoingRequestHooks
	responseHooks             *hooks.IncomingResponseHooks
68 69 70 71 72 73 74
}

type requestManagerMessage interface {
	handle(rm *RequestManager)
}

// New generates a new request manager from a context, network, and selectorQuerier
Hannah Howard's avatar
Hannah Howard committed
75 76 77 78
func New(ctx context.Context,
	asyncLoader AsyncLoader,
	requestHooks *hooks.OutgoingRequestHooks,
	responseHooks *hooks.IncomingResponseHooks) *RequestManager {
79 80 81 82
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
83
		asyncLoader:               asyncLoader,
84 85
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
86
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
Hannah Howard's avatar
Hannah Howard committed
87 88
		requestHooks:              requestHooks,
		responseHooks:             responseHooks,
89 90 91 92 93 94 95 96 97
	}
}

// SetDelegate specifies who will send messages out to the internet.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
98 99
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
100
	incomingError chan error
101 102 103 104
}

type newRequestMessage struct {
	p                     peer.ID
105
	root                  ipld.Link
106
	selector              ipld.Node
107
	extensions            []graphsync.ExtensionData
108 109 110 111
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
112 113
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
114
	root ipld.Link,
115 116
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
117
	if _, err := ipldutil.ParseSelector(selector); err != nil {
118
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
119 120 121 122 123
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
124
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
125
	case <-rm.ctx.Done():
126
		return rm.emptyResponse()
127
	case <-ctx.Done():
128
		return rm.emptyResponse()
129 130 131 132
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
133
		return rm.emptyResponse()
134 135 136
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

137 138 139 140 141 142 143 144 145 146
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

147 148
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
149
	close(ch)
150
	errCh := make(chan error)
151 152 153 154
	close(errCh)
	return ch, errCh
}

155 156
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
157
	close(ch)
158
	errCh := make(chan error, 1)
159
	errCh <- err
160 161
	close(errCh)
	return ch, errCh
162 163 164
}

type cancelRequestMessage struct {
165
	requestID graphsync.RequestID
166 167
}

168 169
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
170
	incomingErrors chan error) {
171
	cancelMessageChannel := rm.messages
172
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
173 174 175 176 177 178 179
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
180 181 182 183 184
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
185 186 187 188 189 190 191 192
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
193
	p         peer.ID
194 195
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
196 197 198 199
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
200
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
201
	blks []blocks.Block) {
202
	select {
203
	case rm.messages <- &processResponseMessage{p, responses, blks}:
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
	case <-rm.ctx.Done():
	}
}

// Startup starts processing for the WantManager.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the want manager.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case message := <-rm.messages:
			message.handle(rm)
		case <-rm.ctx.Done():
			return
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
235
		requestStatus.cancelFn()
236 237 238
	}
}

239
type terminateRequestMessage struct {
240
	requestID graphsync.RequestID
241
}
242

243
func (nrm *newRequestMessage) handle(rm *RequestManager) {
244 245 246
	requestID := rm.nextRequestID
	rm.nextRequestID++

247
	inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
248 249 250

	select {
	case nrm.inProgressRequestChan <- inProgressRequest{
251 252 253
		requestID:     requestID,
		incoming:      inProgressChan,
		incomingError: inProgressErr,
254 255 256 257 258
	}:
	case <-rm.ctx.Done():
	}
}

259 260 261 262 263
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

264 265 266 267 268 269
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

270
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
271
	delete(rm.inProgressRequestStatuses, crm.requestID)
272
	inProgressRequestStatus.cancelFn()
273 274 275
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
276
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
277
	filteredResponses = rm.processExtensions(filteredResponses, prm.p)
278
	responseMetadata := metadataForResponses(filteredResponses)
279 280 281 282 283 284 285 286 287 288
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
289
		}
290
		responsesForPeer = append(responsesForPeer, response)
291
	}
292 293
	return responsesForPeer
}
294

295 296 297 298 299 300 301 302 303 304 305 306
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	remainingResponses := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		success := rm.processExtensionsForResponse(p, response)
		if success {
			remainingResponses = append(remainingResponses, response)
		}
	}
	return remainingResponses
}

func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
Hannah Howard's avatar
Hannah Howard committed
307 308 309 310 311 312 313 314 315 316 317
	result := rm.responseHooks.ProcessResponseHooks(p, response)
	if len(result.Extensions) > 0 {
		updateRequest := gsmsg.UpdateRequest(response.RequestID(), result.Extensions...)
		rm.peerHandler.SendRequest(p, updateRequest)
	}
	if result.Err != nil {
		requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
		responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		select {
		case requestStatus.networkError <- responseError:
		case <-requestStatus.ctx.Done():
318
		}
Hannah Howard's avatar
Hannah Howard committed
319 320
		requestStatus.cancelFn()
		return false
321 322 323 324
	}
	return true
}

325 326
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
327
		if gsmsg.IsTerminalResponseCode(response.Status()) {
328 329 330 331 332 333
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
334
				}
335
				requestStatus.cancelFn()
336
			}
337 338
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
339 340 341
		}
	}
}
342

343
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
344
	switch status {
345
	case graphsync.RequestFailedBusy:
346
		return fmt.Errorf("Request Failed - Peer Is Busy")
347
	case graphsync.RequestFailedContentNotFound:
348
		return fmt.Errorf("Request Failed - Content Not Found")
349
	case graphsync.RequestFailedLegal:
350
		return fmt.Errorf("Request Failed - For Legal Reasons")
351
	case graphsync.RequestFailedUnknown:
352
		return fmt.Errorf("Request Failed - Unknown Reason")
353
	default:
354
		return fmt.Errorf("Unknown")
355 356
	}
}
357

358
func (rm *RequestManager) setupRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (chan graphsync.ResponseProgress, chan error) {
359
	_, err := ipldutil.EncodeNode(selectorSpec)
360 361 362
	if err != nil {
		return rm.singleErrorResponse(err)
	}
363
	selector, err := ipldutil.ParseSelector(selectorSpec)
364 365 366
	if err != nil {
		return rm.singleErrorResponse(err)
	}
367 368 369 370
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
		return rm.singleErrorResponse(fmt.Errorf("request failed: link has no cid"))
	}
371 372 373 374 375
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
		ctx, cancel, p, networkErrorChan,
	}
376
	request := gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, defaultPriority, extensions...)
Hannah Howard's avatar
Hannah Howard committed
377 378
	hooksResult := rm.requestHooks.ProcessRequestHooks(p, request)
	err = rm.asyncLoader.StartRequest(requestID, hooksResult.PersistenceOption)
379 380 381 382
	if err != nil {
		return rm.singleErrorResponse(err)
	}
	rm.peerHandler.SendRequest(p, request)
Hannah Howard's avatar
Hannah Howard committed
383
	return rm.executeTraversal(ctx, requestID, root, selector, hooksResult.CustomChooser, networkErrorChan)
384 385 386 387
}

func (rm *RequestManager) executeTraversal(
	ctx context.Context,
388
	requestID graphsync.RequestID,
389
	root ipld.Link,
390
	selector selector.Selector,
391
	nodeBuilderChooser traversal.NodeBuilderChooser,
392
	networkErrorChan chan error,
393 394
) (chan graphsync.ResponseProgress, chan error) {
	inProgressChan := make(chan graphsync.ResponseProgress)
395 396 397 398
	inProgressErr := make(chan error)
	loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
	visitor := visitToChannel(ctx, inProgressChan)
	go func() {
399
		_ = ipldutil.Traverse(ctx, loaderFn, nodeBuilderChooser, root, selector, visitor)
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{requestID}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}