runtraversal.go 1.89 KB
Newer Older
1 2 3 4 5 6
package runtraversal

import (
	"bytes"
	"io"

7 8
	"github.com/ipfs/go-graphsync/ipldutil"
	logging "github.com/ipfs/go-log/v2"
9
	ipld "github.com/ipld/go-ipld-prime"
10
	"github.com/ipld/go-ipld-prime/traversal"
11 12
)

13 14
var logger = logging.Logger("gs-traversal")

15 16 17 18 19 20 21 22 23 24 25 26
// ResponseSender sends responses over the network
type ResponseSender func(
	link ipld.Link,
	data []byte,
) error

// RunTraversal wraps a given loader with an interceptor that sends loaded
// blocks out to the network with the given response sender.
func RunTraversal(
	loader ipld.Loader,
	traverser ipldutil.Traverser,
	sendResponse ResponseSender) error {
27 28
	nBlocksRead := 0

29 30 31
	for {
		isComplete, err := traverser.IsComplete()
		if isComplete {
32 33 34 35 36
			if err != nil {
				logger.Infof("traversal completion check failed, nBlocksRead=%d, err=%s", nBlocksRead, err)
			} else {
				logger.Infof("traversal completed successfully, nBlocksRead=%d", nBlocksRead)
			}
37 38 39
			return err
		}
		lnk, lnkCtx := traverser.CurrentRequest()
40
		logger.Debugf("will load link=%s", lnk)
41 42 43
		result, err := loader(lnk, lnkCtx)
		var data []byte
		if err != nil {
44
			logger.Errorf("failed to load link=%s, nBlocksRead=%d, err=%s", lnk, nBlocksRead, err)
45
			traverser.Error(traversal.SkipMe{})
46
		} else {
47 48 49 50 51
			blockBuffer, ok := result.(*bytes.Buffer)
			if !ok {
				blockBuffer = new(bytes.Buffer)
				_, err = io.Copy(blockBuffer, result)
			}
52
			if err != nil {
53
				logger.Errorf("failed to write to buffer, link=%s, nBlocksRead=%d, err=%s", lnk, nBlocksRead, err)
54 55 56
				traverser.Error(err)
			} else {
				data = blockBuffer.Bytes()
57
				err = traverser.Advance(blockBuffer)
58
				if err != nil {
59
					logger.Errorf("failed to advance traversal, link=%s, nBlocksRead=%d, err=%s", lnk, nBlocksRead, err)
60 61 62
					return err
				}
			}
63 64
			nBlocksRead++
			logger.Debugf("successfully loaded link=%s, nBlocksRead=%d", lnk, nBlocksRead)
65 66 67 68 69 70 71
		}
		err = sendResponse(lnk, data)
		if err != nil {
			return err
		}
	}
}