asyncloader.go 5.92 KB
Newer Older
1 2 3 4 5
package asyncloader

import (

8 9 10

	gsmsg ""
12 13 14 15 16
17 18 19 20

type loaderMessage interface {
	handle(al *AsyncLoader)
22 23

24 25
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
26 27 28 29 30 31
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
	activeRequests   map[gsmsg.GraphSyncRequestID]bool
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
	responseCache    *responsecache.ResponseCache

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
	loadAttemptQueue := loadattemptqueue.New(func(requestID gsmsg.GraphSyncRequestID, link ipld.Link) ([]byte, error) {
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
			stream, loadErr := loader(link, ipldbridge.LinkContext{})
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
					return localData, nil
		return data, err
57 58 59 60 61 62
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
63 64 65
		activeRequests:   make(map[gsmsg.GraphSyncRequestID]bool),
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
66 67 68

69 70 71 72 73 74 75 76 77 78 79 80 81 82
// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
	go al.messageQueueWorker()

// Shutdown finishes processing of messages
func (al *AsyncLoader) Shutdown() {

// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
func (al *AsyncLoader) StartRequest(requestID gsmsg.GraphSyncRequestID) {
	select {
84 85
	case <-al.ctx.Done():
	case al.incomingMessages <- &startRequestMessage{requestID}:
86 87 88

89 90 91 92 93
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(responses map[gsmsg.GraphSyncRequestID]metadata.Metadata,
	blks []blocks.Block) {
	al.responseCache.ProcessResponse(responses, blks)
	select {
95 96
	case <-al.ctx.Done():
	case al.incomingMessages <- &newResponsesAvailableMessage{}:
97 98 99

100 101 102 103 104
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult {
	resultChan := make(chan types.AsyncLoadResult, 1)
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
	select {
106 107 108 109
	case <-al.ctx.Done():
		resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
	case al.incomingMessages <- &loadRequestMessage{requestID, lr}:
	return resultChan
112 113

114 115 116 117
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
func (al *AsyncLoader) CompleteResponsesFor(requestID gsmsg.GraphSyncRequestID) {
	select {
119 120
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
121 122 123

124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
func (al *AsyncLoader) CleanupRequest(requestID gsmsg.GraphSyncRequestID) {

type loadRequestMessage struct {
	requestID   gsmsg.GraphSyncRequestID
	loadRequest loadattemptqueue.LoadRequest

type newResponsesAvailableMessage struct {

type startRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
141 142

143 144
type finishRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
145 146

func (al *AsyncLoader) run() {
148 149
	for {
		select {
		case <-al.ctx.Done():
152 153
		case message := <-al.outgoingMessages:
154 155 156 157

func (al *AsyncLoader) messageQueueWorker() {
159 160 161 162 163 164 165 166 167 168 169
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		return messageBuffer[0]
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		return al.outgoingMessages
171 172 173
	for {
		select {
		case incomingMessage := <-al.incomingMessages:
175 176 177
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
		case <-al.ctx.Done():
179 180 181 182 183

184 185 186
func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
	retry := al.activeRequests[lrm.requestID]
	al.loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
187 188

189 190
func (srm *startRequestMessage) handle(al *AsyncLoader) {
	al.activeRequests[srm.requestID] = true
191 192

193 194 195
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
196 197

198 199
func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {