graphsync.go 5.38 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
package graphsync

import (
	"context"

	"github.com/ipfs/go-graphsync"
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"

	"github.com/ipfs/go-graphsync/ipldbridge"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/peermanager"
	"github.com/ipfs/go-graphsync/requestmanager"
	"github.com/ipfs/go-graphsync/responsemanager"
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-core/peer"
)

var log = logging.Logger("graphsync")

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
	ipldBridge          ipldbridge.IPLDBridge
	network             gsnet.GraphSyncNetwork
	loader              ipldbridge.Loader
	storer              ipldbridge.Storer
	requestManager      *requestmanager.RequestManager
	responseManager     *responsemanager.ResponseManager
	asyncLoader         *asyncloader.AsyncLoader
	peerResponseManager *peerresponsemanager.PeerResponseManager
	peerTaskQueue       *peertaskqueue.PeerTaskQueue
	peerManager         *peermanager.PeerMessageManager
	ctx                 context.Context
	cancel              context.CancelFunc
}

// New creates a new GraphSync Exchange on the given network,
// using the given bridge to IPLD and the given link loader.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
	ipldBridge ipldbridge.IPLDBridge, loader ipldbridge.Loader,
	storer ipldbridge.Storer) graphsync.GraphExchange {
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
	asyncLoader := asyncloader.New(ctx, loader, storer)
54
	requestManager := requestmanager.New(ctx, asyncLoader)
55 56
	peerTaskQueue := peertaskqueue.New()
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
57
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
58 59
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
60
	responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue)
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
	graphSync := &GraphSync{
		ipldBridge:          ipldBridge,
		network:             network,
		loader:              loader,
		storer:              storer,
		asyncLoader:         asyncLoader,
		requestManager:      requestManager,
		peerManager:         peerManager,
		peerTaskQueue:       peerTaskQueue,
		peerResponseManager: peerResponseManager,
		responseManager:     responseManager,
		ctx:                 ctx,
		cancel:              cancel,
	}

	asyncLoader.Startup()
	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
	responseManager.Startup()
	network.SetDelegate((*graphSyncReceiver)(graphSync))
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
85 86
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
	return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
87 88
}

89 90 91 92
// RegisterRequestReceivedHook adds a hook that runs when a request is received
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
93 94
func (gs *GraphSync) RegisterRequestReceivedHook(hook graphsync.OnRequestReceivedHook) error {
	gs.responseManager.RegisterHook(hook)
95 96 97 98
	return nil
}

// RegisterResponseReceivedHook adds a hook that runs when a response is received
99
func (gs *GraphSync) RegisterResponseReceivedHook(hook graphsync.OnResponseReceivedHook) error {
100
	gs.requestManager.RegisterHook(hook)
101 102 103
	return nil
}

104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
type graphSyncReceiver GraphSync

func (gsr *graphSyncReceiver) graphSync() *GraphSync {
	return (*GraphSync)(gsr)
}

// ReceiveMessage is part of the networks Receiver interface and receives
// incoming messages from the network
func (gsr *graphSyncReceiver) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
	gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
	gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network.
func (gsr *graphSyncReceiver) ReceiveError(err error) {
	log.Infof("Graphsync ReceiveError: %s", err)
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
}

// Connected is part of the networks 's Receiver interface and handles peers connecting
// on the network
func (gsr *graphSyncReceiver) Connected(p peer.ID) {
	gsr.graphSync().peerManager.Connected(p)
	gsr.graphSync().peerResponseManager.Connected(p)
}

// Connected is part of the networks 's Receiver interface and handles peers connecting
// on the network
func (gsr *graphSyncReceiver) Disconnected(p peer.ID) {
	gsr.graphSync().peerManager.Disconnected(p)
	gsr.graphSync().peerResponseManager.Disconnected(p)
}