graphsync.go 3.89 KB
Newer Older
1 2 3 4 5
package graphsync

import (
	"context"

6 7
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"

8
	"github.com/ipfs/go-graphsync/ipldbridge"
9 10
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
11
	gsnet "github.com/ipfs/go-graphsync/network"
12
	"github.com/ipfs/go-graphsync/peermanager"
13
	"github.com/ipfs/go-graphsync/requestmanager"
14 15 16 17
	"github.com/ipfs/go-graphsync/responsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/peertaskqueue"
	logging "github.com/ipfs/go-log"
18 19 20 21
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-peer"
)

22 23
// log is the package-level logger, created under the name "graphsync".
var log = logging.Logger("graphsync")

24 25 26 27 28 29 30
// ResponseProgress is the fundamental unit of responses making progress in
// Graphsync. It is a type alias for requestmanager.ResponseProgress, so the
// two names denote the same type.
type ResponseProgress = requestmanager.ResponseProgress

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
31 32 33
	ipldBridge          ipldbridge.IPLDBridge
	network             gsnet.GraphSyncNetwork
	loader              ipldbridge.Loader
34
	storer              ipldbridge.Storer
35 36
	requestManager      *requestmanager.RequestManager
	responseManager     *responsemanager.ResponseManager
37
	asyncLoader         *asyncloader.AsyncLoader
38 39 40 41 42
	peerResponseManager *peerresponsemanager.PeerResponseManager
	peerTaskQueue       *peertaskqueue.PeerTaskQueue
	peerManager         *peermanager.PeerMessageManager
	ctx                 context.Context
	cancel              context.CancelFunc
43 44 45 46 47
}

// New creates a new GraphSync Exchange on the given network,
// using the given bridge to IPLD and the given link loader.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
48 49
	ipldBridge ipldbridge.IPLDBridge, loader ipldbridge.Loader,
	storer ipldbridge.Storer) *GraphSync {
50 51 52 53 54
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
55
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
56 57
	asyncLoader := asyncloader.New(ctx, loader, storer)
	requestManager := requestmanager.New(ctx, asyncLoader, ipldBridge)
58 59 60 61 62 63
	peerTaskQueue := peertaskqueue.New()
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager, ipldBridge)
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
	responseManager := responsemanager.New(ctx, loader, ipldBridge, peerResponseManager, peerTaskQueue)
64
	graphSync := &GraphSync{
65 66 67
		ipldBridge:          ipldBridge,
		network:             network,
		loader:              loader,
68 69
		storer:              storer,
		asyncLoader:         asyncLoader,
70 71 72 73 74 75 76
		requestManager:      requestManager,
		peerManager:         peerManager,
		peerTaskQueue:       peerTaskQueue,
		peerResponseManager: peerResponseManager,
		responseManager:     responseManager,
		ctx:                 ctx,
		cancel:              cancel,
77 78
	}

79
	asyncLoader.Startup()
80 81
	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
82 83
	responseManager.Startup()
	network.SetDelegate(graphSync)
84 85 86 87
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
88
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, rootedSelector ipld.Node) (<-chan ResponseProgress, <-chan error) {
89 90
	return gs.requestManager.SendRequest(ctx, p, rootedSelector)
}
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105

// ReceiveMessage is part of the network's Receiver interface. It takes a
// message that arrived from sender and hands the requests it carries to the
// response manager for processing.
func (gs *GraphSync) ReceiveMessage(ctx context.Context, sender peer.ID, incoming gsmsg.GraphSyncMessage) {
	requests := incoming.Requests()
	gs.responseManager.ProcessRequests(ctx, sender, requests)
}

// ReceiveError is part of the network's Receiver interface. It reports an
// error coming from the network by writing its message to the package logger
// via Errorf.
func (gs *GraphSync) ReceiveError(err error) {
	msg := err.Error()
	log.Errorf("Error: %s", msg)
}