asyncloader.go 9.96 KB
Newer Older
1 2 3 4 5
package asyncloader

import (

8 9
	blocks ""
10 11 12 13 14 15

16 17 18 19

type loaderMessage interface {
	handle(al *AsyncLoader)
21 22

23 24 25 26 27
type alternateQueue struct {
	responseCache    *responsecache.ResponseCache
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue

28 29
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
30 31 32 33 34 35
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

36 37 38 39 40
	defaultLoader    ipld.Loader
	defaultStorer    ipld.Storer
	activeRequests   map[graphsync.RequestID]struct{}
	requestQueues    map[graphsync.RequestID]string
	alternateQueues  map[string]alternateQueue
	responseCache    *responsecache.ResponseCache
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
43 44 45 46 47

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
	responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer)
49 50 51 52 53 54
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
55 56 57 58 59
		defaultLoader:    loader,
		defaultStorer:    storer,
		activeRequests:   make(map[graphsync.RequestID]struct{}),
		requestQueues:    make(map[graphsync.RequestID]string),
		alternateQueues:  make(map[string]alternateQueue),
60 61
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
62 63 64

65 66 67 68 69 70 71 72 73 74 75
// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
	go al.messageQueueWorker()

// Shutdown finishes processing of messages
func (al *AsyncLoader) Shutdown() {

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
// RegisterPersistenceOption registers a new loader/storer option for processing requests
func (al *AsyncLoader) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	if name == "" {
		return errors.New("Persistence option must have a name")
	response := make(chan error, 1)
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case al.incomingMessages <- &registerPersistenceOptionMessage{name, loader, storer, response}:
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case err := <-response:
		return err

95 96
// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
97 98 99 100 101 102 103
func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOption string) error {
	response := make(chan error, 1)
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case al.incomingMessages <- &startRequestMessage{requestID, persistenceOption, response}:
	select {
	case <-al.ctx.Done():
106 107 108
		return errors.New("context closed")
	case err := <-response:
		return err
109 110 111

112 113
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
	blks []blocks.Block) {
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
119 120 121

122 123
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
125 126
	resultChan := make(chan types.AsyncLoadResult, 1)
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
	select {
128 129 130 131
	case <-al.ctx.Done():
		resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
	case al.incomingMessages <- &loadRequestMessage{requestID, lr}:
	return resultChan
134 135

136 137 138
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
	select {
141 142
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
143 144 145

146 147 148
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
150 151 152 153
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &cleanupRequestMessage{requestID}:
154 155 156

type loadRequestMessage struct {
	requestID   graphsync.RequestID
158 159 160 161
	loadRequest loadattemptqueue.LoadRequest

type newResponsesAvailableMessage struct {
162 163 164 165 166 167 168 169 170
	responses map[graphsync.RequestID]metadata.Metadata
	blks      []blocks.Block

type registerPersistenceOptionMessage struct {
	name     string
	loader   ipld.Loader
	storer   ipld.Storer
	response chan error
171 172 173

type startRequestMessage struct {
174 175 176
	requestID         graphsync.RequestID
	persistenceOption string
	response          chan error
177 178

type finishRequestMessage struct {
	requestID graphsync.RequestID
181 182

183 184 185 186
type cleanupRequestMessage struct {
	requestID graphsync.RequestID

func (al *AsyncLoader) run() {
188 189
	for {
		select {
		case <-al.ctx.Done():
192 193
		case message := <-al.outgoingMessages:
194 195 196 197

func (al *AsyncLoader) messageQueueWorker() {
199 200 201 202 203 204 205 206 207 208 209
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		return messageBuffer[0]
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		return al.outgoingMessages
211 212 213
	for {
		select {
		case incomingMessage := <-al.incomingMessages:
215 216 217
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
		case <-al.ctx.Done():
219 220 221 222 223

224 225 226 227 228 229 230 231 232 233 234 235 236 237
func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
	if queue == "" {
		return al.loadAttemptQueue
	return al.alternateQueues[queue].loadAttemptQueue

func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCache {
	if queue == "" {
		return al.responseCache
	return al.alternateQueues[queue].responseCache

func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
	_, retry := al.activeRequests[lrm.requestID]
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[lrm.requestID])
	loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)

func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
	_, existing := al.alternateQueues[]
	if existing {
		return errors.New("already registerd a persistence option with this name")
	responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer)
	al.alternateQueues[] = alternateQueue{responseCache, loadAttemptQueue}
	return nil

func (rpom *registerPersistenceOptionMessage) handle(al *AsyncLoader) {
	err := rpom.register(al)
	select {
	case <-al.ctx.Done():
	case rpom.response <- err:

func (srm *startRequestMessage) startRequest(al *AsyncLoader) error {
	if srm.persistenceOption != "" {
		_, ok := al.alternateQueues[srm.persistenceOption]
		if !ok {
			return errors.New("Unknown persistence option")
		al.requestQueues[srm.requestID] = srm.persistenceOption
	al.activeRequests[srm.requestID] = struct{}{}
	return nil
272 273

func (srm *startRequestMessage) handle(al *AsyncLoader) {
275 276 277 278 279
	err := srm.startRequest(al)
	select {
	case <-al.ctx.Done():
	case srm.response <- err:
280 281

282 283
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
284 285
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[frm.requestID])
286 287

func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
	byQueue := make(map[string][]graphsync.RequestID)
	for requestID := range nram.responses {
		queue := al.requestQueues[requestID]
		byQueue[queue] = append(byQueue[queue], requestID)
	for queue, requestIDs := range byQueue {
		loadAttemptQueue := al.getLoadAttemptQueue(queue)
		responseCache := al.getResponseCache(queue)
		responses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs))
		for _, requestID := range requestIDs {
			responses[requestID] = nram.responses[requestID]
		responseCache.ProcessResponse(responses, nram.blks)

func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
	aq, ok := al.requestQueues[crm.requestID]
	if ok {
		delete(al.requestQueues, crm.requestID)

func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
	loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) {
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
			stream, loadErr := loader(link, ipld.LinkContext{})
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
					return localData, nil
		return data, err

	return responseCache, loadAttemptQueue