requestmanager.go 11.1 KB
Newer Older
1 2 3 4 5 6 7
package requestmanager

import (
	"context"
	"fmt"
	"math"

8
	blocks "github.com/ipfs/go-block-format"
9
	"github.com/ipfs/go-graphsync"
10 11
	ipldbridge "github.com/ipfs/go-graphsync/ipldbridge"
	gsmsg "github.com/ipfs/go-graphsync/message"
12 13 14 15 16
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
17
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
18
	"github.com/libp2p/go-libp2p-core/peer"
19 20
)

21
// log is the package-level logger, created under the "graphsync" name.
var log = logging.Logger("graphsync")
22 23 24

const (
	// maxPriority is the max priority as defined by the bitswap protocol
25
	maxPriority = graphsync.Priority(math.MaxInt32)
26 27 28
)

type inProgressRequestStatus struct {
29 30 31 32
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
33 34 35 36
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
37
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
38 39
}

40 41 42
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
43 44
	StartRequest(requestID graphsync.RequestID)
	ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
45
		blks []blocks.Block)
46 47 48
	AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID graphsync.RequestID)
	CleanupRequest(requestID graphsync.RequestID)
49 50
}

51 52 53 54 55 56 57 58 59
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	ipldBridge  ipldbridge.IPLDBridge
	peerHandler PeerHandler
	rc          *responseCollector
60
	asyncLoader AsyncLoader
61
	// dont touch out side of run loop
62 63
	nextRequestID             graphsync.RequestID
	inProgressRequestStatuses map[graphsync.RequestID]*inProgressRequestStatus
64 65 66 67 68 69 70
}

// requestManagerMessage is a unit of work for the RequestManager run loop.
// run receives these from the messages channel and calls handle on each one.
type requestManagerMessage interface {
	// handle applies the message to rm.
	handle(rm *RequestManager)
}

// New generates a new request manager from a context, network, and selectorQuerier
71
func New(ctx context.Context, asyncLoader AsyncLoader, ipldBridge ipldbridge.IPLDBridge) *RequestManager {
72 73 74 75 76
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
		ipldBridge:                ipldBridge,
77
		asyncLoader:               asyncLoader,
78 79
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
80
		inProgressRequestStatuses: make(map[graphsync.RequestID]*inProgressRequestStatus),
81 82 83 84 85 86 87 88 89
	}
}

// SetDelegate specifies who will send messages out to the internet: it stores
// peerHandler as the PeerHandler used for outgoing requests and cancels.
//
// NOTE(review): the field is assigned without synchronization, so this is
// presumably meant to be called before Startup — confirm against callers.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
90 91
	requestID     graphsync.RequestID
	incoming      chan graphsync.ResponseProgress
92
	incomingError chan error
93 94 95 96
}

type newRequestMessage struct {
	p                     peer.ID
97
	root                  ipld.Link
98 99 100 101 102
	selector              ipld.Node
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
103 104
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
105
	root ipld.Link,
106
	selector ipld.Node) (<-chan graphsync.ResponseProgress, <-chan error) {
107
	if _, err := rm.ipldBridge.ParseSelector(selector); err != nil {
108
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
109 110 111 112 113
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
114
	case rm.messages <- &newRequestMessage{p, root, selector, inProgressRequestChan}:
115
	case <-rm.ctx.Done():
116
		return rm.emptyResponse()
117
	case <-ctx.Done():
118
		return rm.emptyResponse()
119 120 121 122
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
123
		return rm.emptyResponse()
124 125 126
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

127 128 129 130 131 132 133 134 135 136
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

137 138
func (rm *RequestManager) emptyResponse() (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
139
	close(ch)
140
	errCh := make(chan error)
141 142 143 144
	close(errCh)
	return ch, errCh
}

145 146
func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.ResponseProgress, chan error) {
	ch := make(chan graphsync.ResponseProgress)
147
	close(ch)
148
	errCh := make(chan error, 1)
149
	errCh <- err
150 151
	close(errCh)
	return ch, errCh
152 153 154
}

type cancelRequestMessage struct {
155
	requestID graphsync.RequestID
156 157
}

158 159
func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID,
	incomingResponses chan graphsync.ResponseProgress,
160
	incomingErrors chan error) {
161
	cancelMessageChannel := rm.messages
162
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
163 164 165 166 167 168 169
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
170 171 172 173 174
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
175 176 177 178 179 180 181 182
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
183
	p         peer.ID
184 185
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
186 187 188 189
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
190
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
191
	blks []blocks.Block) {
192
	select {
193
	case rm.messages <- &processResponseMessage{p, responses, blks}:
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
	case <-rm.ctx.Done():
	}
}

// Startup starts processing for the RequestManager by launching its run loop
// in a new goroutine.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the RequestManager by cancelling its context,
// which makes the run loop return.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

// run is the manager's event loop: it handles queued messages one at a time
// until the manager's context is done, then cancels every request that is
// still being tracked.
func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	done := rm.ctx.Done()
	for {
		select {
		case <-done:
			return
		case msg := <-rm.messages:
			msg.handle(rm)
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
225
		requestStatus.cancelFn()
226 227 228
	}
}

229
type terminateRequestMessage struct {
230
	requestID graphsync.RequestID
231
}
232

233
func (nrm *newRequestMessage) handle(rm *RequestManager) {
234 235 236
	requestID := rm.nextRequestID
	rm.nextRequestID++

237
	inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.root, nrm.selector)
238 239 240

	select {
	case nrm.inProgressRequestChan <- inProgressRequest{
241 242 243
		requestID:     requestID,
		incoming:      inProgressChan,
		incomingError: inProgressErr,
244 245 246 247 248
	}:
	case <-rm.ctx.Done():
	}
}

249 250 251 252 253
// handle forgets the finished request and tells the async loader to clean up
// after it.
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	id := trm.requestID
	delete(rm.inProgressRequestStatuses, id)
	rm.asyncLoader.CleanupRequest(id)
}

254 255 256 257 258 259
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

260
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
261
	delete(rm.inProgressRequestStatuses, crm.requestID)
262
	inProgressRequestStatus.cancelFn()
263 264 265
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
266 267 268 269 270 271 272 273 274 275 276 277
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
	responseMetadata := metadataForResponses(filteredResponses, rm.ipldBridge)
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
278
		}
279
		responsesForPeer = append(responsesForPeer, response)
280
	}
281 282
	return responsesForPeer
}
283

284 285
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
286
		if gsmsg.IsTerminalResponseCode(response.Status()) {
287 288 289 290 291 292
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
293
				}
294
				requestStatus.cancelFn()
295
			}
296 297
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
298 299 300
		}
	}
}
301

302
func (rm *RequestManager) generateResponseErrorFromStatus(status graphsync.ResponseStatusCode) error {
303
	switch status {
304
	case graphsync.RequestFailedBusy:
305
		return fmt.Errorf("Request Failed - Peer Is Busy")
306
	case graphsync.RequestFailedContentNotFound:
307
		return fmt.Errorf("Request Failed - Content Not Found")
308
	case graphsync.RequestFailedLegal:
309
		return fmt.Errorf("Request Failed - For Legal Reasons")
310
	case graphsync.RequestFailedUnknown:
311
		return fmt.Errorf("Request Failed - Unknown Reason")
312
	default:
313
		return fmt.Errorf("Unknown")
314 315
	}
}
316

317
func (rm *RequestManager) setupRequest(requestID graphsync.RequestID, p peer.ID, root ipld.Link, selectorSpec ipld.Node) (chan graphsync.ResponseProgress, chan error) {
318 319 320 321
	selectorBytes, err := rm.ipldBridge.EncodeNode(selectorSpec)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
322
	selector, err := rm.ipldBridge.ParseSelector(selectorSpec)
323 324 325
	if err != nil {
		return rm.singleErrorResponse(err)
	}
326 327 328 329
	asCidLink, ok := root.(cidlink.Link)
	if !ok {
		return rm.singleErrorResponse(fmt.Errorf("request failed: link has no cid"))
	}
330 331 332 333 334 335
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
		ctx, cancel, p, networkErrorChan,
	}
	rm.asyncLoader.StartRequest(requestID)
336
	rm.peerHandler.SendRequest(p, gsmsg.NewRequest(requestID, asCidLink.Cid, selectorBytes, maxPriority))
337 338 339 340 341
	return rm.executeTraversal(ctx, requestID, root, selector, networkErrorChan)
}

func (rm *RequestManager) executeTraversal(
	ctx context.Context,
342
	requestID graphsync.RequestID,
343
	root ipld.Link,
344 345
	selector ipldbridge.Selector,
	networkErrorChan chan error,
346 347
) (chan graphsync.ResponseProgress, chan error) {
	inProgressChan := make(chan graphsync.ResponseProgress)
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369
	inProgressErr := make(chan error)
	loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
	visitor := visitToChannel(ctx, inProgressChan)
	go func() {
		rm.ipldBridge.Traverse(ctx, loaderFn, root, selector, visitor)
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{requestID}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}