loader.go 1.31 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
package loader

import (
	"bytes"
	"context"
	"fmt"
	"io"

	"github.com/ipfs/go-graphsync/ipldbridge"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
	ipld "github.com/ipld/go-ipld-prime"
)

// AsyncLoadFn is the signature of an asynchronous link load: given a request
// id and an ipld.Link, it returns a receive-only channel that will eventually
// deliver an asyncloader.AsyncLoadResult for that link. Consumers in this
// package read the result's Err field to detect a failed load and its Data
// field for the loaded bytes.
type AsyncLoadFn func(gsmsg.GraphSyncRequestID, ipld.Link) <-chan asyncloader.AsyncLoadResult

// WrapAsyncLoader adapts an asynchronous load function into a regular,
// blocking ipld link loader bound to a single request.
//
// The returned loader calls asyncLoadFn with requestID and the requested link,
// then waits for either a result or the cancellation of ctx:
//
//   - If ctx is done before a result arrives, it returns a "request finished"
//     error.
//   - If the result carries no error, it returns a reader over the result's
//     Data bytes.
//   - If the result carries an error, that error is sent on errorChan and the
//     loader returns ipldbridge.ErrDoNotFollow(). Should ctx finish before
//     errorChan accepts the error, the loader returns a "request finished"
//     error instead.
//
// The link context passed to the returned loader is not used.
func WrapAsyncLoader(
	ctx context.Context,
	asyncLoadFn AsyncLoadFn,
	requestID gsmsg.GraphSyncRequestID,
	errorChan chan error) ipld.Loader {
	return func(link ipld.Link, linkContext ipldbridge.LinkContext) (io.Reader, error) {
		var loaded asyncloader.AsyncLoadResult
		pending := asyncLoadFn(requestID, link)

		// Phase 1: wait for the load to complete unless the request ends first.
		select {
		case loaded = <-pending:
		case <-ctx.Done():
			return nil, fmt.Errorf("request finished")
		}

		// Success: expose the loaded bytes as a reader.
		if loaded.Err == nil {
			return bytes.NewReader(loaded.Data), nil
		}

		// Phase 2: hand the load error to the listener on errorChan, giving up
		// if the request ends before the error is accepted.
		select {
		case errorChan <- loaded.Err:
			return nil, ipldbridge.ErrDoNotFollow()
		case <-ctx.Done():
			return nil, fmt.Errorf("request finished")
		}
	}
}