requestmanager.go 12.6 KB
Newer Older
1 2 3 4 5 6 7
package requestmanager

import (
	"context"
	"fmt"
	"math"

8
	blocks "github.com/ipfs/go-block-format"
9
	"github.com/ipfs/go-graphsync"
10
	ipldutil "github.com/ipfs/go-graphsync/ipldutil"
11
	gsmsg "github.com/ipfs/go-graphsync/message"
12 13 14 15 16
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
17
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
18
	"github.com/ipld/go-ipld-prime/traversal/selector"
19
	"github.com/libp2p/go-libp2p-core/peer"
20 21
)

22
// log is the package-level logger, created with the name "graphsync".
var log = logging.Logger("graphsync")
23 24 25

const (
	// maxPriority is the max priority as defined by the bitswap protocol
26
	maxPriority = graphsync.Priority(math.MaxInt32)
27 28 29
)

type inProgressRequestStatus struct {
30 31 32 33
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
34 35
}

36 37 38 39
// responseHook wraps a registered OnResponseReceivedHook so that it can be
// delivered to the run loop as a requestManagerMessage.
type responseHook struct {
	hook graphsync.OnResponseReceivedHook
}

40 41
// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
42
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
43 44
}

45 46 47
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
48 49
	StartRequest(requestID graphsync.RequestID)
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
50
		blks []blocks.Block)
51 52 53
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
54 55
}

56 57 58 59 60 61 62 63
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	peerHandler PeerHandler
	rc          *responseCollector
64
	asyncLoader AsyncLoader
65
	// dont touch out side of run loop
66 67
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
68
	responseHooks             []responseHook
69 70 71 72 73 74 75
}

// requestManagerMessage is a unit of work posted to the manager's messages
// channel. The run loop calls handle on each message it receives, passing
// the manager itself.
type requestManagerMessage interface {
	handle(rm *RequestManager)
}

// New generates a new request manager from a context, network, and selectorQuerier
76
func New(ctx context.Context, asyncLoader AsyncLoader) *RequestManager {
77 78 79 80
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
81
		asyncLoader:               asyncLoader,
82 83
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
84
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
85 86 87 88 89 90 91 92 93
	}
}

// SetDelegate specifies who will send messages out to the internet: it
// stores peerHandler as the handler the manager uses for outgoing requests.
//
// NOTE(review): the field is written without synchronization and is read by
// the run loop; presumably this is meant to be called before Startup --
// confirm against callers.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
94 95
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
96
	incomingError chan error
97 98 99 100
}

type newRequestMessage struct {
	p                     peer.ID
101
	root                  ipld.Link
102
	selector              ipld.Node
103
	extensions            []graphsync.ExtensionData
104 105 106 107
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
108 109
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
110
	root ipld.Link,
111 112
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
113
	if _, err := ipldutil.ParseSelector(selector); err != nil {
114
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
115 116 117 118 119
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
120
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
121
	case <-rm.ctx.Done():
122
		return rm.emptyResponse()
123
	case <-ctx.Done():
124
		return rm.emptyResponse()
125 126 127 128
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
129
		return rm.emptyResponse()
130 131 132
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

133 134 135 136 137 138 139 140 141 142
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

143 144
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
145
	close(ch)
146
	errCh := make(chan error)
147 148 149 150
	close(errCh)
	return ch, errCh
}

151 152
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
153
	close(ch)
154
	errCh := make(chan error, 1)
155
	errCh <- err
156 157
	close(errCh)
	return ch, errCh
158 159 160
}

type cancelRequestMessage struct {
161
	requestID graphsync.RequestID
162 163
}

164 165
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
166
	incomingErrors chan error) {
167
	cancelMessageChannel := rm.messages
168
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
169 170 171 172 173 174 175
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
176 177 178 179 180
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
181 182 183 184 185 186 187 188
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
189
	p         peer.ID
190 191
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
192 193 194 195
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
196
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
197
	blks []blocks.Block) {
198
	select {
199
	case rm.messages <- &processResponseMessage{p, responses, blks}:
200 201 202 203
	case <-rm.ctx.Done():
	}
}

204 205 206 207 208 209 210 211 212
// RegisterHook registers a hook that is run against incoming responses.
//
// Registration is queued for the run loop. If the manager's context is done
// before the message can be queued, the hook is not registered.
func (rm *RequestManager) RegisterHook(
	hook graphsync.OnResponseReceivedHook) {
	registration := &responseHook{hook}
	select {
	case <-rm.ctx.Done():
	case rm.messages <- registration:
	}
}

213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
// Startup starts processing for the RequestManager by launching its run
// loop in a new goroutine.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the RequestManager by cancelling the
// manager's context, which causes the run loop to return.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

// run is the manager's event loop. It handles messages from rm.messages one
// at a time until rm.ctx is done, and on return cancels every request that
// is still tracked.
func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case message := <-rm.messages:
			message.handle(rm)
		case <-rm.ctx.Done():
			return
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
240
		requestStatus.cancelFn()
241 242 243
	}
}

244
type terminateRequestMessage struct {
245
	requestID graphsync.RequestID
246
}
247

248
func (nrm *newRequestMessage) handle(rm *RequestManager) {
249 250 251
	requestID := rm.nextRequestID
	rm.nextRequestID++

252
	inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
253 254 255

	select {
	case nrm.inProgressRequestChan <- inProgressRequest{
256 257 258
		requestID:     requestID,
		incoming:      inProgressChan,
		incomingError: inProgressErr,
259 260 261 262 263
	}:
	case <-rm.ctx.Done():
	}
}

264 265 266 267 268
// handle removes the request from the in-progress table (a no-op if it is
// already gone) and tells the async loader to clean up after it.
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

269 270 271 272 273 274
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

275
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
276
	delete(rm.inProgressRequestStatuses, crm.requestID)
277
	inProgressRequestStatus.cancelFn()
278 279 280
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
281
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
282
	filteredResponses = rm.processExtensions(filteredResponses, prm.p)
283
	responseMetadata := metadataForResponses(filteredResponses)
284 285 286 287
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

288 289 290 291
// handle appends a copy of the hook to the manager's list of response hooks.
func (rh *responseHook) handle(rm *RequestManager) {
	rm.responseHooks = append(rm.responseHooks, *rh)
}

292 293 294 295 296 297
func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
298
		}
299
		responsesForPeer = append(responsesForPeer, response)
300
	}
301 302
	return responsesForPeer
}
303

304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331
// processExtensions runs the registered response hooks on every response
// and returns, in their original order, the responses that no hook rejected.
func (rm *RequestManager) processExtensions(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	accepted := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for i := range responses {
		if !rm.processExtensionsForResponse(p, responses[i]) {
			continue
		}
		accepted = append(accepted, responses[i])
	}
	return accepted
}

// processExtensionsForResponse runs each registered hook, in registration
// order, on response. It returns true when every hook succeeds.
//
// On the first hook error it offers a "Request Failed - Unknown Reason"
// error to the request's networkError channel (skipped if the request's
// context is already done), cancels the request's context, and returns
// false without running the remaining hooks.
func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg.GraphSyncResponse) bool {
	for _, registered := range rm.responseHooks {
		if hookErr := registered.hook(p, response); hookErr == nil {
			continue
		}
		status := rm.inProgressRequestStatuses[response.RequestID()]
		failure := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown)
		select {
		case <-status.ctx.Done():
		case status.networkError <- failure:
		}
		status.cancelFn()
		return false
	}
	return true
}

332 333
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
334
		if gsmsg.IsTerminalResponseCode(response.Status()) {
335 336 337 338 339 340
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
341
				}
342
				requestStatus.cancelFn()
343
			}
344 345
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
346 347 348
		}
	}
}
349

350
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
351
	switch status {
352
	case graphsync.RequestFailedBusy:
353
		return fmt.Errorf("Request Failed - Peer Is Busy")
354
	case graphsync.RequestFailedContentNotFound:
355
		return fmt.Errorf("Request Failed - Content Not Found")
356
	case graphsync.RequestFailedLegal:
357
		return fmt.Errorf("Request Failed - For Legal Reasons")
358
	case graphsync.RequestFailedUnknown:
359
		return fmt.Errorf("Request Failed - Unknown Reason")
360
	default:
361
		return fmt.Errorf("Unknown")
362 363
	}
}
364

365
func (rm *RequestManager) setupRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (chan graphsync.ResponseProgress, chan error) {
366
	_, err := ipldutil.EncodeNode(selectorSpec)
367 368 369
	if err != nil {
		return rm.singleErrorResponse(err)
	}
370
	selector, err := ipldutil.ParseSelector(selectorSpec)
371 372 373
	if err != nil {
		return rm.singleErrorResponse(err)
	}
374 375 376 377
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
		return rm.singleErrorResponse(fmt.Errorf("request failed: link has no cid"))
	}
378 379 380 381 382 383
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
		ctx, cancel, p, networkErrorChan,
	}
	rm.asyncLoader.StartRequest(requestID)
384
	rm.peerHandler.SendRequest(p, gsmsg.NewRequest(requestID, asCidLink.Cid, selectorSpec, maxPriority, extensions...))
385 386 387 388 389
	return rm.executeTraversal(ctx, requestID, root, selector, networkErrorChan)
}

func (rm *RequestManager) executeTraversal(
	ctx context.Context,
390
	requestID graphsync.RequestID,
391
	root ipld.Link,
392
	selector selector.Selector,
393
	networkErrorChan chan error,
394 395
) (chan graphsync.ResponseProgress, chan error) {
	inProgressChan := make(chan graphsync.ResponseProgress)
396 397 398 399
	inProgressErr := make(chan error)
	loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
	visitor := visitToChannel(ctx, inProgressChan)
	go func() {
400
		ipldutil.Traverse(ctx, loaderFn, root, selector, visitor)
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{requestID}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}