package asyncloader

import (
	"context"
	"errors"
	"io/ioutil"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipld/go-ipld-prime"

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/requestmanager/types"
)

type loaderMessage interface {
	handle(al *AsyncLoader)
}

type alternateQueue struct {
	responseCache    *responsecache.ResponseCache
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

// AsyncLoader manages loading links asynchronously as new responses
// come in from the network
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

	defaultLoader    ipld.Loader
	defaultStorer    ipld.Storer
	activeRequests   map[graphsync.RequestID]struct{}
	requestQueues    map[graphsync.RequestID]string
	alternateQueues  map[string]alternateQueue
	responseCache    *responsecache.ResponseCache
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing functions
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
	responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer)
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
		defaultLoader:    loader,
		defaultStorer:    storer,
		activeRequests:   make(map[graphsync.RequestID]struct{}),
		requestQueues:    make(map[graphsync.RequestID]string),
		alternateQueues:  make(map[string]alternateQueue),
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
	}
}

// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
	go al.messageQueueWorker()
	go al.run()
}

// Shutdown finishes processing of messages
func (al *AsyncLoader) Shutdown() {
	al.cancel()
}
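
// A minimal usage sketch, assuming the caller supplies a loader/storer pair,
// a request ID, and a link to load (ProcessResponse is invoked separately as
// network responses arrive):
//
//	al := asyncloader.New(ctx, loader, storer)
//	al.Startup()
//	_ = al.StartRequest(requestID, "") // "" uses the default loader/storer
//	resultChan := al.AsyncLoad(requestID, link)
//	al.CompleteResponsesFor(requestID) // no further network responses expected
//	al.CleanupRequest(requestID)       // cached response data may now be dropped
//	al.Shutdown()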

// RegisterPersistenceOption registers a new loader/storer option for processing requests
func (al *AsyncLoader) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	if name == "" {
		return errors.New("Persistence option must have a name")
	}
	response := make(chan error, 1)
	err := al.sendSyncMessage(&registerPersistenceOptionMessage{name, loader, storer, response}, response)
	return err
}

// UnregisterPersistenceOption unregisters an existing loader/storer option for processing requests
func (al *AsyncLoader) UnregisterPersistenceOption(name string) error {
	if name == "" {
		return errors.New("Persistence option must have a name")
	}
	response := make(chan error, 1)
	err := al.sendSyncMessage(&unregisterPersistenceOptionMessage{name, response}, response)
	return err
}

// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOption string) error {
	response := make(chan error, 1)
	err := al.sendSyncMessage(&startRequestMessage{requestID, persistenceOption, response}, response)
	return err
}
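
// A sketch of using an alternate persistence option, assuming altLoader and
// altStorer are provided by the caller:
//
//	_ = al.RegisterPersistenceOption("alt-store", altLoader, altStorer)
//	_ = al.StartRequest(requestID, "alt-store") // loads for this request use altLoader/altStorer
//	// ... once every request using "alt-store" has finished ...
//	_ = al.UnregisterPersistenceOption("alt-store")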

// ProcessResponse ingests new responses and completes asynchronous loads as
// necessary
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
	blks []blocks.Block) {
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
	}
}

// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel
// of load results -- exactly one result will be sent over it, carrying either data or an error.
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
	resultChan := make(chan types.AsyncLoadResult, 1)
	response := make(chan error, 1)
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
	_ = al.sendSyncMessage(&loadRequestMessage{response, requestID, lr}, response)
	return resultChan
}

// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
	}
}

// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid and can be cleaned up
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &cleanupRequestMessage{requestID}:
	}
}

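// sendSyncMessage queues a message for the run loop and blocks until it has
// been handled (or the context is cancelled), returning any error the handler
// reports on the response channel.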
func (al *AsyncLoader) sendSyncMessage(message loaderMessage, response chan error) error {
	select {
	case <-al.ctx.Done():
		return errors.New("Context Closed")
	case al.incomingMessages <- message:
	}
	select {
	case <-al.ctx.Done():
		return errors.New("Context Closed")
	case err := <-response:
		return err
	}
}

type loadRequestMessage struct {
	response    chan error
	requestID   graphsync.RequestID
	loadRequest loadattemptqueue.LoadRequest
}

type newResponsesAvailableMessage struct {
	responses map[graphsync.RequestID]metadata.Metadata
	blks      []blocks.Block
}

type registerPersistenceOptionMessage struct {
	name     string
	loader   ipld.Loader
	storer   ipld.Storer
	response chan error
}

type unregisterPersistenceOptionMessage struct {
	name     string
	response chan error
}

type startRequestMessage struct {
	requestID         graphsync.RequestID
	persistenceOption string
	response          chan error
}

type finishRequestMessage struct {
	requestID graphsync.RequestID
}

type cleanupRequestMessage struct {
	requestID graphsync.RequestID
}

func (al *AsyncLoader) run() {
	for {
		select {
		case <-al.ctx.Done():
			return
		case message := <-al.outgoingMessages:
			message.handle(al)
		}
	}
}

func (al *AsyncLoader) messageQueueWorker() {
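	// buffer incoming messages without bound so senders never block, feeding
	// them to the run loop one at a time; when the buffer is empty,
	// outgoingMessages() returns a nil channel, which disables that send case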
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return messageBuffer[0]
	}
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return al.outgoingMessages
	}
	for {
		select {
		case incomingMessage := <-al.incomingMessages:
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
		case <-al.ctx.Done():
			return
		}
	}
}

func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
	if queue == "" {
		return al.loadAttemptQueue
	}
	return al.alternateQueues[queue].loadAttemptQueue
}

func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCache {
	if queue == "" {
		return al.responseCache
	}
	return al.alternateQueues[queue].responseCache
}

func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
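	// retry is true only while the request is still active, so loads for
	// completed requests do not wait for further responses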
	_, retry := al.activeRequests[lrm.requestID]
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[lrm.requestID])
	loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
	select {
	case <-al.ctx.Done():
	case lrm.response <- nil:
	}
}

func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
	_, existing := al.alternateQueues[rpom.name]
	if existing {
		return errors.New("already registered a persistence option with this name")
	}
	responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer)
	al.alternateQueues[rpom.name] = alternateQueue{responseCache, loadAttemptQueue}
	return nil
}

func (rpom *registerPersistenceOptionMessage) handle(al *AsyncLoader) {
	err := rpom.register(al)
	select {
	case <-al.ctx.Done():
	case rpom.response <- err:
	}
}

func (upom *unregisterPersistenceOptionMessage) unregister(al *AsyncLoader) error {
	_, ok := al.alternateQueues[upom.name]
	if !ok {
		return errors.New("Unknown persistence option")
	}
	for _, requestQueue := range al.requestQueues {
		if upom.name == requestQueue {
			return errors.New("cannot unregister while requests are in progress")
		}
	}
	delete(al.alternateQueues, upom.name)
	return nil
}

func (upom *unregisterPersistenceOptionMessage) handle(al *AsyncLoader) {
	err := upom.unregister(al)
	select {
	case <-al.ctx.Done():
	case upom.response <- err:
	}
}

func (srm *startRequestMessage) startRequest(al *AsyncLoader) error {
	if srm.persistenceOption != "" {
		_, ok := al.alternateQueues[srm.persistenceOption]
		if !ok {
			return errors.New("Unknown persistence option")
		}
		al.requestQueues[srm.requestID] = srm.persistenceOption
	}
	al.activeRequests[srm.requestID] = struct{}{}
	return nil
}

func (srm *startRequestMessage) handle(al *AsyncLoader) {
	err := srm.startRequest(al)
	select {
	case <-al.ctx.Done():
	case srm.response <- err:
	}
}

func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[frm.requestID])
	loadAttemptQueue.ClearRequest(frm.requestID)
}

func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
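	// group responses by the persistence option (queue) each request was
	// started with, then hand each group to that queue's response cache and
	// retry any pending loads against it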
	byQueue := make(map[string][]graphsync.RequestID)
	for requestID := range nram.responses {
		queue := al.requestQueues[requestID]
		byQueue[queue] = append(byQueue[queue], requestID)
	}
	for queue, requestIDs := range byQueue {
		loadAttemptQueue := al.getLoadAttemptQueue(queue)
		responseCache := al.getResponseCache(queue)
		responses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs))
		for _, requestID := range requestIDs {
			responses[requestID] = nram.responses[requestID]
		}
		responseCache.ProcessResponse(responses, nram.blks)
		loadAttemptQueue.RetryLoads()
	}
}

func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
	aq, ok := al.requestQueues[crm.requestID]
	if ok {
		al.alternateQueues[aq].responseCache.FinishRequest(crm.requestID)
		delete(al.requestQueues, crm.requestID)
		return
	}
	al.responseCache.FinishRequest(crm.requestID)
}

func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
	loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
			stream, loadErr := loader(link, ipld.LinkContext{})
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
					return types.AsyncLoadResult{
						Data:  localData,
						Err:   nil,
						Local: true,
					}
				}
			}
		}
		return types.AsyncLoadResult{
			Data:  data,
			Err:   err,
			Local: false,
		}
	})

	return responseCache, loadAttemptQueue
}