// Package loadattemptqueue provides a queue that attempts to load links with
// a caller-supplied load function and can hold requests for a later retry.
package loadattemptqueue

import (
	"errors"

	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	"github.com/ipld/go-ipld-prime"
)

// LoadRequest is a request to load the given link for the given request id,
// with results returned to the given channel. Values are created with
// NewLoadRequest and handed to LoadAttemptQueue.AttemptLoad.
type LoadRequest struct {
	// requestID identifies the request this load is performed for; it is
	// also the key ClearRequest matches on.
	requestID  gsmsg.GraphSyncRequestID
	// link is the link handed to the load attempter.
	link       ipld.Link
	// resultChan receives the outcome of the load. The queue closes it
	// right after sending a result.
	resultChan chan types.AsyncLoadResult
}

// NewLoadRequest bundles a request id, the link to load, and the channel on
// which the load result should be delivered into a LoadRequest.
func NewLoadRequest(
	requestID gsmsg.GraphSyncRequestID,
	link ipld.Link,
	resultChan chan types.AsyncLoadResult,
) LoadRequest {
	return LoadRequest{
		requestID:  requestID,
		link:       link,
		resultChan: resultChan,
	}
}

// LoadAttempter attempts to load a link, on behalf of the given request, to
// an array of bytes. Its return values encode three outcomes:
//
//   - bytes present, error nil: the load succeeded
//   - bytes nil, error present: the load failed
//   - bytes nil, error nil: nothing was loaded, but the load may be tried
//     again later
type LoadAttempter func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error)

// LoadAttemptQueue attempts to load using the load attempter, and then can
// place requests on a retry queue. Requests on the retry queue are re-run by
// RetryLoads and removed by ClearRequest.
type LoadAttemptQueue struct {
	// loadAttempter is the function invoked for every load attempt.
	loadAttempter  LoadAttempter
	// pausedRequests holds requests whose attempt returned neither data nor
	// an error and that were submitted with retry enabled.
	pausedRequests []LoadRequest
}

// New returns a LoadAttemptQueue that performs its loads with the given
// loadAttempter function. The retry queue starts out empty.
func New(loadAttempter LoadAttempter) *LoadAttemptQueue {
	queue := new(LoadAttemptQueue)
	queue.loadAttempter = loadAttempter
	return queue
}

// AttemptLoad runs the load attempter for the given load request and acts on
// the outcome:
//
//   - on an error, the error is sent on the request's result channel and the
//     channel is closed;
//   - on data, the data is sent on the result channel and the channel is
//     closed;
//   - on neither, the request is saved for a later RetryLoads when retry is
//     true, otherwise it is terminated with a "No active request" error.
func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) {
	data, loadErr := laq.loadAttempter(lr.requestID, lr.link)
	switch {
	case loadErr != nil:
		// The error case is checked first, so it wins even if data is set.
		lr.resultChan <- types.AsyncLoadResult{Data: nil, Err: loadErr}
		close(lr.resultChan)
	case data != nil:
		lr.resultChan <- types.AsyncLoadResult{Data: data, Err: nil}
		close(lr.resultChan)
	case retry:
		// Nothing available yet: park the request until the next retry pass.
		laq.pausedRequests = append(laq.pausedRequests, lr)
	default:
		laq.terminateWithError("No active request", lr.resultChan)
	}
}

// ClearRequest removes every saved load request belonging to the given
// request id from the retry queue. Each removed request is terminated with a
// "No active request" error on its result channel; all other saved requests
// are kept in their original order.
func (laq *LoadAttemptQueue) ClearRequest(requestID gsmsg.GraphSyncRequestID) {
	previous := laq.pausedRequests
	// Rebuild the queue from scratch, keeping only unrelated requests.
	laq.pausedRequests = nil
	for i := range previous {
		queued := previous[i]
		if queued.requestID != requestID {
			laq.pausedRequests = append(laq.pausedRequests, queued)
			continue
		}
		laq.terminateWithError("No active request", queued.resultChan)
	}
}

// RetryLoads re-attempts every load request currently saved on the retry
// queue. Requests that still produce neither data nor an error are saved
// again, because each one is re-attempted with retry = true.
func (laq *LoadAttemptQueue) RetryLoads() {
	// Detach the current queue first so that requests re-saved by
	// AttemptLoad land in a fresh slice rather than the one being walked.
	pending := laq.pausedRequests
	laq.pausedRequests = nil
	for i := 0; i < len(pending); i++ {
		laq.AttemptLoad(pending[i], true)
	}
}

// terminateWithError sends a result carrying a new error built from errMsg on
// resultChan and then closes the channel.
func (laq *LoadAttemptQueue) terminateWithError(errMsg string, resultChan chan<- types.AsyncLoadResult) {
	failure := types.AsyncLoadResult{Err: errors.New(errMsg)}
	resultChan <- failure
	close(resultChan)
}