Commit 2adb6306 authored by hannahhoward's avatar hannahhoward

refactor(asyncloader): use response cache

extracts parts of old asyncloader to load attempt queue
expands functionality of asyncloader to handle response cache
moves response cache and unverified block store under async laoder
parent a011c532
......@@ -3,156 +3,159 @@ package asyncloader
import (
"context"
"errors"
"sync"
"io/ioutil"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader/loadattemptqueue"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader/responsecache"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader/unverifiedblockstore"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipld/go-ipld-prime"
)
type loadRequest struct {
requestID gsmsg.GraphSyncRequestID
link ipld.Link
resultChan chan AsyncLoadResult
}
var loadRequestPool = sync.Pool{
New: func() interface{} {
return new(loadRequest)
},
}
func newLoadRequest(requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
resultChan chan AsyncLoadResult) *loadRequest {
lr := loadRequestPool.Get().(*loadRequest)
lr.requestID = requestID
lr.link = link
lr.resultChan = resultChan
return lr
}
func returnLoadRequest(lr *loadRequest) {
*lr = loadRequest{}
loadRequestPool.Put(lr)
}
type loaderMessage interface {
handle(abl *AsyncLoader)
}
type newResponsesAvailableMessage struct{}
type startRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
handle(al *AsyncLoader)
}
type finishRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
}
// LoadAttempter attempts to load a link to an array of bytes
// it has three results:
// bytes present, error nil = success
// bytes nil, error present = error
// bytes nil, error nil = did not load, but try again later
type LoadAttempter func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error)
// AsyncLoadResult is sent once over the channel returned by an async load.
type AsyncLoadResult struct {
Data []byte
Err error
}
// AsyncLoader is used to make multiple attempts to load a blocks over the
// course of a request - as long as a request is in progress, it will make multiple
// attempts to load a block until it gets a definitive result of whether the block
// is present or missing in the response
// AsyncLoader manages loading links asynchronously in as new responses
// come in from the network
type AsyncLoader struct {
ctx context.Context
cancel context.CancelFunc
loadAttempter LoadAttempter
incomingMessages chan loaderMessage
outgoingMessages chan loaderMessage
activeRequests map[gsmsg.GraphSyncRequestID]struct{}
pausedRequests []*loadRequest
}
// New initializes a new AsyncLoader from the given context and loadAttempter function
func New(ctx context.Context, loadAttempter LoadAttempter) *AsyncLoader {
activeRequests map[gsmsg.GraphSyncRequestID]bool
loadAttemptQueue *loadattemptqueue.LoadAttemptQueue
responseCache *responsecache.ResponseCache
}
// New initializes a new link loading manager for asynchronous loads from the given context
// and local store loading and storing function
func New(ctx context.Context, loader ipld.Loader, storer ipld.Storer) *AsyncLoader {
unverifiedBlockStore := unverifiedblockstore.New(storer)
responseCache := responsecache.New(unverifiedBlockStore)
loadAttemptQueue := loadattemptqueue.New(func(requestID gsmsg.GraphSyncRequestID, link ipld.Link) ([]byte, error) {
// load from response cache
data, err := responseCache.AttemptLoad(requestID, link)
if data == nil && err == nil {
// fall back to local store
stream, loadErr := loader(link, ipldbridge.LinkContext{})
if stream != nil && loadErr == nil {
localData, loadErr := ioutil.ReadAll(stream)
if loadErr == nil && localData != nil {
return localData, nil
}
}
}
return data, err
})
ctx, cancel := context.WithCancel(ctx)
return &AsyncLoader{
ctx: ctx,
cancel: cancel,
loadAttempter: loadAttempter,
incomingMessages: make(chan loaderMessage),
outgoingMessages: make(chan loaderMessage),
activeRequests: make(map[gsmsg.GraphSyncRequestID]struct{}),
activeRequests: make(map[gsmsg.GraphSyncRequestID]bool),
responseCache: responseCache,
loadAttemptQueue: loadAttemptQueue,
}
}
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (abl *AsyncLoader) AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan AsyncLoadResult {
resultChan := make(chan AsyncLoadResult, 1)
lr := newLoadRequest(requestID, link, resultChan)
// Startup starts processing of messages
func (al *AsyncLoader) Startup() {
go al.messageQueueWorker()
go al.run()
}
// Shutdown finishes processing of messages
func (al *AsyncLoader) Shutdown() {
al.cancel()
}
// StartRequest indicates the given request has started and the manager should
// continually attempt to load links for this request as new responses come in
func (al *AsyncLoader) StartRequest(requestID gsmsg.GraphSyncRequestID) {
select {
case <-abl.ctx.Done():
abl.terminateWithError("Context Closed", resultChan)
case abl.incomingMessages <- lr:
case <-al.ctx.Done():
case al.incomingMessages <- &startRequestMessage{requestID}:
}
return resultChan
}
// NewResponsesAvailable indicates that the async loader should make another attempt to load
// the links that are currently pending.
func (abl *AsyncLoader) NewResponsesAvailable() {
// ProcessResponse injests new responses and completes asynchronous loads as
// neccesary
func (al *AsyncLoader) ProcessResponse(responses map[gsmsg.GraphSyncRequestID]metadata.Metadata,
blks []blocks.Block) {
al.responseCache.ProcessResponse(responses, blks)
select {
case <-abl.ctx.Done():
case abl.incomingMessages <- &newResponsesAvailableMessage{}:
case <-al.ctx.Done():
case al.incomingMessages <- &newResponsesAvailableMessage{}:
}
}
// StartRequest indicates the given request has started and the loader should
// accepting link load requests for this requestID.
func (abl *AsyncLoader) StartRequest(requestID gsmsg.GraphSyncRequestID) {
// AsyncLoad asynchronously loads the given link for the given request ID. It returns a channel for data and a channel
// for errors -- only one message will be sent over either.
func (al *AsyncLoader) AsyncLoad(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult {
resultChan := make(chan types.AsyncLoadResult, 1)
lr := loadattemptqueue.NewLoadRequest(requestID, link, resultChan)
select {
case <-abl.ctx.Done():
case abl.incomingMessages <- &startRequestMessage{requestID}:
case <-al.ctx.Done():
resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New("Context closed")}
close(resultChan)
case al.incomingMessages <- &loadRequestMessage{requestID, lr}:
}
return resultChan
}
// FinishRequest indicates the given request is completed or cancelled, and all in
// progress link load requests for this request ID should error
func (abl *AsyncLoader) FinishRequest(requestID gsmsg.GraphSyncRequestID) {
// CompleteResponsesFor indicates no further responses will come in for the given
// requestID, so if no responses are in the cache or local store, a link load
// should not retry
func (al *AsyncLoader) CompleteResponsesFor(requestID gsmsg.GraphSyncRequestID) {
select {
case <-abl.ctx.Done():
case abl.incomingMessages <- &finishRequestMessage{requestID}:
case <-al.ctx.Done():
case al.incomingMessages <- &finishRequestMessage{requestID}:
}
}
// Startup starts processing of messages
func (abl *AsyncLoader) Startup() {
go abl.messageQueueWorker()
go abl.run()
// CleanupRequest indicates the given request is complete on the client side,
// and no further attempts will be made to load links for this request,
// so any cached response data is invalid can be cleaned
func (al *AsyncLoader) CleanupRequest(requestID gsmsg.GraphSyncRequestID) {
al.responseCache.FinishRequest(requestID)
}
type loadRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
loadRequest loadattemptqueue.LoadRequest
}
type newResponsesAvailableMessage struct {
}
type startRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
}
// Shutdown stops processing of messages
func (abl *AsyncLoader) Shutdown() {
abl.cancel()
type finishRequestMessage struct {
requestID gsmsg.GraphSyncRequestID
}
func (abl *AsyncLoader) run() {
func (al *AsyncLoader) run() {
for {
select {
case <-abl.ctx.Done():
case <-al.ctx.Done():
return
case message := <-abl.outgoingMessages:
message.handle(abl)
case message := <-al.outgoingMessages:
message.handle(al)
}
}
}
func (abl *AsyncLoader) messageQueueWorker() {
func (al *AsyncLoader) messageQueueWorker() {
var messageBuffer []loaderMessage
nextMessage := func() loaderMessage {
if len(messageBuffer) == 0 {
......@@ -164,75 +167,34 @@ func (abl *AsyncLoader) messageQueueWorker() {
if len(messageBuffer) == 0 {
return nil
}
return abl.outgoingMessages
return al.outgoingMessages
}
for {
select {
case incomingMessage := <-abl.incomingMessages:
case incomingMessage := <-al.incomingMessages:
messageBuffer = append(messageBuffer, incomingMessage)
case outgoingMessages() <- nextMessage():
messageBuffer = messageBuffer[1:]
case <-abl.ctx.Done():
case <-al.ctx.Done():
return
}
}
}
func (lr *loadRequest) handle(abl *AsyncLoader) {
response, err := abl.loadAttempter(lr.requestID, lr.link)
if err != nil {
lr.resultChan <- AsyncLoadResult{nil, err}
close(lr.resultChan)
returnLoadRequest(lr)
return
}
if response != nil {
lr.resultChan <- AsyncLoadResult{response, nil}
close(lr.resultChan)
returnLoadRequest(lr)
return
}
_, ok := abl.activeRequests[lr.requestID]
if !ok {
abl.terminateWithError("No active request", lr.resultChan)
returnLoadRequest(lr)
return
}
abl.pausedRequests = append(abl.pausedRequests, lr)
}
func (srm *startRequestMessage) handle(abl *AsyncLoader) {
abl.activeRequests[srm.requestID] = struct{}{}
func (lrm *loadRequestMessage) handle(al *AsyncLoader) {
retry := al.activeRequests[lrm.requestID]
al.loadAttemptQueue.AttemptLoad(lrm.loadRequest, retry)
}
func (frm *finishRequestMessage) handle(abl *AsyncLoader) {
delete(abl.activeRequests, frm.requestID)
pausedRequests := abl.pausedRequests
abl.pausedRequests = nil
for _, lr := range pausedRequests {
if lr.requestID == frm.requestID {
abl.terminateWithError("No active request", lr.resultChan)
returnLoadRequest(lr)
} else {
abl.pausedRequests = append(abl.pausedRequests, lr)
}
}
func (srm *startRequestMessage) handle(al *AsyncLoader) {
al.activeRequests[srm.requestID] = true
}
func (nram *newResponsesAvailableMessage) handle(abl *AsyncLoader) {
// drain buffered
pausedRequests := abl.pausedRequests
abl.pausedRequests = nil
for _, lr := range pausedRequests {
select {
case <-abl.ctx.Done():
return
case abl.incomingMessages <- lr:
}
}
func (frm *finishRequestMessage) handle(al *AsyncLoader) {
delete(al.activeRequests, frm.requestID)
al.loadAttemptQueue.ClearRequest(frm.requestID)
}
func (abl *AsyncLoader) terminateWithError(errMsg string, resultChan chan<- AsyncLoadResult) {
resultChan <- AsyncLoadResult{nil, errors.New(errMsg)}
close(resultChan)
func (nram *newResponsesAvailableMessage) handle(al *AsyncLoader) {
al.loadAttemptQueue.RetryLoads()
}
......@@ -2,30 +2,49 @@ package asyncloader
import (
"context"
"fmt"
"io"
"math/rand"
"reflect"
"testing"
"time"
"github.com/ipfs/go-graphsync/ipldbridge"
"github.com/ipfs/go-graphsync/metadata"
"github.com/ipld/go-ipld-prime/linking/cid"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
ipld "github.com/ipld/go-ipld-prime"
)
func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
func TestAsyncLoadInitialLoadSucceedsLocallyPresent(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
block := testutil.GenerateBlocksOfSize(1, 100)[0]
writer, commit, err := storer(ipldbridge.LinkContext{})
_, err = writer.Write(block.RawData())
if err != nil {
t.Fatal("could not seed block store")
}
link := cidlink.Link{Cid: block.Cid()}
err = commit(link)
if err != nil {
t.Fatal("could not seed block store")
}
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
callCount++
return testutil.RandomBytes(100), nil
return loader(link, linkContext)
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
resultChan := asyncLoader.AsyncLoad(requestID, link)
......@@ -42,7 +61,60 @@ func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
}
if callCount == 0 {
t.Fatal("should have attempted to load link but did not")
t.Fatal("should have attempted to load link from local store but did not")
}
}
func TestAsyncLoadInitialLoadSucceedsResponsePresent(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blocks := testutil.GenerateBlocksOfSize(1, 100)
block := blocks[0]
link := cidlink.Link{Cid: block.Cid()}
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
callCount++
return loader(link, linkContext)
}
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
responses := map[gsmsg.GraphSyncRequestID]metadata.Metadata{
requestID: metadata.Metadata{
metadata.Item{
Link: link,
BlockPresent: true,
},
},
}
asyncLoader.ProcessResponse(responses, blocks)
resultChan := asyncLoader.AsyncLoad(requestID, link)
select {
case result := <-resultChan:
if result.Data == nil {
t.Fatal("should have sent a response")
}
if result.Err != nil {
t.Fatal("should not have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount > 0 {
t.Fatal("should not have attempted to load link from local store")
}
if !reflect.DeepEqual(blockStore[link], block.RawData()) {
t.Fatal("should have stored block but didn't")
}
}
......@@ -51,15 +123,30 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
callCount++
return nil, fmt.Errorf("something went wrong")
return loader(link, linkContext)
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
responses := map[gsmsg.GraphSyncRequestID]metadata.Metadata{
requestID: metadata.Metadata{
metadata.Item{
Link: link,
BlockPresent: false,
},
},
}
asyncLoader.ProcessResponse(responses, nil)
resultChan := asyncLoader.AsyncLoad(requestID, link)
select {
......@@ -74,25 +161,25 @@ func TestAsyncLoadInitialLoadFails(t *testing.T) {
t.Fatal("should have closed response channel")
}
if callCount == 0 {
t.Fatal("should have attempted to load link but did not")
if callCount > 0 {
t.Fatal("should not have attempted to load link from local store")
}
}
func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
if callCount > 0 {
result = testutil.RandomBytes(100)
}
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
callCount++
return result, nil
return loader(link, linkContext)
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
link := testbridge.NewMockLink()
......@@ -106,14 +193,13 @@ func TestAsyncLoadInitialLoadIndeterminateWhenRequestNotInProgress(t *testing.T)
}
if result.Err == nil {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have produced result")
t.Fatal("should have closed response channel")
}
if callCount > 1 {
t.Fatal("should have failed after load with indeterminate result")
if callCount == 0 {
t.Fatal("should have attempted to load link from local store but did not")
}
}
......@@ -122,23 +208,26 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blocks := testutil.GenerateBlocksOfSize(1, 100)
block := blocks[0]
link := cidlink.Link{Cid: block.Cid()}
called := make(chan struct{}, 2)
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return loader(link, linkContext)
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
resultChan := asyncLoader.AsyncLoad(requestID, link)
select {
case <-called:
case <-resultChan:
......@@ -146,7 +235,16 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
case <-ctx.Done():
t.Fatal("should have attempted load once")
}
asyncLoader.NewResponsesAvailable()
responses := map[gsmsg.GraphSyncRequestID]metadata.Metadata{
requestID: metadata.Metadata{
metadata.Item{
Link: link,
BlockPresent: true,
},
},
}
asyncLoader.ProcessResponse(responses, blocks)
select {
case result := <-resultChan:
......@@ -160,33 +258,95 @@ func TestAsyncLoadInitialLoadIndeterminateThenSucceeds(t *testing.T) {
t.Fatal("should have closed response channel")
}
if callCount < 2 {
t.Fatal("should have attempted to load multiple times till success but did not")
if callCount != 1 {
t.Fatal("should have attempted to load from local store exactly once")
}
if !reflect.DeepEqual(blockStore[link], block.RawData()) {
t.Fatal("should have stored block but didn't")
}
}
func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
func TestAsyncLoadInitialLoadIndeterminateThenFails(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
link := testbridge.NewMockLink()
called := make(chan struct{}, 2)
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
return loader(link, linkContext)
}
asyncLoader := New(ctx, loadAttempter)
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
resultChan := asyncLoader.AsyncLoad(requestID, link)
select {
case <-called:
case <-resultChan:
t.Fatal("Should not have sent message on response chan")
case <-ctx.Done():
t.Fatal("should have attempted load once")
}
responses := map[gsmsg.GraphSyncRequestID]metadata.Metadata{
requestID: metadata.Metadata{
metadata.Item{
Link: link,
BlockPresent: false,
},
},
}
asyncLoader.ProcessResponse(responses, nil)
select {
case result := <-resultChan:
if result.Data != nil {
t.Fatal("should not have sent responses")
}
if result.Err == nil {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount != 1 {
t.Fatal("should have attempted to load from local store exactly once")
}
}
func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
link := testbridge.NewMockLink()
called := make(chan struct{}, 2)
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
called <- struct{}{}
callCount++
return loader(link, linkContext)
}
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
asyncLoader.StartRequest(requestID)
resultChan := asyncLoader.AsyncLoad(requestID, link)
select {
case <-called:
case <-resultChan:
......@@ -194,8 +354,7 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
case <-ctx.Done():
t.Fatal("should have attempted load once")
}
asyncLoader.FinishRequest(requestID)
asyncLoader.NewResponsesAvailable()
asyncLoader.CompleteResponsesFor(requestID)
select {
case result := <-resultChan:
......@@ -209,7 +368,82 @@ func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
t.Fatal("should have closed response channel")
}
if callCount > 1 {
t.Fatal("should only have attempted one call but attempted multiple")
if callCount != 1 {
t.Fatal("should have attempted to load from local store exactly once")
}
}
func TestAsyncLoadTwiceLoadsLocallySecondTime(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
blockStore := make(map[ipld.Link][]byte)
loader, storer := testbridge.NewMockStore(blockStore)
blocks := testutil.GenerateBlocksOfSize(1, 100)
block := blocks[0]
link := cidlink.Link{Cid: block.Cid()}
wrappedLoader := func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
callCount++
return loader(link, linkContext)
}
asyncLoader := New(ctx, wrappedLoader, storer)
asyncLoader.Startup()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
responses := map[gsmsg.GraphSyncRequestID]metadata.Metadata{
requestID: metadata.Metadata{
metadata.Item{
Link: link,
BlockPresent: true,
},
},
}
asyncLoader.ProcessResponse(responses, blocks)
resultChan := asyncLoader.AsyncLoad(requestID, link)
select {
case result := <-resultChan:
if result.Data == nil {
t.Fatal("should have sent a response")
}
if result.Err != nil {
t.Fatal("should not have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount > 0 {
t.Fatal("should not have attempted to load link from local store")
}
if !reflect.DeepEqual(blockStore[link], block.RawData()) {
t.Fatal("should have stored block but didn't")
}
resultChan = asyncLoader.AsyncLoad(requestID, link)
select {
case result := <-resultChan:
if result.Data == nil {
t.Fatal("should have sent a response")
}
if result.Err != nil {
t.Fatal("should not have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount == 0 {
t.Fatal("should have attempted to load link from local store but did not")
}
if !reflect.DeepEqual(blockStore[link], block.RawData()) {
t.Fatal("should have stored block but didn't")
}
}
package loadattemptqueue
import (
"errors"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipld/go-ipld-prime"
)
// LoadRequest is a request to load the given link for the given request id,
// with results returned to the given channel
type LoadRequest struct {
requestID gsmsg.GraphSyncRequestID
link ipld.Link
resultChan chan types.AsyncLoadResult
}
// NewLoadRequest returns a new LoadRequest for the given request id, link,
// and results channel
func NewLoadRequest(requestID gsmsg.GraphSyncRequestID,
link ipld.Link,
resultChan chan types.AsyncLoadResult) LoadRequest {
return LoadRequest{requestID, link, resultChan}
}
// LoadAttempter attempts to load a link to an array of bytes
// it has three results:
// bytes present, error nil = success
// bytes nil, error present = error
// bytes nil, error nil = did not load, but try again later
type LoadAttempter func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error)
// LoadAttemptQueue attempts to load using the load attempter, and then can
// place requests on a retry queue
type LoadAttemptQueue struct {
loadAttempter LoadAttempter
pausedRequests []LoadRequest
}
// New initializes a new AsyncLoader from loadAttempter function
func New(loadAttempter LoadAttempter) *LoadAttemptQueue {
return &LoadAttemptQueue{
loadAttempter: loadAttempter,
}
}
// AttemptLoad attempts to loads the given load request, and if retry is true
// it saves the loadrequest for retrying later
func (laq *LoadAttemptQueue) AttemptLoad(lr LoadRequest, retry bool) {
response, err := laq.loadAttempter(lr.requestID, lr.link)
if err != nil {
lr.resultChan <- types.AsyncLoadResult{Data: nil, Err: err}
close(lr.resultChan)
return
}
if response != nil {
lr.resultChan <- types.AsyncLoadResult{Data: response, Err: nil}
close(lr.resultChan)
return
}
if !retry {
laq.terminateWithError("No active request", lr.resultChan)
return
}
laq.pausedRequests = append(laq.pausedRequests, lr)
}
// ClearRequest purges the given request from the queue of load requests
// to retry
func (laq *LoadAttemptQueue) ClearRequest(requestID gsmsg.GraphSyncRequestID) {
pausedRequests := laq.pausedRequests
laq.pausedRequests = nil
for _, lr := range pausedRequests {
if lr.requestID == requestID {
laq.terminateWithError("No active request", lr.resultChan)
} else {
laq.pausedRequests = append(laq.pausedRequests, lr)
}
}
}
// RetryLoads attempts loads on all saved load requests that were loaded with
// retry = true
func (laq *LoadAttemptQueue) RetryLoads() {
// drain buffered
pausedRequests := laq.pausedRequests
laq.pausedRequests = nil
for _, lr := range pausedRequests {
laq.AttemptLoad(lr, true)
}
}
func (laq *LoadAttemptQueue) terminateWithError(errMsg string, resultChan chan<- types.AsyncLoadResult) {
resultChan <- types.AsyncLoadResult{Data: nil, Err: errors.New(errMsg)}
close(resultChan)
}
package loadattemptqueue
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
ipld "github.com/ipld/go-ipld-prime"
)
func TestAsyncLoadInitialLoadSucceeds(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
callCount++
return testutil.RandomBytes(100), nil
}
loadAttemptQueue := New(loadAttempter)
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(requestID, link, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)
select {
case result := <-resultChan:
if result.Data == nil {
t.Fatal("should have sent a response")
}
if result.Err != nil {
t.Fatal("should not have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount == 0 {
t.Fatal("should have attempted to load link but did not")
}
}
func TestAsyncLoadInitialLoadFails(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
callCount++
return nil, fmt.Errorf("something went wrong")
}
loadAttemptQueue := New(loadAttempter)
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(requestID, link, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)
select {
case result := <-resultChan:
if result.Data != nil {
t.Fatal("should not have sent responses")
}
if result.Err == nil {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount == 0 {
t.Fatal("should have attempted to load link but did not")
}
}
func TestAsyncLoadInitialLoadIndeterminateRetryFalse(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
}
loadAttemptQueue := New(loadAttempter)
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(requestID, link, resultChan)
loadAttemptQueue.AttemptLoad(lr, false)
select {
case result := <-resultChan:
if result.Data != nil {
t.Fatal("should not have sent responses")
}
if result.Err == nil {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have produced result")
}
if callCount > 1 {
t.Fatal("should have failed after load with indeterminate result")
}
}
func TestAsyncLoadInitialLoadIndeterminateRetryTrueThenRetriedSuccess(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
}
loadAttemptQueue := New(loadAttempter)
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(requestID, link, resultChan)
loadAttemptQueue.AttemptLoad(lr, true)
select {
case <-called:
case <-resultChan:
t.Fatal("Should not have sent message on response chan")
case <-ctx.Done():
t.Fatal("should have attempted load once")
}
loadAttemptQueue.RetryLoads()
select {
case result := <-resultChan:
if result.Data == nil {
t.Fatal("should have sent a response")
}
if result.Err != nil {
t.Fatal("should not have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount < 2 {
t.Fatal("should have attempted to load multiple times till success but did not")
}
}
func TestAsyncLoadInitialLoadIndeterminateThenRequestFinishes(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
callCount := 0
called := make(chan struct{}, 2)
loadAttempter := func(gsmsg.GraphSyncRequestID, ipld.Link) ([]byte, error) {
var result []byte
called <- struct{}{}
if callCount > 0 {
result = testutil.RandomBytes(100)
}
callCount++
return result, nil
}
loadAttemptQueue := New(loadAttempter)
link := testbridge.NewMockLink()
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
resultChan := make(chan types.AsyncLoadResult, 1)
lr := NewLoadRequest(requestID, link, resultChan)
loadAttemptQueue.AttemptLoad(lr, true)
select {
case <-called:
case <-resultChan:
t.Fatal("Should not have sent message on response chan")
case <-ctx.Done():
t.Fatal("should have attempted load once")
}
loadAttemptQueue.ClearRequest(requestID)
loadAttemptQueue.RetryLoads()
select {
case result := <-resultChan:
if result.Data != nil {
t.Fatal("should not have sent responses")
}
if result.Err == nil {
t.Fatal("should have sent an error")
}
case <-ctx.Done():
t.Fatal("should have closed response channel")
}
if callCount > 1 {
t.Fatal("should only have attempted one call but attempted multiple")
}
}
......@@ -8,13 +8,13 @@ import (
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/requestmanager/types"
ipld "github.com/ipld/go-ipld-prime"
)
// AsyncLoadFn is a function which given a request id and an ipld.Link, returns
// a channel which will eventually return data for the link or an err
type AsyncLoadFn func(gsmsg.GraphSyncRequestID, ipld.Link) <-chan asyncloader.AsyncLoadResult
type AsyncLoadFn func(gsmsg.GraphSyncRequestID, ipld.Link) <-chan types.AsyncLoadResult
// WrapAsyncLoader creates a regular ipld link laoder from an asynchronous load
// function, with the given cancellation context, for the given requests, and will
......
......@@ -11,8 +11,8 @@ import (
"time"
"github.com/ipfs/go-graphsync/ipldbridge"
"github.com/ipfs/go-graphsync/requestmanager/types"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime"
......@@ -25,8 +25,8 @@ type callParams struct {
link ipld.Link
}
func makeAsyncLoadFn(responseChan chan asyncloader.AsyncLoadResult, calls chan callParams) AsyncLoadFn {
return func(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan asyncloader.AsyncLoadResult {
func makeAsyncLoadFn(responseChan chan types.AsyncLoadResult, calls chan callParams) AsyncLoadFn {
return func(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan types.AsyncLoadResult {
calls <- callParams{requestID, link}
return responseChan
}
......@@ -36,7 +36,7 @@ func TestWrappedAsyncLoaderReturnsValues(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
responseChan := make(chan asyncloader.AsyncLoadResult, 1)
responseChan := make(chan types.AsyncLoadResult, 1)
calls := make(chan callParams, 1)
asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
errChan := make(chan error)
......@@ -45,7 +45,7 @@ func TestWrappedAsyncLoaderReturnsValues(t *testing.T) {
link := testbridge.NewMockLink()
data := testutil.RandomBytes(100)
responseChan <- asyncloader.AsyncLoadResult{Data: data, Err: nil}
responseChan <- types.AsyncLoadResult{Data: data, Err: nil}
stream, err := loader(link, ipldbridge.LinkContext{})
if err != nil {
t.Fatal("Should not have errored on load")
......@@ -63,7 +63,7 @@ func TestWrappedAsyncLoaderSideChannelsErrors(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
responseChan := make(chan asyncloader.AsyncLoadResult, 1)
responseChan := make(chan types.AsyncLoadResult, 1)
calls := make(chan callParams, 1)
asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
errChan := make(chan error, 1)
......@@ -72,7 +72,7 @@ func TestWrappedAsyncLoaderSideChannelsErrors(t *testing.T) {
link := testbridge.NewMockLink()
err := errors.New("something went wrong")
responseChan <- asyncloader.AsyncLoadResult{Data: nil, Err: err}
responseChan <- types.AsyncLoadResult{Data: nil, Err: err}
stream, loadErr := loader(link, ipldbridge.LinkContext{})
if stream != nil || loadErr != ipldbridge.ErrDoNotFollow() {
t.Fatal("Should have errored on load")
......@@ -92,7 +92,7 @@ func TestWrappedAsyncLoaderContextCancels(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
subCtx, subCancel := context.WithCancel(ctx)
responseChan := make(chan asyncloader.AsyncLoadResult, 1)
responseChan := make(chan types.AsyncLoadResult, 1)
calls := make(chan callParams, 1)
asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
errChan := make(chan error, 1)
......
package types
// AsyncLoadResult is sent once over the channel returned by an async load.
type AsyncLoadResult struct {
Data []byte
Err error
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment