Commit a011c532 authored by hannahhoward's avatar hannahhoward

feat(loader): loader for request manager

wrapper around an async load to make a regular ipld.loader for the request manager
parent 2dce5f9d
...@@ -2,6 +2,7 @@ package ipldbridge ...@@ -2,6 +2,7 @@ package ipldbridge
import ( import (
"context" "context"
"errors"
"github.com/ipld/go-ipld-prime/fluent" "github.com/ipld/go-ipld-prime/fluent"
...@@ -10,6 +11,13 @@ import ( ...@@ -10,6 +11,13 @@ import (
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
) )
var errDoNotFollow = errors.New("Dont Follow Me")
// ErrDoNotFollow is just a wrapper for whatever IPLD's ErrDoNotFollow ends up looking like
func ErrDoNotFollow() error {
return errDoNotFollow
}
// Loader is an alias from ipld, in case it's renamed/moved. // Loader is an alias from ipld, in case it's renamed/moved.
type Loader = ipld.Loader type Loader = ipld.Loader
......
package loader
import (
"bytes"
"context"
"fmt"
"io"
"github.com/ipfs/go-graphsync/ipldbridge"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
ipld "github.com/ipld/go-ipld-prime"
)
// AsyncLoadFn is a function which given a request id and an ipld.Link, returns
// a channel which will eventually return data for the link or an err
type AsyncLoadFn func(gsmsg.GraphSyncRequestID, ipld.Link) <-chan asyncloader.AsyncLoadResult
// WrapAsyncLoader creates a regular ipld link laoder from an asynchronous load
// function, with the given cancellation context, for the given requests, and will
// transmit load errors on the given channel
func WrapAsyncLoader(
ctx context.Context,
asyncLoadFn AsyncLoadFn,
requestID gsmsg.GraphSyncRequestID,
errorChan chan error) ipld.Loader {
return func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
resultChan := asyncLoadFn(requestID, link)
select {
case <-ctx.Done():
return nil, fmt.Errorf("request finished")
case result := <-resultChan:
if result.Err != nil {
select {
case <-ctx.Done():
return nil, fmt.Errorf("request finished")
case errorChan <- result.Err:
return nil, ipldbridge.ErrDoNotFollow()
}
}
return bytes.NewReader(result.Data), nil
}
}
}
package loader
import (
"context"
"errors"
"io"
"io/ioutil"
"math/rand"
"reflect"
"testing"
"time"
"github.com/ipfs/go-graphsync/ipldbridge"
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
"github.com/ipfs/go-graphsync/testbridge"
"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime"
gsmsg "github.com/ipfs/go-graphsync/message"
)
type callParams struct {
requestID gsmsg.GraphSyncRequestID
link ipld.Link
}
func makeAsyncLoadFn(responseChan chan asyncloader.AsyncLoadResult, calls chan callParams) AsyncLoadFn {
return func(requestID gsmsg.GraphSyncRequestID, link ipld.Link) <-chan asyncloader.AsyncLoadResult {
calls <- callParams{requestID, link}
return responseChan
}
}
func TestWrappedAsyncLoaderReturnsValues(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
responseChan := make(chan asyncloader.AsyncLoadResult, 1)
calls := make(chan callParams, 1)
asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
errChan := make(chan error)
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
loader := WrapAsyncLoader(ctx, asyncLoadFn, requestID, errChan)
link := testbridge.NewMockLink()
data := testutil.RandomBytes(100)
responseChan <- asyncloader.AsyncLoadResult{Data: data, Err: nil}
stream, err := loader(link, ipldbridge.LinkContext{})
if err != nil {
t.Fatal("Should not have errored on load")
}
returnedData, err := ioutil.ReadAll(stream)
if err != nil {
t.Fatal("error in return stream")
}
if !reflect.DeepEqual(data, returnedData) {
t.Fatal("returned data did not match expected")
}
}
func TestWrappedAsyncLoaderSideChannelsErrors(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
responseChan := make(chan asyncloader.AsyncLoadResult, 1)
calls := make(chan callParams, 1)
asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
errChan := make(chan error, 1)
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
loader := WrapAsyncLoader(ctx, asyncLoadFn, requestID, errChan)
link := testbridge.NewMockLink()
err := errors.New("something went wrong")
responseChan <- asyncloader.AsyncLoadResult{Data: nil, Err: err}
stream, loadErr := loader(link, ipldbridge.LinkContext{})
if stream != nil || loadErr != ipldbridge.ErrDoNotFollow() {
t.Fatal("Should have errored on load")
}
select {
case <-ctx.Done():
t.Fatal("should have returned an error on side channel but didn't")
case returnedErr := <-errChan:
if returnedErr != err {
t.Fatal("returned wrong error on side channel")
}
}
}
func TestWrappedAsyncLoaderContextCancels(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
subCtx, subCancel := context.WithCancel(ctx)
responseChan := make(chan asyncloader.AsyncLoadResult, 1)
calls := make(chan callParams, 1)
asyncLoadFn := makeAsyncLoadFn(responseChan, calls)
errChan := make(chan error, 1)
requestID := gsmsg.GraphSyncRequestID(rand.Int31())
loader := WrapAsyncLoader(subCtx, asyncLoadFn, requestID, errChan)
link := testbridge.NewMockLink()
resultsChan := make(chan struct {
io.Reader
error
})
go func() {
stream, err := loader(link, ipldbridge.LinkContext{})
resultsChan <- struct {
io.Reader
error
}{stream, err}
}()
subCancel()
select {
case <-ctx.Done():
t.Fatal("should have returned from context cancelling but didn't")
case result := <-resultsChan:
if result.Reader != nil || result.error == nil {
t.Fatal("should have errored from context cancelling but didn't")
}
}
}
...@@ -35,6 +35,9 @@ func WrapLoader(loader ipldbridge.Loader, ...@@ -35,6 +35,9 @@ func WrapLoader(loader ipldbridge.Loader,
} }
} }
responseSender.SendResponse(requestID, lnk, data) responseSender.SendResponse(requestID, lnk, data)
if data == nil {
err = ipldbridge.ErrDoNotFollow()
}
return result, err return result, err
} }
} }
...@@ -33,7 +33,7 @@ func (frs *fakeResponseSender) SendResponse( ...@@ -33,7 +33,7 @@ func (frs *fakeResponseSender) SendResponse(
frs.lastData = data frs.lastData = data
} }
func TestWrappedLoaderSendsRequests(t *testing.T) { func TestWrappedLoaderSendsResponses(t *testing.T) {
frs := &fakeResponseSender{} frs := &fakeResponseSender{}
link1 := testbridge.NewMockLink() link1 := testbridge.NewMockLink()
link2 := testbridge.NewMockLink() link2 := testbridge.NewMockLink()
...@@ -64,12 +64,15 @@ func TestWrappedLoaderSendsRequests(t *testing.T) { ...@@ -64,12 +64,15 @@ func TestWrappedLoaderSendsRequests(t *testing.T) {
} }
reader, err = wrappedLoader(link2, ipldbridge.LinkContext{}) reader, err = wrappedLoader(link2, ipldbridge.LinkContext{})
fmt.Println(reader)
fmt.Println(err)
if reader != nil || err == nil { if reader != nil || err == nil {
t.Fatal("Should return an error and empty reader if underlying loader does") t.Fatal("Should return an error and empty reader if underlying loader does")
} }
if err != ipldbridge.ErrDoNotFollow() {
t.Fatal("Should convert error to a do not follow error")
}
if frs.lastRequestID != requestID || if frs.lastRequestID != requestID ||
frs.lastLink != link2 || frs.lastLink != link2 ||
frs.lastData != nil { frs.lastData != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment