// Package requestmanager tracks outgoing GraphSync requests and processes the
// incoming responses to them.
package requestmanager

import (
	"context"
	"fmt"
	"math"

	"github.com/ipfs/go-block-format"
	ipldbridge "github.com/ipfs/go-graphsync/ipldbridge"
	gsmsg "github.com/ipfs/go-graphsync/message"
11 12 13 14 15
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	logging "github.com/ipfs/go-log"
	"github.com/ipld/go-ipld-prime"
16 17 18
	peer "github.com/libp2p/go-libp2p-peer"
)

19
var log = logging.Logger("graphsync")
20 21 22 23 24 25 26

const (
	// maxPriority is the max priority as defined by the bitswap protocol
	maxPriority = gsmsg.GraphSyncPriority(math.MaxInt32)
)

type inProgressRequestStatus struct {
27 28 29 30
	ctx          context.Context
	cancelFn     func()
	p            peer.ID
	networkError chan error
31 32 33 34
}

// PeerHandler is an interface that can send requests to peers
type PeerHandler interface {
35
	SendRequest(p peer.ID, graphSyncRequest gsmsg.GraphSyncRequest)
36 37
}

38 39 40 41 42 43 44 45 46 47 48
// AsyncLoader is an interface for loading links asynchronously, returning
// results as new responses are processed
type AsyncLoader interface {
	StartRequest(requestID gsmsg.GraphSyncRequestID)
	ProcessResponse(responses map[gsmsg.GraphSyncRequestID]metadata.Metadata,
		blks []blocks.Block)
	AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult
	CompleteResponsesFor(requestID gsmsg.GraphSyncRequestID)
	CleanupRequest(requestID gsmsg.GraphSyncRequestID)
}

49 50 51 52 53 54 55 56 57
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
	ctx         context.Context
	cancel      func()
	messages    chan requestManagerMessage
	ipldBridge  ipldbridge.IPLDBridge
	peerHandler PeerHandler
	rc          *responseCollector
58
	asyncLoader AsyncLoader
59 60 61 62 63 64 65 66 67 68
	// dont touch out side of run loop
	nextRequestID             gsmsg.GraphSyncRequestID
	inProgressRequestStatuses map[gsmsg.GraphSyncRequestID]*inProgressRequestStatus
}

// requestManagerMessage is a unit of work posted to the manager's messages
// channel. The run loop receives each message and calls handle on it, so
// implementations may read and modify the manager's run-loop-only state.
type requestManagerMessage interface {
	handle(rm *RequestManager)
}

// New generates a new request manager from a context, network, and selectorQuerier
69
func New(ctx context.Context, asyncLoader AsyncLoader, ipldBridge ipldbridge.IPLDBridge) *RequestManager {
70 71 72 73 74
	ctx, cancel := context.WithCancel(ctx)
	return &RequestManager{
		ctx:                       ctx,
		cancel:                    cancel,
		ipldBridge:                ipldBridge,
75
		asyncLoader:               asyncLoader,
76 77 78 79 80 81 82 83 84 85 86 87
		rc:                        newResponseCollector(ctx),
		messages:                  make(chan requestManagerMessage, 16),
		inProgressRequestStatuses: make(map[gsmsg.GraphSyncRequestID]*inProgressRequestStatus),
	}
}

// SetDelegate specifies who will send messages out to the internet.
//
// It stores peerHandler on the manager with no synchronization; the run loop
// later calls it to send new requests and cancellations.
// NOTE(review): presumably this must be called before Startup — confirm
// against callers.
func (rm *RequestManager) SetDelegate(peerHandler PeerHandler) {
	rm.peerHandler = peerHandler
}

type inProgressRequest struct {
88
	requestID     gsmsg.GraphSyncRequestID
89
	incoming      chan types.ResponseProgress
90
	incomingError chan error
91 92 93 94 95 96 97 98 99
}

// newRequestMessage asks the run loop to start a request to peer p for the
// given selector. The run loop answers on inProgressRequestChan with the
// request's ID and result channels.
type newRequestMessage struct {
	p                     peer.ID
	selector              ipld.Node
	inProgressRequestChan chan<- inProgressRequest
}

