requestmanager.go 11.1 KB
Newer Older
1 2 3 4 5 6 7
package requestmanager

import (
	"context"
	"fmt"
	"math"

8
	blocks "github.com/ipfs/go-block-format"
9 10
	ipldbridge "github.com/ipfs/go-graphsync/ipldbridge"
	gsmsg "github.com/ipfs/go-graphsync/message"
11 12 13 14 15
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
16
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
17
	"github.com/libp2p/go-libp2p-core/peer"
18 19
)

20
var log = logging.Logger("graphsync")
21 22 23 24 25 26 27

const (
	// maxPriority is the max priority as defined by the bitswap protocol
	maxPriority = gsmsg.GraphSyncPriority(math.MaxInt32)
)

type inProgressRequestStatus struct {
28 29 30 31
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
32 33 34 35
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
36
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
37 38
}

39 40 41 42 43 44 45 46 47 48 49
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
	StartRequest(requestID gsmsg.GraphSyncRequestID)
	ProcessResponse(responses map[gsmsg.GraphSyncRequestID]metadata.Metadata,
		blks []blocks.Block)
	AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID gsmsg.GraphSyncRequestID)
	CleanupRequest(requestID gsmsg.GraphSyncRequestID)
}

50 51 52 53 54 55 56 57 58
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	ipldBridge  ipldbridge.IPLDBridge
	peerHandler PeerHandler
	rc          *responseCollector
59
	asyncLoader AsyncLoader
60 61 62 63 64 65 66 67 68 69
	// dont touch out side of run loop
	nextRequestID             gsmsg.GraphSyncRequestID
	inProgressRequestStatuses map[gsmsg.GraphSyncRequestID]*inProgressRequestStatus
}

type requestManagerMessage interface {
	handle(rm *RequestManager)
}

// New generates a new request manager from a context, network, and selectorQuerier
70
func New(ctx context.Context, asyncLoader AsyncLoader, ipldBridge ipldbridge.IPLDBridge) *RequestManager {
71 72 73 74 75
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
		ipldBridge:                ipldBridge,
76
		asyncLoader:               asyncLoader,
77 78 79 80 81 82 83 84 85 86 87 88
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
		inProgressRequestStatuses: make(map[gsmsg.GraphSyncRequestID]*inProgressRequestStatus),
	}
}

// SetDelegate specifies who will send messages out to the internet.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
89
	requestID     gsmsg.GraphSyncRequestID
90
	incoming      chan types.ResponseProgress
91
	incomingError chan error
92 93 94 95
}

type newRequestMessage struct {
	p                     peer.ID
96
	root                  ipld.Link
97 98 99 100 101
	selector              ipld.Node
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
102 103
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
104 105 106
	root ipld.Link,
	selector ipld.Node) (<-chan types.ResponseProgress, <-chan error) {
	if _, err := rm.ipldBridge.ParseSelector(selector); err != nil {
107
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
108 109 110 111 112
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
113
	case rm.messages <- &newRequestMessage{p, root, selector, inProgressRequestChan}:
114
	case <-rm.ctx.Done():
115
		return rm.emptyResponse()
116
	case <-ctx.Done():
117
		return rm.emptyResponse()
118 119 120 121
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
122
		return rm.emptyResponse()
123 124 125
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

126 127 128 129 130 131 132 133 134 135
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

136 137
func (rm *RequestManager) emptyResponse() (chan types.ResponseProgress, chan error) {
	ch := make(chan types.ResponseProgress)
138
	close(ch)
139
	errCh := make(chan error)
140 141 142 143
	close(errCh)
	return ch, errCh
}

144 145
func (rm *RequestManager) singleErrorResponse(err error) (chan types.ResponseProgress, chan error) {
	ch := make(chan types.ResponseProgress)
146
	close(ch)
147
	errCh := make(chan error, 1)
148
	errCh <- err
149 150
	close(errCh)
	return ch, errCh
151 152 153 154 155 156
}

type cancelRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
}

157
func (rm *RequestManager) cancelRequest(requestID gsmsg.GraphSyncRequestID,
158
	incomingResponses chan types.ResponseProgress,
159
	incomingErrors chan error) {
160
	cancelMessageChannel := rm.messages
161
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
162 163 164 165 166 167 168
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
169 170 171 172 173
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
174 175 176 177 178 179 180 181
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
182
	p         peer.ID
183 184
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
185 186 187 188
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
189
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
190
	blks []blocks.Block) {
191
	select {
192
	case rm.messages <- &processResponseMessage{p, responses, blks}:
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	case <-rm.ctx.Done():
	}
}

// Startup starts processing for the WantManager.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the want manager.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case message := <-rm.messages:
			message.handle(rm)
		case <-rm.ctx.Done():
			return
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
224
		requestStatus.cancelFn()
225 226 227
	}
}

