asyncloader.go 9.96 KB
Newer Older
1 2 3 4 5
package asyncloader

import (
	"context"
	"errors"
	"io"
	"io/ioutil"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-graphsync"

	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	"github.com/ipld/go-ipld-prime"
)

type loaderMessage interface {
20
	handle(al *AsyncLoader)
21 22
}

23 24 25 26 27
type alternateQueue struct {
	responseCache    *responsecache.ResponseCache
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
}

28 29
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
30 31 32 33 34 35
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

36 37 38 39 40
	defaultLoader    ipld.Loader
	defaultStorer    ipld.Storer
	activeRequests   map[graphsync.RequestID]struct{}
	requestQueues    map[graphsync.RequestID]string
	alternateQueues  map[string]alternateQueue
41
	responseCache    *responsecache.ResponseCache
42
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
43 44 45 46 47
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
48
	responseCache, loadAttemptQueue := setupAttemptQueue(loader, storer)
49 50 51 52 53 54
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
55 56 57 58 59
		defaultLoader:    loader,
		defaultStorer:    storer,
		activeRequests:   make(map[graphsync.RequestID]struct{}),
		requestQueues:    make(map[graphsync.RequestID]string),
		alternateQueues:  make(map[string]alternateQueue),
60 61
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
62 63 64
	}
}

65 66 67 68 69 70 71 72 73 74 75
// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
	go al.messageQueueWorker()
	go al.run()
}

// Shutdown cancels the loader's context, which stops the background
// goroutines started by Startup. Messages still buffered at that point are
// discarded rather than processed.
func (al *AsyncLoader) Shutdown() {
	al.cancel()
}

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
// RegisterPersistenceOption registers a new loader/storer option for processing requests
func (al *AsyncLoader) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	if name == "" {
		return errors.New("Persistence option must have a name")
	}
	response := make(chan error, 1)
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case al.incomingMessages <- &registerPersistenceOptionMessage{name, loader, storer, response}:
	}
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case err := <-response:
		return err
	}
}

95 96
// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
97 98 99 100 101 102 103
func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID, persistenceOption string) error {
	response := make(chan error, 1)
	select {
	case <-al.ctx.Done():
		return errors.New("context closed")
	case al.incomingMessages <- &startRequestMessage{requestID, persistenceOption, response}:
	}
104
	select {
105
	case <-al.ctx.Done():
106 107 108
		return errors.New("context closed")
	case err := <-response:
		return err
109 110 111
	}
}

112 113
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
114
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
115
	blks []blocks.Block) {
116
	select {
117
	case <-al.ctx.Done():
118
	case al.incomingMessages <- &newResponsesAvailableMessage{responses, blks}:
119 120 121
	}
}

122 123
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
124
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
125 126
	resultChan := make(chan types.AsyncLoadResult, 1)
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
127
	select {
128 129 130 131
	case <-al.ctx.Done():
		resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
		close(resultChan)
	case al.incomingMessages <- &loadRequestMessage{requestID, lr}:
132
	}
133
	return resultChan
134 135
}

136 137 138
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
139
func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
140
	select {
141 142
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
143 144 145
	}
}

146 147 148
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
149
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
150 151 152 153
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &cleanupRequestMessage{requestID}:
	}
154 155 156
}

type loadRequestMessage struct {
157
	requestID   graphsync.RequestID
158 159 160 161
	loadRequest loadattemptqueue.LoadRequest
}

type newResponsesAvailableMessage struct {
162 163 164 165 166 167 168 169 170
	responses map[graphsync.RequestID]metadata.Metadata
	blks      []blocks.Block
}

type registerPersistenceOptionMessage struct {
	name     string
	loader   ipld.Loader
	storer   ipld.Storer
	response chan error
171 172 173
}

type startRequestMessage struct {
174 175 176
	requestID         graphsync.RequestID
	persistenceOption string
	response          chan error
177 178
}

179
type finishRequestMessage struct {
180
	requestID graphsync.RequestID
181 182
}

183 184 185 186
type cleanupRequestMessage struct {
	requestID graphsync.RequestID
}

187
func (al *AsyncLoader) run() {
188 189
	for {
		select {
190
		case <-al.ctx.Done():
191
			return
192 193
		case message := <-al.outgoingMessages:
			message.handle(al)
194 195 196 197
		}
	}
}

198
func (al *AsyncLoader) messageQueueWorker() {
199 200 201 202 203 204 205 206 207 208 209
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return messageBuffer[0]
	}
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
210
		return al.outgoingMessages
211 212 213
	}
	for {
		select {
214
		case incomingMessage := <-al.incomingMessages:
215 216 217
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
218
		case <-al.ctx.Done():
219 220 221 222 223
			return
		}
	}
}

224 225 226 227 228 229 230 231 232 233 234 235 236 237
func (al *AsyncLoader) getLoadAttemptQueue(queue string) *loadattemptqueue.LoadAttemptQueue {
	if queue == "" {
		return al.loadAttemptQueue
	}
	return al.alternateQueues[queue].loadAttemptQueue
}

// getResponseCache returns the response cache for the named persistence
// option. The empty name selects the default cache; a name that was never
// registered yields nil.
func (al *AsyncLoader) getResponseCache(queue string) *responsecache.ResponseCache {
	if queue != "" {
		return al.alternateQueues[queue].responseCache
	}
	return al.responseCache
}

238
func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
	_, retry := al.activeRequests[lrm.requestID]
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[lrm.requestID])
	loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
}

// register creates the response cache and load attempt queue for a new
// persistence option. It returns an error if the name is already taken.
func (rpom *registerPersistenceOptionMessage) register(al *AsyncLoader) error {
	if _, existing := al.alternateQueues[rpom.name]; existing {
		return errors.New("already registerd a persistence option with this name")
	}
	cache, queue := setupAttemptQueue(rpom.loader, rpom.storer)
	al.alternateQueues[rpom.name] = alternateQueue{
		responseCache:    cache,
		loadAttemptQueue: queue,
	}
	return nil
}

// handle performs the registration and reports the outcome to the waiting
// caller, unless the loader's context is cancelled first.
func (rpom *registerPersistenceOptionMessage) handle(al *AsyncLoader) {
	result := rpom.register(al)
	select {
	case rpom.response <- result:
	case <-al.ctx.Done():
	}
}

func (srm *startRequestMessage) startRequest(al *AsyncLoader) error {
	if srm.persistenceOption != "" {
		_, ok := al.alternateQueues[srm.persistenceOption]
		if !ok {
			return errors.New("Unknown persistence option")
		}
		al.requestQueues[srm.requestID] = srm.persistenceOption
	}
	al.activeRequests[srm.requestID] = struct{}{}
	return nil
272 273
}

274
func (srm *startRequestMessage) handle(al *AsyncLoader) {
275 276 277 278 279
	err := srm.startRequest(al)
	select {
	case <-al.ctx.Done():
	case srm.response <- err:
	}
280 281
}

282 283
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
284 285
	loadAttemptQueue := al.getLoadAttemptQueue(al.requestQueues[frm.requestID])
	loadAttemptQueue.ClearRequest(frm.requestID)
286 287
}

288
func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
	byQueue := make(map[string][]graphsync.RequestID)
	for requestID := range nram.responses {
		queue := al.requestQueues[requestID]
		byQueue[queue] = append(byQueue[queue], requestID)
	}
	for queue, requestIDs := range byQueue {
		loadAttemptQueue := al.getLoadAttemptQueue(queue)
		responseCache := al.getResponseCache(queue)
		responses := make(map[graphsync.RequestID]metadata.Metadata, len(requestIDs))
		for _, requestID := range requestIDs {
			responses[requestID] = nram.responses[requestID]
		}
		responseCache.ProcessResponse(responses, nram.blks)
		loadAttemptQueue.RetryLoads()
	}
}

// handle tells the response cache of the request's persistence option that
// the request is finished. For a named option, it also forgets the request's
// option binding.
func (crm *cleanupRequestMessage) handle(al *AsyncLoader) {
	queueName, hasAlternate := al.requestQueues[crm.requestID]
	if !hasAlternate {
		al.responseCache.FinishRequest(crm.requestID)
		return
	}
	al.alternateQueues[queueName].responseCache.FinishRequest(crm.requestID)
	delete(al.requestQueues, crm.requestID)
}

// setupAttemptQueue builds the response cache and load attempt queue for one
// persistence option. The response cache is backed by an unverified block
// store that wraps storer.
//
// The load function passed to the queue works in two steps:
//   - It consults the response cache first and returns the cache's data or
//     error when either is non-nil.
//   - Otherwise it falls back to reading the link from loader.
//
// If neither source yields data, it returns (nil, nil); the load attempt queue
// presumably treats that as "not available yet" (TODO confirm).
func setupAttemptQueue(loader ipld.Loader, storer ipld.Storer) (*responsecache.ResponseCache, *loadattemptqueue.LoadAttemptQueue) {
	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
	loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) {
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data != nil || err != nil {
			return data, err
		}
		// fall back to local store; failures here are not errors, the data is
		// simply not available locally
		stream, loadErr := loader(link, ipld.LinkContext{})
		if stream == nil || loadErr != nil {
			return nil, nil
		}
		if closer, ok := stream.(io.Closer); ok {
			// The loader may hand back an open resource (e.g. a file). Nothing
			// else holds this reader, so release it once it has been read.
			// A close error is ignored: the read outcome is already decided.
			defer closer.Close()
		}
		localData, readErr := ioutil.ReadAll(stream)
		if readErr != nil || localData == nil {
			return nil, nil
		}
		return localData, nil
	})
	return responseCache, loadAttemptQueue
}