asyncloader.go 10.3 KB
Newer Older
1 2 3 4 5
package asyncloader

import (
	"context"
	"errors"
6
	"io/ioutil"
7

8 9
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-graphsync"
10 11 12 13 14 15

	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/requestmanager/types"
16 17 18 19
	"github.com/ipld/go-ipld-prime"
)

type loaderMessage interface {
20
	handle(al *AsyncLoader)
21 22
}

23 24 25 26 27
type alternateQueue struct {
	responseCache    *responsecache.ResponseCache
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

28 29
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
30 31 32 33 34 35
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

36 37 38 39 40
	defaultLoader    ipld.Loader
	defaultStorer    ipld.Storer
	activeRequests   map[graphsync.RequestID]struct{}
	requestQueues    map[graphsync.RequestID]string
	alternateQueues  map[string]alternateQueue
41
	responseCache    *responsecache.ResponseCache
42
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
43 44 45 46 47
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
48
	responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer)
49 50 51 52 53 54
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
55 56 57 58 59
		defaultLoader:    loader,
		defaultStorer:    storer,
		activeRequests:   make(map[graphsync.RequestID]struct{}),
		requestQueues:    make(map[graphsync.RequestID]string),
		alternateQueues:  make(map[string]alternateQueue),
60 61
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
62 63 64
	}
}

65 66 67 68 69 70 71 72 73 74 75
// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
	go al.messageQueueWorker()
	go al.run()
}

// Shutdown stops message processing by canceling the loader's context. It does
// not wait for the worker goroutines to exit, and messages that are still
// buffered at that point may never be handled.
func (al *AsyncLoader) Shutdown() {
	al.cancel()
}

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
// RegisterPersistenceOption registers a new loader/storer option for processing requests
func (al *AsyncLoader) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	if name == "" {
		return errors.New("Persistence option must have a name")
	}
	response := make(chan error, 1)
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case al.incomingMessages <- &registerPersistenceOptionMessage{name, loader, storer, response}:
	}
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case err := <-response:
		return err
	}
}

95 96
// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
97 98 99 100 101 102 103
func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOption string) error {
	response := make(chan error, 1)
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case al.incomingMessages <- &startRequestMessage{requestID, persistenceOption, response}:
	}
104
	select {
105
	case <-al.ctx.Done():
106 107 108
		return errors.New("context closed")
	case err := <-response:
		return err
109 110 111
	}
}

112 113
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
114
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
115
	blks []blocks.Block) {
116
	select {
117
	case <-al.ctx.Done():
118
	case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
119 120 121
	}
}

122 123
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
124
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
125
	resultChan := make(chan types.AsyncLoadResult, 1)
126
	response := make(chan struct{}, 1)
127
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
128
	select {
129 130 131
	case <-al.ctx.Done():
		resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
		close(resultChan)
132 133 134 135 136
	case al.incomingMessages <- &loadRequestMessage{response, requestID, lr}:
	}
	select {
	case <-al.ctx.Done():
	case <-response:
137
	}
138
	return resultChan
139 140
}

141 142 143
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
144
func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
145
	select {
146 147
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
148 149 150
	}
}

151 152 153
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
154
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
155 156 157 158
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &cleanupRequestMessage{requestID}:
	}
159 160 161
}

type loadRequestMessage struct {
162
	response    chan struct{}
163
	requestID   graphsync.RequestID
164 165 166 167
	loadRequest loadattemptqueue.LoadRequest
}

type newResponsesAvailableMessage struct {
168 169 170 171 172 173 174 175 176
	responses map[graphsync.RequestID]metadata.Metadata
	blks      []blocks.Block
}

type registerPersistenceOptionMessage struct {
	name     string
	loader   ipld.Loader
	storer   ipld.Storer
	response chan error
177 178 179
}

type startRequestMessage struct {
180 181 182
	requestID         graphsync.RequestID
	persistenceOption string
	response          chan error
183 184
}

185
type finishRequestMessage struct {
186
	requestID graphsync.RequestID
187 188
}

189 190 191 192
type cleanupRequestMessage struct {
	requestID graphsync.RequestID
}

193
func (al *AsyncLoader) run() {
194 195
	for {
		select {
196
		case <-al.ctx.Done():
197
			return
198 199
		case message := <-al.outgoingMessages:
			message.handle(al)
200 201 202 203
		}
	}
}

