Commit 783a69fb authored by hannahhoward's avatar hannahhoward

feat(asyncloader): create asynchronous loader

Create async loader to handle multiple load attempts as responses come in from network
parent 60ab4b24
package asyncloader
import (
"context"
"errors"
"sync"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipld/go-ipld-prime"
)
type loadRequest struct {
requestID gsmsg.GraphSyncRequestID
link ipld.Link
responseChan chan []byte
errChan chan error
}
var loadRequestPool = sync.Pool{
New: func() interface{} {
return new(loadRequest)
},
}
func newLoadRequest(requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
responseChan chan []byte,
errChan chan error) *loadRequest {
lr := loadRequestPool.Get().(*loadRequest)
lr.requestID = requestID
lr.link = link
lr.responseChan = responseChan
lr.errChan = errChan
return lr
}
func returnLoadRequest(lr *loadRequest) {
*lr = loadRequest{}
loadRequestPool.Put(lr)
}
type loaderMessage interface {
handle(abl *AsyncLoader)
}
type newResponsesAvailableMessage struct{}
type startRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
}
type finishRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
}
// LoadAttempter attempts to load a link to an array of bytes
// it has three results:
// bytes present, error nil = success
// bytes nil, error present = error
// bytes nil, error nil = did not load, but try again later
type LoadAttempter func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error)
// AsyncLoader is used to make multiple attempts to load a blocks over the
// course of a request - as long as a request is in progress, it will make multiple
// attempts to load a block until it gets a definitive result of whether the block
// is present or missing in the response
type AsyncLoader struct {
ctx context.Context
cancel context.CancelFunc
loadAttempter LoadAttempter
incomingMessages chan loaderMessage
outgoingMessages chan loaderMessage
activeRequests map[gsmsg.GraphSyncRequestID]struct{}
pausedRequests []*loadRequest
}
// New initializes a new AsyncLoader from the given context and loadAttempter function
func New(ctx context.Context, loadAttempter LoadAttempter) *AsyncLoader {
ctx, cancel := context.WithCancel(ctx)
return &AsyncLoader{
ctx: ctx,
cancel: cancel,
loadAttempter: loadAttempter,
incomingMessages: make(chan loaderMessage),
outgoingMessages: make(chan loaderMessage),
activeRequests: make(map[gsmsg.GraphSyncRequestID]struct{}),
}
}
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (abl *AsyncLoader) AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) (<-chan []byte, <-chan error) {
responseChan := make(chan []byte, 1)
errChan := make(chan error, 1)
lr := newLoadRequest(requestID, link, responseChan, errChan)
select {
case <-abl.ctx.Done():
abl.terminateWithError("Context Closed", responseChan, errChan)
case abl.incomingMessages <- lr:
}
return responseChan, errChan
}
// NewResponsesAvailable indicates that the async loader should make another attempt to load
// the links that are currently pending.
func (abl *AsyncLoader) NewResponsesAvailable() {
select {
case <-abl.ctx.Done():
case abl.incomingMessages <- &newResponsesAvailableMessage{}:
}
}
// StartRequest indicates the given request has started and the loader should
// accepting link load requests for this requestID.
func (abl *AsyncLoader) StartRequest(requestID gsmsg.GraphSyncRequestID) {
select {
case <-abl.ctx.Done():
case abl.incomingMessages <- &startRequestMessage{requestID}:
}
}
// FinishRequest indicates the given request is completed or cancelled, and all in
// progress link load requests for this request ID should error
func (abl *AsyncLoader) FinishRequest(requestID gsmsg.GraphSyncRequestID) {
select {
case <-abl.ctx.Done():
case abl.incomingMessages <- &finishRequestMessage{requestID}:
}
}
// Startup starts processing of messages
func (abl *AsyncLoader) Startup() {
go abl.messageQueueWorker()
go abl.run()
}
// Shutdown stops processing of messages
func (abl *AsyncLoader) Shutdown() {
abl.cancel()
}
func (abl *AsyncLoader) run() {
for {
select {
case <-abl.ctx.Done():
return
case message := <-abl.outgoingMessages:
message.handle(abl)
}
}
}
func (abl *AsyncLoader) messageQueueWorker() {
var messageBuffer []loaderMessage
nextMessage := func() loaderMessage {
if len(messageBuffer) == 0 {
return nil
}
return messageBuffer[0]
}
outgoingMessages := func() chan<- loaderMessage {
if len(messageBuffer) == 0 {
return nil
}
return abl.outgoingMessages
}
for {
select {
case incomingMessage := <-abl.incomingMessages:
messageBuffer = append(messageBuffer, incomingMessage)
case outgoingMessages() <- nextMessage():
messageBuffer = messageBuffer[1:]
case <-abl.ctx.Done():
return
}
}
}
func (lr *loadRequest) handle(abl *AsyncLoader) {
_, ok := abl.activeRequests[lr.requestID]
if !ok {
abl.terminateWithError("No active request", lr.responseChan, lr.errChan)
returnLoadRequest(lr)
return
}
response, err := abl.loadAttempter(lr.requestID, lr.link)
if err != nil {
lr.errChan <- err
close(lr.errChan)
close(lr.responseChan)
returnLoadRequest(lr)
return
}
if response != nil {
lr.responseChan <- response
close(lr.errChan)
close(lr.responseChan)
returnLoadRequest(lr)
return
}
abl.pausedRequests = append(abl.pausedRequests, lr)
}
func (srm *startRequestMessage) handle(abl *AsyncLoader) {
abl.activeRequests[srm.requestID] = struct{}{}
}
func (frm *finishRequestMessage) handle(abl *AsyncLoader) {
delete(abl.activeRequests, frm.requestID)
pausedRequests := abl.pausedRequests
abl.pausedRequests = nil
for _, lr := range pausedRequests {
if lr.requestID == frm.requestID {
abl.terminateWithError("No active request", lr.responseChan, lr.errChan)
returnLoadRequest(lr)
} else {
abl.pausedRequests = append(abl.pausedRequests, lr)
}
}
}
func (nram *newResponsesAvailableMessage) handle(abl *AsyncLoader) {
// drain buffered
pausedRequests := abl.pausedRequests
abl.pausedRequests = nil
for _, lr := range pausedRequests {
select {
case <-abl.ctx.Done():
return
case abl.incomingMessages <- lr:
}
}
}
func (abl *AsyncLoader) terminateWithError(errMsg string, responseChan chan<- []byte, errChan chan<- error) {
errChan <- errors.New(errMsg)
close(errChan)
close(responseChan)
}
package asyncloader
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
ipld "github.com/ipld/go-ipld-prime"
)
func TestAsyncLoadWhenRequestNotInProgress(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
callCount++
return testutil.RandomBytes(100), nil
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
responseChan, errChan := asyncLoader.AsyncLoad(requestID, link)
select {
case _, ok := <-responseChan:
if ok {
t.Fatal("should not have sent responses")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
select {
case _, ok := <-errChan:
if !ok {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed error channel")
}
if callCount > 0 {
t.Fatal("should not have attempted to load link but did")
}
}
func TestAsyncLoadWhenInitialLoadSucceeds(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
callCount++
return testutil.RandomBytes(100), nil
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
responseChan, errChan := asyncLoader.AsyncLoad(requestID, link)
select {
case _, ok := <-responseChan:
if !ok {
t.Fatal("should have sent a response")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
select {
case _, ok := <-errChan:
if ok {
t.Fatal("should not have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed error channel")
}
if callCount == 0 {
t.Fatal("should have attempted to load link but did not")
}
}
func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
callCount++
return nil, fmt.Errorf("something went wrong")
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
responseChan, errChan := asyncLoader.AsyncLoad(requestID, link)
select {
case _, ok := <-responseChan:
if ok {
t.Fatal("should not have sent responses")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
select {
case _, ok := <-errChan:
if !ok {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed error channel")
}
if callCount == 0 {
t.Fatal("should have attempted to load link but did not")
}
}
func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
responseChan, errChan := asyncLoader.AsyncLoad(requestID, link)
select {
case <-called:
case <-responseChan:
t.Fatal("Should not have sent message on response chan")
case <-errChan:
t.Fatal("Should not have sent messages on error chan")
case <-ctx.Done():
t.Fatal("should have attempted load once")
}
asyncLoader.NewResponsesAvailable()
select {
case _, ok := <-responseChan:
if !ok {
t.Fatal("should have sent a response")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
select {
case _, ok := <-errChan:
if ok {
t.Fatal("should not have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed error channel")
}
if callCount < 2 {
t.Fatal("should have attempted to load multiple times till success but did not")
}
}
func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
responseChan, errChan := asyncLoader.AsyncLoad(requestID, link)
select {
case <-called:
case <-responseChan:
t.Fatal("Should not have sent message on response chan")
case <-errChan:
t.Fatal("Should not have sent messages on error chan")
case <-ctx.Done():
t.Fatal("should have attempted load once")
}
asyncLoader.FinishRequest(requestID)
asyncLoader.NewResponsesAvailable()
select {
case _, ok := <-responseChan:
if ok {
t.Fatal("should not have sent responses")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
select {
case _, ok := <-errChan:
if !ok {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed error channel")
}
if callCount > 1 {
t.Fatal("should only have attempted one call but attempted multiple")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment