graphsync.go 3.99 KB
Newer Older
1 2 3 4 5
package graphsync

import (
	"context"

6
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
7
	"github.com/ipfs/go-graphsync/requestmanager/types"
8

9
	"github.com/ipfs/go-graphsync/ipldbridge"
10 11
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
12
	gsnet "github.com/ipfs/go-graphsync/network"
13
	"github.com/ipfs/go-graphsync/peermanager"
14
	"github.com/ipfs/go-graphsync/requestmanager"
15 16
	"github.com/ipfs/go-graphsync/responsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
17
	"github.com/ipfs/go-peertaskqueue"
18
	logging "github.com/ipfs/go-log"
19 20 21 22
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-peer"
)

23 24
// log is the package-level logger, registered under the name "graphsync".
var log = logging.Logger("graphsync")

25 26
// ResponseProgress is the fundamental unit of responses making progress in
// Graphsync.
27
type ResponseProgress = types.ResponseProgress
28 29 30 31

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
32 33 34
	ipldBridge          ipldbridge.IPLDBridge
	network             gsnet.GraphSyncNetwork
	loader              ipldbridge.Loader
35
	storer              ipldbridge.Storer
36 37
	requestManager      *requestmanager.RequestManager
	responseManager     *responsemanager.ResponseManager
38
	asyncLoader         *asyncloader.AsyncLoader
39 40 41 42 43
	peerResponseManager *peerresponsemanager.PeerResponseManager
	peerTaskQueue       *peertaskqueue.PeerTaskQueue
	peerManager         *peermanager.PeerMessageManager
	ctx                 context.Context
	cancel              context.CancelFunc
44 45 46 47 48
}

// New creates a new GraphSync Exchange on the given network,
// using the given bridge to IPLD and the given link loader.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
49 50
	ipldBridge ipldbridge.IPLDBridge, loader ipldbridge.Loader,
	storer ipldbridge.Storer) *GraphSync {
51 52 53 54 55
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
56
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
57 58
	asyncLoader := asyncloader.New(ctx, loader, storer)
	requestManager := requestmanager.New(ctx, asyncLoader, ipldBridge)
59 60 61 62 63 64
	peerTaskQueue := peertaskqueue.New()
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager, ipldBridge)
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
	responseManager := responsemanager.New(ctx, loader, ipldBridge, peerResponseManager, peerTaskQueue)
65
	graphSync := &GraphSync{
66 67 68
		ipldBridge:          ipldBridge,
		network:             network,
		loader:              loader,
69 70
		storer:              storer,
		asyncLoader:         asyncLoader,
71 72 73 74 75 76 77
		requestManager:      requestManager,
		peerManager:         peerManager,
		peerTaskQueue:       peerTaskQueue,
		peerResponseManager: peerResponseManager,
		responseManager:     responseManager,
		ctx:                 ctx,
		cancel:              cancel,
78 79
	}

80
	asyncLoader.Startup()
81 82
	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
83 84
	responseManager.Startup()
	network.SetDelegate(graphSync)
85 86 87 88
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
89
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, rootedSelector ipld.Node) (<-chan ResponseProgress, <-chan error) {
90 91
	return gs.requestManager.SendRequest(ctx, p, rootedSelector)
}
92 93 94 95 96 97 98 99

// ReceiveMessage is part of the networks Receiver interface and receives
// incoming messages from the network
func (gs *GraphSync) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
	gs.responseManager.ProcessRequests(ctx, sender, incoming.Requests())
100
	gs.requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
101 102 103 104 105 106 107
}

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network by logging them at error level.
//
// A nil err is ignored: there is nothing to report, and calling Error on a nil
// error value would panic.
func (gs *GraphSync) ReceiveError(err error) {
	if err == nil {
		return
	}
	log.Errorf("Error: %s", err.Error())
}