executor.go 2.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
package executor

import (
	"context"

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/ipldutil"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/requestmanager/loader"
	ipld "github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
	"github.com/ipld/go-ipld-prime/traversal"
)

// RequestExecution describes a single graphsync request to run, with data
// loaded from the asynchronous loader. It bundles the request itself with the
// callbacks the executor needs; call Start to begin execution.
type RequestExecution struct {
	// Request is the graphsync request to execute. Its selector, root and ID
	// drive the traversal, and it is the value handed to SendRequest.
	Request          gsmsg.GraphSyncRequest
	// SendRequest is called once, synchronously, with Request when Start is
	// invoked, before the traversal goroutine is launched.
	SendRequest      func(gsmsg.GraphSyncRequest)
	// Loader is the asynchronous load function; it is wrapped with
	// loader.WrapAsyncLoader to supply blocks to the traversal.
	Loader           loader.AsyncLoadFn
	// RunBlockHooks is passed through to loader.WrapAsyncLoader.
	// NOTE(review): presumably invoked for each loaded block — confirm
	// against the loader package.
	RunBlockHooks    func(blk graphsync.BlockData) error
	// TerminateRequest is called once when execution finishes, just before
	// the channels returned by Start are closed.
	TerminateRequest func()
	// NodeStyleChooser is passed to ipldutil.Traverse for the traversal.
	NodeStyleChooser traversal.LinkTargetNodeStyleChooser
}

// Start sends the request and then kicks off its traversal in a new
// goroutine. It returns two unbuffered channels: the first yields a
// ResponseProgress for each node visited, the second yields errors. Both
// channels are closed when the traversal goroutine finishes.
//
// SendRequest is invoked synchronously, before the goroutine is started.
func (re RequestExecution) Start(ctx context.Context) (chan graphsync.ResponseProgress, chan error) {
	progressCh := make(chan graphsync.ResponseProgress)
	errCh := make(chan error)

	exec := &requestExecutor{
		inProgressChan:   progressCh,
		inProgressErr:    errCh,
		ctx:              ctx,
		request:          re.Request,
		sendRequest:      re.SendRequest,
		loader:           re.Loader,
		runBlockHooks:    re.RunBlockHooks,
		terminateRequest: re.TerminateRequest,
		nodeStyleChooser: re.NodeStyleChooser,
	}

	// The request goes out before the traversal begins.
	exec.sendRequest(exec.request)
	go exec.run()

	return progressCh, errCh
}

// requestExecutor holds the state for one in-flight request execution. It is
// created by RequestExecution.Start and driven by its run method.
type requestExecutor struct {
	// inProgressChan receives one ResponseProgress per node visited; run
	// closes it when execution ends.
	inProgressChan   chan graphsync.ResponseProgress
	// inProgressErr receives traversal errors and is also handed to
	// loader.WrapAsyncLoader; run closes it when execution ends.
	inProgressErr    chan error
	// ctx is passed to the loader wrapper and the traversal, and unblocks
	// channel sends in visitor and run once it is done.
	ctx              context.Context
	// request supplies the selector, root and ID used by run.
	request          gsmsg.GraphSyncRequest
	// sendRequest is called once with request from Start.
	sendRequest      func(gsmsg.GraphSyncRequest)
	// loader is the asynchronous load function wrapped for the traversal.
	loader           loader.AsyncLoadFn
	// runBlockHooks is passed through to loader.WrapAsyncLoader.
	runBlockHooks    func(blk graphsync.BlockData) error
	// terminateRequest is called by run once the traversal has ended.
	terminateRequest func()
	// nodeStyleChooser is passed to ipldutil.Traverse.
	nodeStyleChooser traversal.LinkTargetNodeStyleChooser
}

// visitor is the traversal callback: it publishes the visited node, its path
// and its last block on inProgressChan. The send is abandoned if the context
// is done first. It always returns nil, so this callback never aborts the
// traversal on its own.
func (re *requestExecutor) visitor(tp traversal.Progress, node ipld.Node, tr traversal.VisitReason) error {
	progress := graphsync.ResponseProgress{
		Node:      node,
		Path:      tp.Path,
		LastBlock: tp.LastBlock,
	}

	select {
	case re.inProgressChan <- progress:
	case <-re.ctx.Done():
		// Nobody is expected to be listening any more; drop the update.
	}
	return nil
}

// run executes the request: it parses the request's selector, traverses from
// the request's root link using the wrapped asynchronous loader, and reports
// any failure on inProgressErr.
//
// A selector that fails to parse is reported on inProgressErr in the same way
// as a traversal error, and no traversal is attempted. Errors of type
// loader.ContextCancelError are not reported. On every exit path
// terminateRequest is called and then inProgressChan and inProgressErr are
// closed, in that order.
func (re *requestExecutor) run() {
	// Cleanup is deferred so that no return path can skip it; the order
	// (terminate, then close progress, then close errors) is significant to
	// consumers and must be preserved.
	defer func() {
		re.terminateRequest()
		close(re.inProgressChan)
		close(re.inProgressErr)
	}()

	selector, err := ipldutil.ParseSelector(re.request.Selector())
	if err == nil {
		// Only traverse with a successfully parsed selector.
		loaderFn := loader.WrapAsyncLoader(re.ctx, re.loader, re.request.ID(), re.inProgressErr, re.runBlockHooks)
		err = ipldutil.Traverse(re.ctx, loaderFn, re.nodeStyleChooser, cidlink.Link{Cid: re.request.Root()}, selector, re.visitor)
	}
	if err == nil {
		return
	}

	// Cancellation surfaced by the loader is not forwarded as an error.
	if _, isContextErr := err.(loader.ContextCancelError); isContextErr {
		return
	}

	// Deliver the error unless the context ends first, so this goroutine
	// cannot block forever on a receiver that has gone away.
	select {
	case <-re.ctx.Done():
	case re.inProgressErr <- err:
	}
}