requestmanager.go 11.3 KB
Newer Older
1 2 3 4 5 6 7
package requestmanager

import (
	"context"
	"fmt"
	"math"

8
	blocks "github.com/ipfs/go-block-format"
9
	"github.com/ipfs/go-graphsync"
10 11
	ipldbridge "github.com/ipfs/go-graphsync/ipldbridge"
	gsmsg "github.com/ipfs/go-graphsync/message"
12 13 14 15 16
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
17
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
18
	"github.com/libp2p/go-libp2p-core/peer"
19 20
)

21
// log is the package-level logger, created under the "graphsync" name.
var log = logging.Logger("graphsync")
22 23 24

const (
	// maxPriority is the max priority as defined by the bitswap protocol
25
	maxPriority = graphsync.Priority(math.MaxInt32)
26 27 28
)

type inProgressRequestStatus struct {
29 30 31 32
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
33 34 35 36
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
37
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
38 39
}

40 41 42
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
43 44
	StartRequest(requestID graphsync.RequestID)
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
45
		blks []blocks.Block)
46 47 48
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
49 50
}

51 52 53 54 55 56 57 58 59
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	ipldBridge  ipldbridge.IPLDBridge
	peerHandler PeerHandler
	rc          *responseCollector
60
	asyncLoader AsyncLoader
61
	// dont touch out side of run loop
62 63
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
64 65 66 67 68 69 70
}

// requestManagerMessage is a unit of work queued on RequestManager.messages.
// The run loop calls handle on each message it receives, so implementations
// may read and modify run-loop-owned state on rm.
type requestManagerMessage interface {
	handle(rm *RequestManager)
}

// New generates a new request manager from a context, network, and selectorQuerier
71
func New(ctx context.Context, asyncLoader AsyncLoader, ipldBridge ipldbridge.IPLDBridge) *RequestManager {
72 73 74 75 76
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
		ipldBridge:                ipldBridge,
77
		asyncLoader:               asyncLoader,
78 79
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
80
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
81 82 83 84 85 86 87 88 89
	}
}

// SetDelegate specifies who will send messages out to the network: it
// installs the PeerHandler used for outgoing requests and cancellations.
// The field is assigned without synchronization.
// NOTE(review): presumably this must be called before Startup — confirm.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
90 91
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
92
	incomingError chan error
93 94 95 96
}

type newRequestMessage struct {
	p                     peer.ID
97
	root                  ipld.Link
98
	selector              ipld.Node
99
	extensions            []graphsync.ExtensionData
100 101 102 103
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
104 105
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
106
	root ipld.Link,
107 108
	selector ipld.Node,
	extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
109
	if _, err := rm.ipldBridge.ParseSelector(selector); err != nil {
110
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
111 112 113 114 115
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
116
	case rm.messages <- &newRequestMessage{p, root, selector, extensions, inProgressRequestChan}:
117
	case <-rm.ctx.Done():
118
		return rm.emptyResponse()
119
	case <-ctx.Done():
120
		return rm.emptyResponse()
121 122 123 124
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
125
		return rm.emptyResponse()
126 127 128
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

129 130 131 132 133 134 135 136 137 138
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

139 140
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
141
	close(ch)
142
	errCh := make(chan error)
143 144 145 146
	close(errCh)
	return ch, errCh
}

147 148
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
149
	close(ch)
150
	errCh := make(chan error, 1)
151
	errCh <- err
152 153
	close(errCh)
	return ch, errCh
154 155 156
}

type cancelRequestMessage struct {
157
	requestID graphsync.RequestID
158 159
}

160 161
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
162
	incomingErrors chan error) {
163
	cancelMessageChannel := rm.messages
164
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
165 166 167 168 169 170 171
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
172 173 174 175 176
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
177 178 179 180 181 182 183 184
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
185
	p         peer.ID
186 187
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
188 189 190 191
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
192
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
193
	blks []blocks.Block) {
194
	select {
195
	case rm.messages <- &processResponseMessage{p, responses, blks}:
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
	case <-rm.ctx.Done():
	}
}

