graphsync.go 11.6 KB
Newer Older
1 2 3 4 5
package graphsync

import (
	"context"

Hannah Howard's avatar
Hannah Howard committed
6 7 8 9 10
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-core/peer"

11 12 13 14 15 16
	"github.com/ipfs/go-graphsync"
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/peermanager"
	"github.com/ipfs/go-graphsync/requestmanager"
17
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
Hannah Howard's avatar
Hannah Howard committed
18
	requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
19
	"github.com/ipfs/go-graphsync/responsemanager"
Hannah Howard's avatar
Hannah Howard committed
20
	responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
21
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
22
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
23
	"github.com/ipfs/go-graphsync/selectorvalidator"
24 25 26 27
)

var log = logging.Logger("graphsync")

28 29
const maxRecursionDepth = 100

30 31 32
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
33 34 35 36 37 38 39 40 41 42 43 44
	network                     gsnet.GraphSyncNetwork
	loader                      ipld.Loader
	storer                      ipld.Storer
	requestManager              *requestmanager.RequestManager
	responseManager             *responsemanager.ResponseManager
	asyncLoader                 *asyncloader.AsyncLoader
	peerResponseManager         *peerresponsemanager.PeerResponseManager
	peerTaskQueue               *peertaskqueue.PeerTaskQueue
	peerManager                 *peermanager.PeerMessageManager
	incomingRequestHooks        *responderhooks.IncomingRequestHooks
	outgoingBlockHooks          *responderhooks.OutgoingBlockHooks
	requestUpdatedHooks         *responderhooks.RequestUpdatedHooks
45
	completedResponseListeners  *responderhooks.CompletedResponseListeners
46 47 48 49 50 51 52 53
	requestorCancelledListeners *responderhooks.RequestorCancelledListeners
	incomingResponseHooks       *requestorhooks.IncomingResponseHooks
	outgoingRequestHooks        *requestorhooks.OutgoingRequestHooks
	incomingBlockHooks          *requestorhooks.IncomingBlockHooks
	persistenceOptions          *persistenceoptions.PersistenceOptions
	ctx                         context.Context
	cancel                      context.CancelFunc
	unregisterDefaultValidator  graphsync.UnregisterHookFunc
54 55 56 57 58 59 60 61 62 63 64 65
}

// Option defines the functional option type that can be used to configure
// graphsync instances. New invokes each supplied Option on the freshly
// assembled *GraphSync before any of its components are started.
type Option func(*GraphSync)

// RejectAllRequestsByDefault means that without hooks registered
// that perform their own request validation, all requests are rejected
func RejectAllRequestsByDefault() Option {
	return func(gs *GraphSync) {
		gs.unregisterDefaultValidator()
	}
66 67 68
}

// New creates a new GraphSync Exchange on the given network,
69
// and the given link loader+storer.
70
func New(parent context.Context, network gsnet.GraphSyncNetwork,
71
	loader ipld.Loader, storer ipld.Storer, options ...Option) graphsync.GraphExchange {
72 73 74 75 76 77 78
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
	asyncLoader := asyncloader.New(ctx, loader, storer)
Hannah Howard's avatar
Hannah Howard committed
79 80
	incomingResponseHooks := requestorhooks.NewResponseHooks()
	outgoingRequestHooks := requestorhooks.NewRequestHooks()
Hannah Howard's avatar
Hannah Howard committed
81 82
	incomingBlockHooks := requestorhooks.NewBlockHooks()
	requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
83 84
	peerTaskQueue := peertaskqueue.New()
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
85
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
86 87
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
88
	persistenceOptions := persistenceoptions.New()
Hannah Howard's avatar
Hannah Howard committed
89 90 91
	incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
	outgoingBlockHooks := responderhooks.NewBlockHooks()
	requestUpdatedHooks := responderhooks.NewUpdateHooks()
92
	completedResponseListeners := responderhooks.NewCompletedResponseListeners()
93
	requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
94
	responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
95
	unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
96
	graphSync := &GraphSync{
97 98 99 100 101 102 103 104 105 106
		network:                     network,
		loader:                      loader,
		storer:                      storer,
		asyncLoader:                 asyncLoader,
		requestManager:              requestManager,
		peerManager:                 peerManager,
		persistenceOptions:          persistenceOptions,
		incomingRequestHooks:        incomingRequestHooks,
		outgoingBlockHooks:          outgoingBlockHooks,
		requestUpdatedHooks:         requestUpdatedHooks,
107
		completedResponseListeners:  completedResponseListeners,
108 109 110 111 112 113 114 115 116 117
		requestorCancelledListeners: requestorCancelledListeners,
		incomingResponseHooks:       incomingResponseHooks,
		outgoingRequestHooks:        outgoingRequestHooks,
		incomingBlockHooks:          incomingBlockHooks,
		peerTaskQueue:               peerTaskQueue,
		peerResponseManager:         peerResponseManager,
		responseManager:             responseManager,
		ctx:                         ctx,
		cancel:                      cancel,
		unregisterDefaultValidator:  unregisterDefaultValidator,
118 119 120 121
	}

	for _, option := range options {
		option(graphSync)
122 123 124 125 126 127 128 129 130 131 132
	}

	asyncLoader.Startup()
	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
	responseManager.Startup()
	network.SetDelegate((*graphSyncReceiver)(graphSync))
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
133 134
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
	return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
135 136
}

137
// RegisterIncomingRequestHook adds a hook that runs when a request is received
138 139 140
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
141
func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
142
	return gs.incomingRequestHooks.Register(hook)
143 144
}

145 146
// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
147
	return gs.incomingResponseHooks.Register(hook)
148 149 150 151
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
152
	return gs.outgoingRequestHooks.Register(hook)
153 154 155 156 157 158 159 160
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer)
	if err != nil {
		return err
	}
161 162 163
	return gs.persistenceOptions.Register(name, loader)
}

164 165 166 167 168 169 170 171 172
// UnregisterPersistenceOption unregisters an alternate loader/storer combo.
// The name is removed from the async loader first and, if that succeeds,
// from the persistence options; the first error encountered is returned.
func (gs *GraphSync) UnregisterPersistenceOption(name string) error {
	if loaderErr := gs.asyncLoader.UnregisterPersistenceOption(name); loaderErr != nil {
		return loaderErr
	}
	return gs.persistenceOptions.Unregister(name)
}

173 174 175 176 177
// RegisterOutgoingBlockHook registers a hook that runs after each block is sent in a response.
// The value returned is the result of the registry's Register call.
func (gs *GraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
	unregister := gs.outgoingBlockHooks.Register(hook)
	return unregister
}

Hannah Howard's avatar
Hannah Howard committed
178 179 180 181 182
// RegisterRequestUpdatedHook registers a hook that runs when an update to a request is received.
// The value returned is the result of the registry's Register call.
func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
	unregister := gs.requestUpdatedHooks.Register(hook)
	return unregister
}

183 184 185
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
	return gs.completedResponseListeners.Register(listener)
186 187
}

Hannah Howard's avatar
Hannah Howard committed
188 189 190 191 192
// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store).
// The value returned is the result of the registry's Register call.
func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
	unregister := gs.incomingBlockHooks.Register(hook)
	return unregister
}

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor. The value returned is the result of
// the registry's Register call.
func (gs *GraphSync) RegisterRequestorCancelledListener(listener graphsync.OnRequestorCancelledListener) graphsync.UnregisterHookFunc {
	unregister := gs.requestorCancelledListeners.Register(listener)
	return unregister
}

// UnpauseRequest unpauses a request that was paused in a block hook, based on its request ID.
// Extensions can also be sent along with the unpause. The call is forwarded
// to the request manager and its error is returned as-is.
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	err := gs.requestManager.UnpauseRequest(requestID, extensions...)
	return err
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process).
// The call is forwarded to the request manager and its error is returned as-is.
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
	err := gs.requestManager.PauseRequest(requestID)
	return err
}

210
// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
211 212 213 214 215 216 217 218 219 220 221 222
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process).
// The call is forwarded to the response manager and its error is returned as-is.
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
	err := gs.responseManager.PauseResponse(p, requestID)
	return err
}

// CancelResponse cancels an in progress response
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
	return gs.responseManager.CancelResponse(p, requestID)
223 224
}

225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
// graphSyncReceiver is a distinct named type over GraphSync that carries the
// network receiver callbacks (ReceiveMessage, ReceiveError, Connected,
// Disconnected). New passes the exchange to network.SetDelegate converted to
// this type.
type graphSyncReceiver GraphSync

// graphSync converts the receiver back to the *GraphSync it was created from.
func (gsr *graphSyncReceiver) graphSync() *GraphSync {
	exchange := (*GraphSync)(gsr)
	return exchange
}

// ReceiveMessage is part of the network's Receiver interface and receives
// incoming messages from the network. The message's requests are handed to
// the response manager first, then its responses and blocks are handed to the
// request manager.
func (gsr *graphSyncReceiver) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
	gs := gsr.graphSync()
	gs.responseManager.ProcessRequests(ctx, sender, incoming.Requests())
	gs.requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network. At present the error is only written to the
// package logger at info level; it is not propagated any further.
func (gsr *graphSyncReceiver) ReceiveError(err error) {
	log.Infof("Graphsync ReceiveError: %s", err)
	// TODO bubble the network error up to the parent context/error logger
}

// Connected is part of the network's Receiver interface and handles peers
// connecting on the network. The peer manager is notified first, then the
// peer response manager.
func (gsr *graphSyncReceiver) Connected(p peer.ID) {
	gs := gsr.graphSync()
	gs.peerManager.Connected(p)
	gs.peerResponseManager.Connected(p)
}

// Disconnected is part of the network's Receiver interface and handles peers
// disconnecting from the network. The peer manager is notified first, then
// the peer response manager.
func (gsr *graphSyncReceiver) Disconnected(p peer.ID) {
	gsr.graphSync().peerManager.Disconnected(p)
	gsr.graphSync().peerResponseManager.Disconnected(p)
}