graphsync.go 9.34 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
package graphsync

import (
	"context"

	"github.com/ipfs/go-graphsync"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/peermanager"
	"github.com/ipfs/go-graphsync/requestmanager"
12
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
Hannah Howard's avatar
Hannah Howard committed
13
	requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
14
	"github.com/ipfs/go-graphsync/responsemanager"
Hannah Howard's avatar
Hannah Howard committed
15
	responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
16
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
17
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
18
	"github.com/ipfs/go-graphsync/selectorvalidator"
19 20 21 22 23 24 25 26
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-core/peer"
)

var log = logging.Logger("graphsync")

27 28
const maxRecursionDepth = 100

29 30 31
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
32 33 34 35 36 37 38 39 40
	network                    gsnet.GraphSyncNetwork
	loader                     ipld.Loader
	storer                     ipld.Storer
	requestManager             *requestmanager.RequestManager
	responseManager            *responsemanager.ResponseManager
	asyncLoader                *asyncloader.AsyncLoader
	peerResponseManager        *peerresponsemanager.PeerResponseManager
	peerTaskQueue              *peertaskqueue.PeerTaskQueue
	peerManager                *peermanager.PeerMessageManager
Hannah Howard's avatar
Hannah Howard committed
41 42 43
	incomingRequestHooks       *responderhooks.IncomingRequestHooks
	outgoingBlockHooks         *responderhooks.OutgoingBlockHooks
	requestUpdatedHooks        *responderhooks.RequestUpdatedHooks
44
	completedResponseListeners *responderhooks.CompletedResponseListeners
Hannah Howard's avatar
Hannah Howard committed
45 46
	incomingResponseHooks      *requestorhooks.IncomingResponseHooks
	outgoingRequestHooks       *requestorhooks.OutgoingRequestHooks
47
	persistenceOptions         *persistenceoptions.PersistenceOptions
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
	ctx                        context.Context
	cancel                     context.CancelFunc
	unregisterDefaultValidator graphsync.UnregisterHookFunc
}

// Option defines the functional option type that can be used to configure
// graphsync instances
type Option func(*GraphSync)

// RejectAllRequestsByDefault means that without hooks registered
// that perform their own request validation, all requests are rejected
func RejectAllRequestsByDefault() Option {
	return func(gs *GraphSync) {
		gs.unregisterDefaultValidator()
	}
63 64 65
}

// New creates a new GraphSync Exchange on the given network,
66
// and the given link loader+storer.
67
func New(parent context.Context, network gsnet.GraphSyncNetwork,
68
	loader ipld.Loader, storer ipld.Storer, options ...Option) graphsync.GraphExchange {
69 70 71 72 73 74 75
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
	asyncLoader := asyncloader.New(ctx, loader, storer)
Hannah Howard's avatar
Hannah Howard committed
76 77 78
	incomingResponseHooks := requestorhooks.NewResponseHooks()
	outgoingRequestHooks := requestorhooks.NewRequestHooks()
	requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks)
79 80
	peerTaskQueue := peertaskqueue.New()
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
81
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
82 83
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
84
	persistenceOptions := persistenceoptions.New()
Hannah Howard's avatar
Hannah Howard committed
85 86 87
	incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
	outgoingBlockHooks := responderhooks.NewBlockHooks()
	requestUpdatedHooks := responderhooks.NewUpdateHooks()
88 89
	completedResponseListeners := responderhooks.NewCompletedResponseListeners()
	responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners)
90
	unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
91
	graphSync := &GraphSync{
92 93 94 95 96 97
		network:                    network,
		loader:                     loader,
		storer:                     storer,
		asyncLoader:                asyncLoader,
		requestManager:             requestManager,
		peerManager:                peerManager,
98 99 100
		persistenceOptions:         persistenceOptions,
		incomingRequestHooks:       incomingRequestHooks,
		outgoingBlockHooks:         outgoingBlockHooks,
Hannah Howard's avatar
Hannah Howard committed
101
		requestUpdatedHooks:        requestUpdatedHooks,
102
		completedResponseListeners: completedResponseListeners,
Hannah Howard's avatar
Hannah Howard committed
103 104
		incomingResponseHooks:      incomingResponseHooks,
		outgoingRequestHooks:       outgoingRequestHooks,
105 106 107 108 109 110 111 112 113 114
		peerTaskQueue:              peerTaskQueue,
		peerResponseManager:        peerResponseManager,
		responseManager:            responseManager,
		ctx:                        ctx,
		cancel:                     cancel,
		unregisterDefaultValidator: unregisterDefaultValidator,
	}

	for _, option := range options {
		option(graphSync)
115 116 117 118 119 120 121 122 123 124 125
	}

	asyncLoader.Startup()
	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
	responseManager.Startup()
	network.SetDelegate((*graphSyncReceiver)(graphSync))
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
126 127
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
	return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
128 129
}

130
// RegisterIncomingRequestHook adds a hook that runs when a request is received
131 132 133
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
134
func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
135
	return gs.incomingRequestHooks.Register(hook)
136 137
}

138 139
// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
140
	return gs.incomingResponseHooks.Register(hook)
141 142 143 144
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
145
	return gs.outgoingRequestHooks.Register(hook)
146 147 148 149 150 151 152 153
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer)
	if err != nil {
		return err
	}
154 155 156 157 158 159 160 161
	return gs.persistenceOptions.Register(name, loader)
}

// RegisterOutgoingBlockHook registers a hook that runs after each block is sent in a response
func (gs *GraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
	return gs.outgoingBlockHooks.Register(hook)
}

Hannah Howard's avatar
Hannah Howard committed
162 163 164 165 166
// RegisterRequestUpdatedHook registers a hook that runs when an update to a request is received
func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
	return gs.requestUpdatedHooks.Register(hook)
}

167 168 169 170 171
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
	return gs.completedResponseListeners.Register(listener)
}

172 173 174
// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
	return gs.responseManager.UnpauseResponse(p, requestID)
175 176
}

177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
type graphSyncReceiver GraphSync

func (gsr *graphSyncReceiver) graphSync() *GraphSync {
	return (*GraphSync)(gsr)
}

// ReceiveMessage is part of the networks Receiver interface and receives
// incoming messages from the network
func (gsr *graphSyncReceiver) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
	gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
	gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network.
func (gsr *graphSyncReceiver) ReceiveError(err error) {
	log.Infof("Graphsync ReceiveError: %s", err)
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
}

// Connected is part of the networks 's Receiver interface and handles peers connecting
// on the network
func (gsr *graphSyncReceiver) Connected(p peer.ID) {
	gsr.graphSync().peerManager.Connected(p)
	gsr.graphSync().peerResponseManager.Connected(p)
}

// Connected is part of the networks 's Receiver interface and handles peers connecting
// on the network
func (gsr *graphSyncReceiver) Disconnected(p peer.ID) {
	gsr.graphSync().peerManager.Disconnected(p)
	gsr.graphSync().peerResponseManager.Disconnected(p)
}