graphsync.go 3.69 KB
Newer Older
1 2 3 4 5 6
package graphsync

import (
	"context"

	"github.com/ipfs/go-graphsync/ipldbridge"
7 8
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
9
	gsnet "github.com/ipfs/go-graphsync/network"
10
	"github.com/ipfs/go-graphsync/peermanager"
11
	"github.com/ipfs/go-graphsync/requestmanager"
12 13 14 15
	"github.com/ipfs/go-graphsync/responsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue"
	logging "github.com/ipfs/go-log"
16 17 18 19
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-peer"
)

20 21
var log = logging.Logger("graphsync")

22 23 24 25 26 27 28 29 30 31
// ResponseProgress is the fundamental unit of responses making progress in
// Graphsync.
type ResponseProgress = requestmanager.ResponseProgress

// ResponseError is an error that occurred during a traversal.
type ResponseError = requestmanager.ResponseError

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
32 33 34 35 36 37 38 39 40 41
	ipldBridge          ipldbridge.IPLDBridge
	network             gsnet.GraphSyncNetwork
	loader              ipldbridge.Loader
	requestManager      *requestmanager.RequestManager
	responseManager     *responsemanager.ResponseManager
	peerResponseManager *peerresponsemanager.PeerResponseManager
	peerTaskQueue       *peertaskqueue.PeerTaskQueue
	peerManager         *peermanager.PeerMessageManager
	ctx                 context.Context
	cancel              context.CancelFunc
42 43 44 45 46 47 48 49 50 51 52
}

// New creates a new GraphSync Exchange on the given network,
// using the given bridge to IPLD and the given link loader.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
	ipldBridge ipldbridge.IPLDBridge, loader ipldbridge.Loader) *GraphSync {
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
53
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
54
	requestManager := requestmanager.New(ctx, ipldBridge)
55 56 57 58 59 60
	peerTaskQueue := peertaskqueue.New()
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager, ipldBridge)
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
	responseManager := responsemanager.New(ctx, loader, ipldBridge, peerResponseManager, peerTaskQueue)
61
	graphSync := &GraphSync{
62 63 64 65 66 67 68 69 70 71
		ipldBridge:          ipldBridge,
		network:             network,
		loader:              loader,
		requestManager:      requestManager,
		peerManager:         peerManager,
		peerTaskQueue:       peerTaskQueue,
		peerResponseManager: peerResponseManager,
		responseManager:     responseManager,
		ctx:                 ctx,
		cancel:              cancel,
72 73 74 75
	}

	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
76 77
	responseManager.Startup()
	network.SetDelegate(graphSync)
78 79 80 81 82 83 84
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, rootedSelector ipld.Node) (<-chan ResponseProgress, <-chan ResponseError) {
	return gs.requestManager.SendRequest(ctx, p, rootedSelector)
}
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

// ReceiveMessage is part of the networks Receiver interface and receives
// incoming messages from the network
func (gs *GraphSync) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
	gs.responseManager.ProcessRequests(ctx, sender, incoming.Requests())
}

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network.
func (gs *GraphSync) ReceiveError(err error) {
	log.Errorf("Error: %s", err.Error())
}