package asyncloader

import (
	"context"
	"errors"
	"io/ioutil"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-graphsync"

	"github.com/ipfs/go-graphsync/ipldbridge"
	"github.com/ipfs/go-graphsync/metadata"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
	"github.com/ipfs/go-graphsync/requestmanager/types"
	"github.com/ipld/go-ipld-prime"
)

// loaderMessage is a message processed by the AsyncLoader's run loop
type loaderMessage interface {
	handle(al *AsyncLoader)
}

// AsyncLoader manages loading links asynchronously as new responses
// come in from the network
type AsyncLoader struct {
	ctx              context.Context
	cancel           context.CancelFunc
	incomingMessages chan loaderMessage
	outgoingMessages chan loaderMessage

	activeRequests   map[graphsync.RequestID]bool
	loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
	responseCache    *responsecache.ResponseCache
}

// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
	unverifiedBlockStore := unverifiedblockstore.New(storer)
	responseCache := responsecache.New(unverifiedBlockStore)
	loadAttemptQueue := loadattemptqueue.New(func(requestID graphsync.RequestID, link ipld.Link) ([]byte, error) {
		// load from response cache
		data, err := responseCache.AttemptLoad(requestID, link)
		if data == nil && err == nil {
			// fall back to local store
			stream, loadErr := loader(link, ipldbridge.LinkContext{})
			if stream != nil && loadErr == nil {
				localData, loadErr := ioutil.ReadAll(stream)
				if loadErr == nil && localData != nil {
					return localData, nil
				}
			}
		}
		return data, err
	})
	ctx, cancel := context.WithCancel(ctx)
	return &AsyncLoader{
		ctx:              ctx,
		cancel:           cancel,
		incomingMessages: make(chan loaderMessage),
		outgoingMessages: make(chan loaderMessage),
		activeRequests:   make(map[graphsync.RequestID]bool),
		responseCache:    responseCache,
		loadAttemptQueue: loadAttemptQueue,
	}
}
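
// The sketch below is an illustrative (not normative) outline of how a caller
// such as the request manager might drive an AsyncLoader; requestID, link,
// loader, storer, responses, and blks are assumed to be supplied by the caller:
//
//	al := asyncloader.New(ctx, loader, storer)
//	al.Startup()
//	al.StartRequest(requestID)
//	resultChan := al.AsyncLoad(requestID, link)
//	// ... when a network response for requestID arrives:
//	al.ProcessResponse(responses, blks)
//	result := <-resultChan // data or error, whichever happens first
//	al.CompleteResponsesFor(requestID)
//	al.CleanupRequest(requestID)
//	al.Shutdown()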

// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
	go al.messageQueueWorker()
	go al.run()
}

// Shutdown finishes processing of messages
func (al *AsyncLoader) Shutdown() {
	al.cancel()
}

// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
func (al *AsyncLoader) StartRequest(requestID graphsync.RequestID) {
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &startRequestMessage{requestID}:
	}
}

// ProcessResponse ingests new responses and completes asynchronous loads as
// necessary
func (al *AsyncLoader) ProcessResponse(responses map[graphsync.RequestID]metadata.Metadata,
	blks []blocks.Block) {
	al.responseCache.ProcessResponse(responses, blks)
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &newResponsesAvailableMessage{}:
	}
}

// AsyncLoad asynchronously loads the given link for the given request ID. It
// returns a channel for the load result -- exactly one result will be sent on it.
func (al *AsyncLoader) AsyncLoad(requestID graphsync.RequestID, link ipld.Link) <-chan types.AsyncLoadResult {
	resultChan := make(chan types.AsyncLoadResult, 1)
	lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
	select {
	case <-al.ctx.Done():
		resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
		close(resultChan)
	case al.incomingMessages <- &loadRequestMessage{requestID, lr}:
	}
	return resultChan
}
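
// An illustrative consumer of the result channel (sketch only; how the raw
// bytes are decoded is up to the caller):
//
//	result := <-al.AsyncLoad(requestID, link)
//	if result.Err != nil {
//		// the load failed or the request was terminated
//	}
//	raw := result.Data // raw block bytes for the link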

// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so a link load that cannot be satisfied from the response cache or
// local store should fail rather than wait to retry
func (al *AsyncLoader) CompleteResponsesFor(requestID graphsync.RequestID) {
	select {
	case <-al.ctx.Done():
	case al.incomingMessages <- &finishRequestMessage{requestID}:
	}
}

// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid and can be cleaned up
func (al *AsyncLoader) CleanupRequest(requestID graphsync.RequestID) {
	al.responseCache.FinishRequest(requestID)
}

// loadRequestMessage queues an attempt to load a single link for a request
type loadRequestMessage struct {
	requestID   graphsync.RequestID
	loadRequest loadattemptqueue.LoadRequest
}

// newResponsesAvailableMessage signals that new responses have arrived and
// pending loads should be retried
type newResponsesAvailableMessage struct {
}

// startRequestMessage marks a request as active so that its link loads will
// retry as more responses arrive
type startRequestMessage struct {
	requestID graphsync.RequestID
}

// finishRequestMessage marks a request as complete, so pending loads for it
// should no longer wait for additional responses
type finishRequestMessage struct {
	requestID graphsync.RequestID
}

// run is the main event loop: messages handed off by the message queue worker
// are handled one at a time, in arrival order, on a single goroutine
func (al *AsyncLoader) run() {
	for {
		select {
		case <-al.ctx.Done():
			return
		case message := <-al.outgoingMessages:
			message.handle(al)
		}
	}
}

// messageQueueWorker buffers incoming messages in arrival order so that
// message producers never block on the run loop
func (al *AsyncLoader) messageQueueWorker() {
	var messageBuffer []loaderMessage
	nextMessage := func() loaderMessage {
		if len(messageBuffer) == 0 {
			return nil
		}
		return messageBuffer[0]
	}
	outgoingMessages := func() chan<- loaderMessage {
		// a nil channel disables the send case in the select below
		// whenever the buffer is empty
		if len(messageBuffer) == 0 {
			return nil
		}
		return al.outgoingMessages
	}
	for {
		select {
		case incomingMessage := <-al.incomingMessages:
			messageBuffer = append(messageBuffer, incomingMessage)
		case outgoingMessages() <- nextMessage():
			messageBuffer = messageBuffer[1:]
		case <-al.ctx.Done():
			return
		}
	}
}

func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
	// only retry the load later if the request is still receiving responses
	retry := al.activeRequests[lrm.requestID]
	al.loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
}

func (srm *startRequestMessage) handle(al *AsyncLoader) {
	al.activeRequests[srm.requestID] = true
}

func (frm *finishRequestMessage) handle(al *AsyncLoader) {
	delete(al.activeRequests, frm.requestID)
	al.loadAttemptQueue.ClearRequest(frm.requestID)
}

func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
	al.loadAttemptQueue.RetryLoads()
}