204
func (al *AsyncLoader) messageQueueWorker() {
205 206 207 208 209 210 211 212 213 214 215
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return messageBuffer[0]
	}
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
216
		return al.outgoingMessages
217 218 219
	}
	for {
		select {
220
		case incomingMessage := <-al.incomingMessages:
221 222 223
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
224
		case <-al.ctx.Done():
225 226 227 228 229
			return
		}
	}
}

230 231 232 233 234 235 236 237 238 239 240 241 242 243
func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
	if queue == "" {
		return al.loadAttemptQueue
	}
	return al.alternateQueues[queue].loadAttemptQueue
}

// getResponseCache returns the response cache for the named persistence
// option, or the default cache when queue is empty. An unregistered name
// yields nil.
func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCache {
	if queue != "" {
		return al.alternateQueues[queue].responseCache
	}
	return al.responseCache
}

244
func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
245 246 247
	_, retry := al.activeRequests[lrm.requestID]
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[lrm.requestID])
	loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
248 249 250 251
	select {
	case <-al.ctx.Done():
	case lrm.response <- struct{}{}:
	}
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
}

// register creates the response cache and load-attempt queue for a new
// persistence option and stores them under rpom.name. It returns an error,
// leaving existing state untouched, if that name is already registered.
func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
	if _, existing := al.alternateQueues[rpom.name]; existing {
		return errors.New("already registered a persistence option with this name")
	}
	responseCache, loadAttemptQueue := setupAttemptQueue(rpom.loader, rpom.storer)
	al.alternateQueues[rpom.name] = alternateQueue{
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
	}
	return nil
}

// handle performs the registration and reports the outcome on rpom.response,
// unless the loader's context closes first.
func (rpom *registerPersistenceOptionMessage) handle(al *AsyncLoader) {
	result := rpom.register(al)
	select {
	case rpom.response <- result:
	case <-al.ctx.Done():
	}
}

func (srm *startRequestMessage) startRequest(al *AsyncLoader) error {
	if srm.persistenceOption != "" {
		_, ok := al.alternateQueues[srm.persistenceOption]
		if !ok {
			return errors.New("Unknown persistence option")
		}
		al.requestQueues[srm.requestID] = srm.persistenceOption
	}
	al.activeRequests[srm.requestID] = struct{}{}
	return nil
282 283
}

284
func (srm *startRequestMessage) handle(al *AsyncLoader) {
285 286 287 288 289
	err := srm.startRequest(al)
	select {
	case <-al.ctx.Done():
	case srm.response <- err:
	}
290 291
}

292 293
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
294 295
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[frm.requestID])
	loadAttemptQueue.ClearRequest(frm.requestID)
296 297
}

298
func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
	byQueue := make(map[string][]graphsync.RequestID)
	for requestID := range nram.responses {
		queue := al.requestQueues[requestID]
		byQueue[queue] = append(byQueue[queue], requestID)
	}
	for queue, requestIDs := range byQueue {
		loadAttemptQueue := al.getLoadAttemptQueue(queue)
		responseCache := al.getResponseCache(queue)
		responses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs))
		for _, requestID := range requestIDs {
			responses[requestID] = nram.responses[requestID]
		}
		responseCache.ProcessResponse(responses, nram.blks)
		loadAttemptQueue.RetryLoads()
	}
}

// handle releases the request's cached response data from the cache of the
// persistence option it was started with (the default cache if none). It also
// drops the request's option binding.
func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
	cache := al.responseCache
	if aq, bound := al.requestQueues[crm.requestID]; bound {
		cache = al.alternateQueues[aq].responseCache
		delete(al.requestQueues, crm.requestID)
	}
	cache.FinishRequest(crm.requestID)
}

func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {

	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
Hannah Howard's avatar
Hannah Howard committed
330
	loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) types.AsyncLoadResult {
331 332 333 334 335 336 337 338
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
			stream, loadErr := loader(link, ipld.LinkContext{})
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
Hannah Howard's avatar
Hannah Howard committed
339 340 341 342 343
					return types.AsyncLoadResult{
						Data:  localData,
						Err:   nil,
						Local: true,
					}
344 345 346
				}
			}
		}
Hannah Howard's avatar
Hannah Howard committed
347 348 349 350 351
		return types.AsyncLoadResult{
			Data:  data,
			Err:   err,
			Local: false,
		}
352 353 354
	})

	return responseCache, loadAttemptQueue
355
}