// SendRequest initiates a new GraphSync request to the given peer.
100 101
func (rm *RequestManager) SendRequest(ctx context.Context,
	p peer.ID,
102
	cidRootedSelector ipld.Node) (<-chan types.ResponseProgress, <-chan error) {
103
	if len(rm.ipldBridge.ValidateSelectorSpec(cidRootedSelector)) != 0 {
104
		return rm.singleErrorResponse(fmt.Errorf("Invalid Selector Spec"))
105 106 107 108 109 110 111
	}

	inProgressRequestChan := make(chan inProgressRequest)

	select {
	case rm.messages <- &newRequestMessage{p, cidRootedSelector, inProgressRequestChan}:
	case <-rm.ctx.Done():
112
		return rm.emptyResponse()
113
	case <-ctx.Done():
114
		return rm.emptyResponse()
115 116 117 118
	}
	var receivedInProgressRequest inProgressRequest
	select {
	case <-rm.ctx.Done():
119
		return rm.emptyResponse()
120 121 122
	case receivedInProgressRequest = <-inProgressRequestChan:
	}

123 124 125 126 127 128 129 130 131 132
	return rm.rc.collectResponses(ctx,
		receivedInProgressRequest.incoming,
		receivedInProgressRequest.incomingError,
		func() {
			rm.cancelRequest(receivedInProgressRequest.requestID,
				receivedInProgressRequest.incoming,
				receivedInProgressRequest.incomingError)
		})
}

133 134
func (rm *RequestManager) emptyResponse() (chan types.ResponseProgress, chan error) {
	ch := make(chan types.ResponseProgress)
135
	close(ch)
136
	errCh := make(chan error)
137 138 139 140
	close(errCh)
	return ch, errCh
}

141 142
func (rm *RequestManager) singleErrorResponse(err error) (chan types.ResponseProgress, chan error) {
	ch := make(chan types.ResponseProgress)
143
	close(ch)
144
	errCh := make(chan error, 1)
145
	errCh <- err
146 147
	close(errCh)
	return ch, errCh
148 149 150 151 152 153
}

// cancelRequestMessage asks the run loop to cancel the in-progress request
// identified by requestID.
type cancelRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
}

154
func (rm *RequestManager) cancelRequest(requestID gsmsg.GraphSyncRequestID,
155
	incomingResponses chan types.ResponseProgress,
156
	incomingErrors chan error) {
157
	cancelMessageChannel := rm.messages
158
	for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil {
159 160 161 162 163 164 165
		select {
		case cancelMessageChannel <- &cancelRequestMessage{requestID}:
			cancelMessageChannel = nil
		// clear out any remaining responses, in case and "incoming reponse"
		// messages get processed before our cancel message
		case _, ok := <-incomingResponses:
			if !ok {
166 167 168 169 170
				incomingResponses = nil
			}
		case _, ok := <-incomingErrors:
			if !ok {
				incomingErrors = nil
171 172 173 174 175 176 177 178
			}
		case <-rm.ctx.Done():
			return
		}
	}
}

type processResponseMessage struct {
179
	p         peer.ID
180 181
	responses []gsmsg.GraphSyncResponse
	blks      []blocks.Block
182 183 184 185
}

// ProcessResponses ingests the given responses from the network and
// and updates the in progress requests based on those responses.
186
func (rm *RequestManager) ProcessResponses(p peer.ID, responses []gsmsg.GraphSyncResponse,
187
	blks []blocks.Block) {
188
	select {
189
	case rm.messages <- &processResponseMessage{p, responses, blks}:
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
	case <-rm.ctx.Done():
	}
}

// Startup starts processing for the RequestManager by launching its run loop
// in a new goroutine.
func (rm *RequestManager) Startup() {
	go rm.run()
}

// Shutdown ends processing for the RequestManager by cancelling the manager's
// context. The run loop exits when it observes the cancellation.
func (rm *RequestManager) Shutdown() {
	rm.cancel()
}

// run is the manager's event loop. It handles queued messages one at a time
// until the manager's context ends, then cancels every request that is still
// in progress.
func (rm *RequestManager) run() {
	// NOTE: Do not open any streams or connections from anywhere in this
	// event loop. Really, just don't do anything likely to block.
	defer rm.cleanupInProcessRequests()

	for {
		select {
		case msg := <-rm.messages:
			msg.handle(rm)
		case <-rm.ctx.Done():
			return
		}
	}
}

func (rm *RequestManager) cleanupInProcessRequests() {
	for _, requestStatus := range rm.inProgressRequestStatuses {
221
		requestStatus.cancelFn()
222 223 224
	}
}

225 226 227
type terminateRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
}
228

229
func (nrm *newRequestMessage) handle(rm *RequestManager) {
230 231 232
	requestID := rm.nextRequestID
	rm.nextRequestID++

233
	inProgressChan, inProgressErr := rm.setupRequest(requestID, nrm.p, nrm.selector)
234 235 236

	select {
	case nrm.inProgressRequestChan <- inProgressRequest{
237 238 239
		requestID:     requestID,
		incoming:      inProgressChan,
		incomingError: inProgressErr,
240 241 242 243 244
	}:
	case <-rm.ctx.Done():
	}
}

245 246 247 248 249
func (trm *terminateRequestMessage) handle(rm *RequestManager) {
	delete(rm.inProgressRequestStatuses, trm.requestID)
	rm.asyncLoader.CleanupRequest(trm.requestID)
}