// Startup starts processing for the RequestManager by launching its run
// loop in a new goroutine.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the RequestManager by cancelling its
// context, which causes the run loop to return.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

// run is the manager's event loop: it handles queued messages one at a
// time until the manager's context ends, then cancels every request that
// is still in progress.
func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	done := rm.ctx.Done()
	for {
		select {
		case <-done:
			return
		case msg := <-rm.messages:
			msg.handle(rm)
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
227
		requestStatus.cancelFn()
228 229 230
	}
}

231
type terminateRequestMessage struct {
232
	requestID graphsync.RequestID
233
}
234

235
func (nrm *newRequestMessage) handle(rm *RequestManager) {
236 237 238
	requestID := rm.nextRequestID
	rm.nextRequestID++

239
	inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.root, nrm.selector, nrm.extensions)
240 241 242

	select {
	case nrm.inProgressRequestChan <- inProgressRequest{
243 244 245
		requestID:     requestID,
		incoming:      inProgressChan,
		incomingError: inProgressErr,
246 247 248 249 250
	}:
	case <-rm.ctx.Done():
	}
}

251 252 253 254 255
// handle forgets the finished request and tells the async loader to clean
// up after it. Deleting is a no-op when the entry was already removed by a
// cancel or by a terminal response.
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

256 257 258 259 260 261
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

262
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
263
	delete(rm.inProgressRequestStatuses, crm.requestID)
264
	inProgressRequestStatus.cancelFn()
265 266 267
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
268 269 270 271 272 273 274 275 276 277 278 279
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
	responseMetadata := metadataForResponses(filteredResponses, rm.ipldBridge)
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
280
		}
281
		responsesForPeer = append(responsesForPeer, response)
282
	}
283 284
	return responsesForPeer
}
285

286 287
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
288
		if gsmsg.IsTerminalResponseCode(response.Status()) {
289 290 291 292 293 294
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
295
				}
296
				requestStatus.cancelFn()
297
			}
298 299
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
300 301 302
		}
	}
}
303

304
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
305
	switch status {
306
	case graphsync.RequestFailedBusy:
307
		return fmt.Errorf("Request Failed - Peer Is Busy")
308
	case graphsync.RequestFailedContentNotFound:
309
		return fmt.Errorf("Request Failed - Content Not Found")
310
	case graphsync.RequestFailedLegal:
311
		return fmt.Errorf("Request Failed - For Legal Reasons")
312
	case graphsync.RequestFailedUnknown:
313
		return fmt.Errorf("Request Failed - Unknown Reason")
314
	default:
315
		return fmt.Errorf("Unknown")
316 317
	}
}
318

319
func (rm *RequestManager) setupRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node, extensions []graphsync.ExtensionData) (chan graphsync.ResponseProgress, chan error) {
320 321 322 323
	selectorBytes, err := rm.ipldBridge.EncodeNode(selectorSpec)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
324
	selector, err := rm.ipldBridge.ParseSelector(selectorSpec)
325 326 327
	if err != nil {
		return rm.singleErrorResponse(err)
	}
328 329 330 331
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
		return rm.singleErrorResponse(fmt.Errorf("request failed: link has no cid"))
	}
332 333 334 335 336 337
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
		ctx, cancel, p, networkErrorChan,
	}
	rm.asyncLoader.StartRequest(requestID)
338
	rm.peerHandler.SendRequest(p, gsmsg.NewRequest(requestID, asCidLink.Cid, selectorBytes, maxPriority, extensions...))
339 340 341 342 343
	return rm.executeTraversal(ctx, requestID, root, selector, networkErrorChan)
}

func (rm *RequestManager) executeTraversal(
	ctx context.Context,
344
	requestID graphsync.RequestID,
345
	root ipld.Link,
346 347
	selector ipldbridge.Selector,
	networkErrorChan chan error,
348 349
) (chan graphsync.ResponseProgress, chan error) {
	inProgressChan := make(chan graphsync.ResponseProgress)
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
	inProgressErr := make(chan error)
	loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
	visitor := visitToChannel(ctx, inProgressChan)
	go func() {
		rm.ipldBridge.Traverse(ctx, loaderFn, root, selector, visitor)
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{requestID}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}