asyncloader.go 5.81 KB
Newer Older
1 2 3 4 5
package asyncloader

import (
	"context"
	"errors"
6
	"io/ioutil"
7

8 9
	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-graphsync"
10 11 12 13 14 15

	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/requestmanager/types"
16 17 18 19
	"github.com/ipld/go-ipld-prime"
)

type loaderMessage interface {
20
	handle(al *AsyncLoader)
21 22
}

23 24
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
25 26 27 28 29 30
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

31
	activeRequests   map[graphsync.RequestID]bool
32 33 34 35 36 37 38 39 40
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
	responseCache    *responsecache.ResponseCache
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
41
	loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) {
42 43 44 45
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
46
			stream, loadErr := loader(link, ipld.LinkContext{})
47 48 49 50 51 52 53 54 55
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
					return localData, nil
				}
			}
		}
		return data, err
	})
56 57 58 59 60 61
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
62
		activeRequests:   make(map[graphsync.RequestID]bool),
63 64
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
65 66 67
	}
}

68 69 70 71 72 73 74 75 76 77 78 79 80
// Startup launches the two background goroutines of the loader: the worker
// that buffers incoming messages and the loop that handles them.
func (al *AsyncLoader) Startup() {
	for _, worker := range []func(){al.messageQueueWorker, al.run} {
		go worker()
	}
}

// Shutdown cancels the loader's context, which makes both background
// goroutines started by Startup return.
func (al *AsyncLoader) Shutdown() {
	stop := al.cancel
	stop()
}

// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
81
func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID) {
82
	select {
83 84
	case <-al.ctx.Done():
	case al.incomingMessages <- &startRequestMessage{requestID}:
85 86 87
	}
}

88 89
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
90
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
91 92
	blks []blocks.Block) {
	al.responseCache.ProcessResponse(responses, blks)
93
	select {
94 95
	case <-al.ctx.Done():
	case al.incomingMessages <- &newResponsesAvailableMessage{}:
96 97 98
	}
}

99 100
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
101
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
102 103
	resultChan := make(chan types.AsyncLoadResult, 1)
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
104
	select {
105 106 107 108
	case <-al.ctx.Done():
		resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
		close(resultChan)
	case al.incomingMessages <- &loadRequestMessage{requestID, lr}:
109
	}
110
	return resultChan
111 112
}

113 114 115
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
116
func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
117
	select {
118 119
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
120 121 122
	}
}

123 124 125
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
126
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
127 128 129 130
	al.responseCache.FinishRequest(requestID)
}

type loadRequestMessage struct {
131
	requestID   graphsync.RequestID
132 133 134 135 136 137 138
	loadRequest loadattemptqueue.LoadRequest
}

// newResponsesAvailableMessage signals that new response data has been handed
// to the response cache. It carries no payload.
type newResponsesAvailableMessage struct{}

type startRequestMessage struct {
139
	requestID graphsync.RequestID
140 141
}

142
type finishRequestMessage struct {
143
	requestID graphsync.RequestID
144 145
}

146
func (al *AsyncLoader) run() {
147 148
	for {
		select {
149
		case <-al.ctx.Done():
150
			return
151 152
		case message := <-al.outgoingMessages:
			message.handle(al)
153 154 155 156
		}
	}
}

157
func (al *AsyncLoader) messageQueueWorker() {
158 159 160 161 162 163 164 165 166 167 168
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return messageBuffer[0]
	}
	outgoingMessages := func() chan<- loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
169
		return al.outgoingMessages
170 171 172
	}
	for {
		select {
173
		case incomingMessage := <-al.incomingMessages:
174 175 176
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
177
		case <-al.ctx.Done():
178 179 180 181 182
			return
		}
	}
}

183 184 185
func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
	retry := al.activeRequests[lrm.requestID]
	al.loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
186 187
}

188 189
func (srm *startRequestMessage) handle(al *AsyncLoader) {
	al.activeRequests[srm.requestID] = true
190 191
}

192 193 194
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
	al.loadAttemptQueue.ClearRequest(frm.requestID)
195 196
}

197 198
// handle asks the load attempt queue to retry its pending loads.
func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
	al.loadAttemptQueue.RetryLoads()
}