asyncloader.go 11.2 KB
Newer Older
1 2 3 4 5
package asyncloader

import (
	"context"
	"errors"
6
	"io/ioutil"
7

8
	blocks "github.com/ipfs/go-block-format"
Hannah Howard's avatar
Hannah Howard committed
9
	"github.com/ipld/go-ipld-prime"
10

Hannah Howard's avatar
Hannah Howard committed
11
	"github.com/ipfs/go-graphsync"
12 13 14 15 16
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/requestmanager/types"
17 18 19
)

type loaderMessage interface {
20
	handle(al *AsyncLoader)
21 22
}

23 24 25 26 27
// alternateQueue pairs the response cache and load attempt queue that are
// created together for one named persistence option.
type alternateQueue struct {
	responseCache    *responsecache.ResponseCache
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

28 29
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
30 31 32 33 34 35
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

36 37 38 39 40
	defaultLoader    ipld.Loader
	defaultStorer    ipld.Storer
	activeRequests   map[graphsync.RequestID]struct{}
	requestQueues    map[graphsync.RequestID]string
	alternateQueues  map[string]alternateQueue
41
	responseCache    *responsecache.ResponseCache
42
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
43 44 45 46 47
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
48
	responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer)
49 50 51 52 53 54
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
55 56 57 58 59
		defaultLoader:    loader,
		defaultStorer:    storer,
		activeRequests:   make(map[graphsync.RequestID]struct{}),
		requestQueues:    make(map[graphsync.RequestID]string),
		alternateQueues:  make(map[string]alternateQueue),
60 61
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
62 63 64
	}
}

65 66 67 68 69 70 71 72 73 74 75
// Startup launches the two background goroutines that process messages: the
// run loop that handles them and the worker that buffers them on the way in.
func (al *AsyncLoader) Startup() {
	go al.run()
	go al.messageQueueWorker()
}

// Shutdown finishes processing of messages by cancelling the loader's
// context; the goroutines started by Startup return once they observe it.
func (al *AsyncLoader) Shutdown() {
	al.cancel()
}

76 77 78 79 80 81
// RegisterPersistenceOption registers a new loader/storer option for processing requests
func (al *AsyncLoader) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	if name == "" {
		return errors.New("Persistence option must have a name")
	}
	response := make(chan error, 1)
82 83 84 85 86 87 88 89
	err := al.sendSyncMessage(&registerPersistenceOptionMessage{name, loader, storer, response}, response)
	return err
}

// UnregisterPersistenceOption unregisters an existing loader/storer option for processing requests
func (al *AsyncLoader) UnregisterPersistenceOption(name string) error {
	if name == "" {
		return errors.New("Persistence option must have a name")
90
	}
91 92 93
	response := make(chan error, 1)
	err := al.sendSyncMessage(&unregisterPersistenceOptionMessage{name, response}, response)
	return err
94 95
}

96 97
// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
98 99
func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOption string) error {
	response := make(chan error, 1)
100 101
	err := al.sendSyncMessage(&startRequestMessage{requestID, persistenceOption, response}, response)
	return err
102 103
}

104 105
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
106
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
107
	blks []blocks.Block) {
108
	select {
109
	case <-al.ctx.Done():
110
	case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
111 112 113
	}
}

114 115
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
116
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
117
	resultChan := make(chan types.AsyncLoadResult, 1)
118
	response := make(chan error, 1)
119
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
120 121 122
	err := al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response)
	if err != nil {
		resultChan <- types.AsyncLoadResult{Data: nil, Err: err}
123
		close(resultChan)
124
	}
125
	return resultChan
126 127
}

128 129 130
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
131
func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
132
	select {
133 134
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
135 136 137
	}
}

138 139 140
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
141
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
142 143 144 145
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &cleanupRequestMessage{requestID}:
	}
146 147
}

148 149 150 151 152 153 154 155 156 157 158 159 160 161
// sendSyncMessage queues message for the loader and then waits for its handler
// to reply on response. It returns the handler's reply, or a "Context Closed"
// error if the loader's context ends while queuing or while waiting.
func (al *AsyncLoader) sendSyncMessage(message loaderMessage, response chan error) error {
	done := al.ctx.Done()

	// Step one: hand the message over.
	select {
	case al.incomingMessages <- message:
	case <-done:
		return errors.New("Context Closed")
	}

	// Step two: wait for the handler's verdict.
	select {
	case result := <-response:
		return result
	case <-done:
		return errors.New("Context Closed")
	}
}

162
type loadRequestMessage struct {
163
	response    chan error
164
	requestID   graphsync.RequestID
165 166 167 168
	loadRequest loadattemptqueue.LoadRequest
}

type newResponsesAvailableMessage struct {
169 170 171 172 173 174 175 176 177
	responses map[graphsync.RequestID]metadata.Metadata
	blks      []blocks.Block
}

type registerPersistenceOptionMessage struct {
	name     string
	loader   ipld.Loader
	storer   ipld.Storer
	response chan error
178 179
}

180 181 182 183 184
// unregisterPersistenceOptionMessage asks the loader to remove the persistence
// option registered under name. The outcome is reported on response.
type unregisterPersistenceOptionMessage struct {
	name     string
	response chan error
}

185
type startRequestMessage struct {
186 187 188
	requestID         graphsync.RequestID
	persistenceOption string
	response          chan error
189 190
}

191
type finishRequestMessage struct {
192
	requestID graphsync.RequestID
193 194
}

195 196 197 198
// cleanupRequestMessage tells the loader to release the response cache state
// held for requestID.
type cleanupRequestMessage struct {
	requestID graphsync.RequestID
}

199
func (al *AsyncLoader) run() {
200 201
	for {
		select {
202
		case <-al.ctx.Done():
203
			return
204 205
		case message := <-al.outgoingMessages:
			message.handle(al)
206 207 208 209
		}
	}
}

210
func (al *AsyncLoader) messageQueueWorker() {
211 212 213 214 215 216 217 218 219 220 221
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return messageBuffer[0]
	}
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
222
		return al.outgoingMessages
223 224 225
	}
	for {
		select {
226
		case incomingMessage := <-al.incomingMessages:
227 228 229
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
230
		case <-al.ctx.Done():
231 232 233 234 235
			return
		}
	}
}

236 237 238 239 240 241 242 243 244 245 246 247 248 249
// getLoadAttemptQueue returns the load attempt queue registered under the
// given persistence option name, or the default queue when the name is empty.
// An unregistered, non-empty name yields nil.
func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
	if queue != "" {
		return al.alternateQueues[queue].loadAttemptQueue
	}
	return al.loadAttemptQueue
}

// getResponseCache returns the response cache registered under the given
// persistence option name, or the default cache when the name is empty.
// An unregistered, non-empty name yields nil.
func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCache {
	if queue != "" {
		return al.alternateQueues[queue].responseCache
	}
	return al.responseCache
}

250
func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
251 252 253
	_, retry := al.activeRequests[lrm.requestID]
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[lrm.requestID])
	loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
254 255
	select {
	case <-al.ctx.Done():
256
	case lrm.response <- nil:
257
	}
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
}

// register creates a response cache and load attempt queue for the message's
// loader/storer pair and records them on the loader under the message's name.
// It returns an error, leaving the loader unchanged, if that name is already
// registered.
func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
	if _, existing := al.alternateQueues[rpom.name]; existing {
		return errors.New("already registered a persistence option with this name")
	}
	responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer)
	al.alternateQueues[rpom.name] = alternateQueue{
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
	}
	return nil
}

// handle performs the registration and reports its outcome to the sender,
// giving up on the report if the loader's context ends first.
func (rpom *registerPersistenceOptionMessage) handle(al *AsyncLoader) {
	outcome := rpom.register(al)
	select {
	case rpom.response <- outcome:
	case <-al.ctx.Done():
	}
}

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
// unregister removes the named persistence option from the loader. It fails
// if the name was never registered or if any tracked request is still bound
// to that option.
func (upom *unregisterPersistenceOptionMessage) unregister(al *AsyncLoader) error {
	if _, registered := al.alternateQueues[upom.name]; !registered {
		return errors.New("Unknown persistence option")
	}
	for _, inUse := range al.requestQueues {
		if inUse == upom.name {
			return errors.New("cannot unregister while requests are in progress")
		}
	}
	delete(al.alternateQueues, upom.name)
	return nil
}

// handle performs the removal and reports its outcome to the sender, giving
// up on the report if the loader's context ends first.
func (upom *unregisterPersistenceOptionMessage) handle(al *AsyncLoader) {
	outcome := upom.unregister(al)
	select {
	case upom.response <- outcome:
	case <-al.ctx.Done():
	}
}

300 301 302 303 304 305 306 307 308 309
func (srm *startRequestMessage) startRequest(al *AsyncLoader) error {
	if srm.persistenceOption != "" {
		_, ok := al.alternateQueues[srm.persistenceOption]
		if !ok {
			return errors.New("Unknown persistence option")
		}
		al.requestQueues[srm.requestID] = srm.persistenceOption
	}
	al.activeRequests[srm.requestID] = struct{}{}
	return nil
310 311
}

312
func (srm *startRequestMessage) handle(al *AsyncLoader) {
313 314 315 316 317
	err := srm.startRequest(al)
	select {
	case <-al.ctx.Done():
	case srm.response <- err:
	}
318 319
}

320 321
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
322 323
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[frm.requestID])
	loadAttemptQueue.ClearRequest(frm.requestID)
324 325
}

326
func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
	byQueue := make(map[string][]graphsync.RequestID)
	for requestID := range nram.responses {
		queue := al.requestQueues[requestID]
		byQueue[queue] = append(byQueue[queue], requestID)
	}
	for queue, requestIDs := range byQueue {
		loadAttemptQueue := al.getLoadAttemptQueue(queue)
		responseCache := al.getResponseCache(queue)
		responses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs))
		for _, requestID := range requestIDs {
			responses[requestID] = nram.responses[requestID]
		}
		responseCache.ProcessResponse(responses, nram.blks)
		loadAttemptQueue.RetryLoads()
	}
}

// handle finishes the request in the response cache that serves it: the cache
// of its bound persistence option, in which case the binding is also dropped,
// or the default cache when the request has no binding.
func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
	queueName, bound := al.requestQueues[crm.requestID]
	if !bound {
		al.responseCache.FinishRequest(crm.requestID)
		return
	}
	al.alternateQueues[queueName].responseCache.FinishRequest(crm.requestID)
	delete(al.requestQueues, crm.requestID)
}

func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
Hannah Howard's avatar
Hannah Howard committed
358
	loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
359 360 361 362 363 364 365 366
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
			stream, loadErr := loader(link, ipld.LinkContext{})
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
Hannah Howard's avatar
Hannah Howard committed
367 368 369 370 371
					return types.AsyncLoadResult{
						Data:  localData,
						Err:   nil,
						Local: true,
					}
372 373 374
				}
			}
		}
Hannah Howard's avatar
Hannah Howard committed
375 376 377 378 379
		return types.AsyncLoadResult{
			Data:  data,
			Err:   err,
			Local: false,
		}
380 381 382
	})

	return responseCache, loadAttemptQueue
383
}