asyncloader.go 5.92 KB
Newer Older
1 2 3 4 5
package asyncloader

import (
	"context"
	"errors"
6
	"io/ioutil"
7

8 9 10
	"github.com/ipfs/go-block-format"

	"github.com/ipfs/go-graphsync/ipldbridge"
11
	gsmsg "github.com/ipfs/go-graphsync/message"
12 13 14 15 16
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/requestmanager/types"
17 18 19 20
	"github.com/ipld/go-ipld-prime"
)

type loaderMessage interface {
21
	handle(al *AsyncLoader)
22 23
}

24 25
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
26 27 28 29 30 31
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
	activeRequests   map[gsmsg.GraphSyncRequestID]bool
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
	responseCache    *responsecache.ResponseCache
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
	loadAttemptQueue := loadattemptqueue.New(func(requestID gsmsg.GraphSyncRequestID, link ipld.Link) ([]byte, error) {
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
			stream, loadErr := loader(link, ipldbridge.LinkContext{})
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
					return localData, nil
				}
			}
		}
		return data, err
	})
57 58 59 60 61 62
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
63 64 65
		activeRequests:   make(map[gsmsg.GraphSyncRequestID]bool),
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
66 67 68
	}
}

69 70 71 72 73 74 75 76 77 78 79 80 81 82
// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
	go al.messageQueueWorker()
	go al.run()
}

// Shutdown finishes processing of messages
func (al *AsyncLoader) Shutdown() {
	al.cancel()
}

// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
func (al *AsyncLoader) StartRequest(requestID gsmsg.GraphSyncRequestID) {
83
	select {
84 85
	case <-al.ctx.Done():
	case al.incomingMessages <- &startRequestMessage{requestID}:
86 87 88
	}
}

89 90 91 92 93
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(responses map[gsmsg.GraphSyncRequestID]metadata.Metadata,
	blks []blocks.Block) {
	al.responseCache.ProcessResponse(responses, blks)
94
	select {
95 96
	case <-al.ctx.Done():
	case al.incomingMessages <- &newResponsesAvailableMessage{}:
97 98 99
	}
}

100 101 102 103 104
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult {
	resultChan := make(chan types.AsyncLoadResult, 1)
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
105
	select {
106 107 108 109
	case <-al.ctx.Done():
		resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
		close(resultChan)
	case al.incomingMessages <- &loadRequestMessage{requestID, lr}:
110
	}
111
	return resultChan
112 113
}

114 115 116 117
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
func (al *AsyncLoader) CompleteResponsesFor(requestID gsmsg.GraphSyncRequestID) {
118
	select {
119 120
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
121 122 123
	}
}

124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
func (al *AsyncLoader) CleanupRequest(requestID gsmsg.GraphSyncRequestID) {
	al.responseCache.FinishRequest(requestID)
}

type loadRequestMessage struct {
	requestID   gsmsg.GraphSyncRequestID
	loadRequest loadattemptqueue.LoadRequest
}

type newResponsesAvailableMessage struct {
}

type startRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
141 142
}

143 144
type finishRequestMessage struct {
	requestID gsmsg.GraphSyncRequestID
145 146
}

147
func (al *AsyncLoader) run() {
148 149
	for {
		select {
150
		case <-al.ctx.Done():
151
			return
152 153
		case message := <-al.outgoingMessages:
			message.handle(al)
154 155 156 157
		}
	}
}

158
func (al *AsyncLoader) messageQueueWorker() {
159 160 161 162 163 164 165 166 167 168 169
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return messageBuffer[0]
	}
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
170
		return al.outgoingMessages
171 172 173
	}
	for {
		select {
174
		case incomingMessage := <-al.incomingMessages:
175 176 177
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
178
		case <-al.ctx.Done():
179 180 181 182 183
			return
		}
	}
}

184 185 186
func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
	retry := al.activeRequests[lrm.requestID]
	al.loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
187 188
}

189 190
func (srm *startRequestMessage) handle(al *AsyncLoader) {
	al.activeRequests[srm.requestID] = true
191 192
}

193 194 195
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
	al.loadAttemptQueue.ClearRequest(frm.requestID)
196 197
}

198 199
func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
	al.loadAttemptQueue.RetryLoads()
200
}