250 251 252 253 254 255
func (crm *cancelRequestMessage) handle(rm *RequestManager) {
	inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID]
	if !ok {
		return
	}

256
	rm.peerHandler.SendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID))
257
	delete(rm.inProgressRequestStatuses, crm.requestID)
258
	inProgressRequestStatus.cancelFn()
259 260 261
}

func (prm *processResponseMessage) handle(rm *RequestManager) {
262 263 264 265 266 267 268 269 270 271 272 273
	filteredResponses := rm.filterResponsesForPeer(prm.responses, prm.p)
	responseMetadata := metadataForResponses(filteredResponses, rm.ipldBridge)
	rm.asyncLoader.ProcessResponse(responseMetadata, prm.blks)
	rm.processTerminations(filteredResponses)
}

func (rm *RequestManager) filterResponsesForPeer(responses []gsmsg.GraphSyncResponse, p peer.ID) []gsmsg.GraphSyncResponse {
	responsesForPeer := make([]gsmsg.GraphSyncResponse, 0, len(responses))
	for _, response := range responses {
		requestStatus, ok := rm.inProgressRequestStatuses[response.RequestID()]
		if !ok || requestStatus.p != p {
			continue
274
		}
275
		responsesForPeer = append(responsesForPeer, response)
276
	}
277 278
	return responsesForPeer
}
279

280 281
func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncResponse) {
	for _, response := range responses {
282
		if gsmsg.IsTerminalResponseCode(response.Status()) {
283 284 285 286 287 288
			if gsmsg.IsTerminalFailureCode(response.Status()) {
				requestStatus := rm.inProgressRequestStatuses[response.RequestID()]
				responseError := rm.generateResponseErrorFromStatus(response.Status())
				select {
				case requestStatus.networkError <- responseError:
				case <-requestStatus.ctx.Done():
289
				}
290
				requestStatus.cancelFn()
291
			}
292 293
			rm.asyncLoader.CompleteResponsesFor(response.RequestID())
			delete(rm.inProgressRequestStatuses, response.RequestID())
294 295 296
		}
	}
}
297

298
func (rm *RequestManager) generateResponseErrorFromStatus(status gsmsg.GraphSyncResponseStatusCode) error {
299 300
	switch status {
	case gsmsg.RequestFailedBusy:
301
		return fmt.Errorf("Request Failed - Peer Is Busy")
302
	case gsmsg.RequestFailedContentNotFound:
303
		return fmt.Errorf("Request Failed - Content Not Found")
304
	case gsmsg.RequestFailedLegal:
305
		return fmt.Errorf("Request Failed - For Legal Reasons")
306
	case gsmsg.RequestFailedUnknown:
307
		return fmt.Errorf("Request Failed - Unknown Reason")
308
	default:
309
		return fmt.Errorf("Unknown")
310 311
	}
}
312

313
func (rm *RequestManager) setupRequest(requestID gsmsg.GraphSyncRequestID, p peer.ID, selectorSpec ipld.Node) (chan types.ResponseProgress, chan error) {
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
	selectorBytes, err := rm.ipldBridge.EncodeNode(selectorSpec)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
	root, selector, err := rm.ipldBridge.DecodeSelectorSpec(selectorSpec)
	if err != nil {
		return rm.singleErrorResponse(err)
	}
	networkErrorChan := make(chan error, 1)
	ctx, cancel := context.WithCancel(rm.ctx)
	rm.inProgressRequestStatuses[requestID] = &inProgressRequestStatus{
		ctx, cancel, p, networkErrorChan,
	}
	rm.asyncLoader.StartRequest(requestID)
	rm.peerHandler.SendRequest(p, gsmsg.NewRequest(requestID, selectorBytes, maxPriority))
	return rm.executeTraversal(ctx, requestID, root, selector, networkErrorChan)
}

func (rm *RequestManager) executeTraversal(
	ctx context.Context,
	requestID gsmsg.GraphSyncRequestID,
	root ipld.Node,
	selector ipldbridge.Selector,
	networkErrorChan chan error,
338 339
) (chan types.ResponseProgress, chan error) {
	inProgressChan := make(chan types.ResponseProgress)
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
	inProgressErr := make(chan error)
	loaderFn := loader.WrapAsyncLoader(ctx, rm.asyncLoader.AsyncLoad, requestID, inProgressErr)
	visitor := visitToChannel(ctx, inProgressChan)
	go func() {
		rm.ipldBridge.Traverse(ctx, loaderFn, root, selector, visitor)
		select {
		case networkError := <-networkErrorChan:
			select {
			case <-rm.ctx.Done():
			case inProgressErr <- networkError:
			}
		default:
		}
		select {
		case <-ctx.Done():
		case rm.messages <- &terminateRequestMessage{requestID}:
		}
		close(inProgressChan)
		close(inProgressErr)
	}()
	return inProgressChan, inProgressErr
}