228 229 230
type terminateRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
}
231

232
func (nrm *newRequestMessage) handle(rm *RequestManager) {
233 234 235
	requestID := rm.nextRequestID
	rm.nextRequestID++

236
	inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.root, nrm.selector)
237 238 239

	select {
	case nrm.inProgressRequestChan <- inProgressRequest{
240 241 242
		requestID:     requestID,
		incoming:      inProgressChan,
		incomingError: inProgressErr,
243 244 245 246 247
	}:
	case <-rm.ctx.Done():
	}
}

248 249 250 251 252
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

253 254 255 256 257 258
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

259
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
260
	delete(rm.inProgressRequestStatuses, crm.requestID)
261
	inProgressRequestStatus.cancelFn()
262 263 264
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
265 266 267 268 269 270 271 272 273 274 275 276
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
	responseMetadata := metadataForResponses(filteredResponses, rm.ipldBridge)
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
277
		}
278
		responsesForPeer = append(responsesForPeer, response)
279
	}
280 281
	return responsesForPeer
}
282

283 284
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
285
		if gsmsg.IsTerminalResponseCode(response.Status()) {
286 287 288 289 290 291
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
292
				}
293
				requestStatus.cancelFn()
294
			}
295 296
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
297 298 299
		}
	}
}
300

301
func (rm *RequestManager) generateResponseErrorFromStatus(status gsmsg.GraphSyncResponseStatusCode) error {
302 303
	switch status {
	case gsmsg.RequestFailedBusy:
304
		return fmt.Errorf("Request Failed - Peer Is Busy")
305
	case gsmsg.RequestFailedContentNotFound:
306
		return fmt.Errorf("Request Failed - Content Not Found")
307
	case gsmsg.RequestFailedLegal:
308
		return fmt.Errorf("Request Failed - For Legal Reasons")
309
	case gsmsg.RequestFailedUnknown:
310
		return fmt.Errorf("Request Failed - Unknown Reason")
311
	default:
312
		return fmt.Errorf("Unknown")
313 314
	}
}
315

316
func (rm *RequestManager) setupRequest(requestID gsmsg.GraphSyncRequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node) (chan types.ResponseProgress, chan error) {
317 318 319 320
	selectorBytes, err := rm.ipldBridge.EncodeNode(selectorSpec)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
321
	selector, err := rm.ipldBridge.ParseSelector(selectorSpec)
322 323 324
	if err != nil {
		return rm.singleErrorResponse(err)
	}
325 326 327 328
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
		return rm.singleErrorResponse(fmt.Errorf("request failed: link has no cid"))
	}
329 330 331 332 333 334
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
		ctx, cancel, p, networkErrorChan,
	}
	rm.asyncLoader.StartRequest(requestID)
335
	rm.peerHandler.SendRequest(p, gsmsg.NewRequest(requestID, asCidLink.Cid, selectorBytes, maxPriority))
336 337 338 339 340 341
	return rm.executeTraversal(ctx, requestID, root, selector, networkErrorChan)
}

func (rm *RequestManager) executeTraversal(
	ctx context.Context,
	requestID gsmsg.GraphSyncRequestID,
342
	root ipld.Link,
343 344
	selector ipldbridge.Selector,
	networkErrorChan chan error,
345 346
) (chan types.ResponseProgress, chan error) {
	inProgressChan := make(chan types.ResponseProgress)
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
	inProgressErr := make(chan error)
	loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
	visitor := visitToChannel(ctx, inProgressChan)
	go func() {
		rm.ipldBridge.Traverse(ctx, loaderFn, root, selector, visitor)
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{requestID}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}