graphsync.go 14 KB
Newer Older
1 2 3 4 5
package graphsync

import (
	"context"

Hannah Howard's avatar
Hannah Howard committed
6 7 8 9 10
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-core/peer"

11
	"github.com/ipfs/go-graphsync"
12
	"github.com/ipfs/go-graphsync/listeners"
13 14 15 16 17
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/peermanager"
	"github.com/ipfs/go-graphsync/requestmanager"
18
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
Hannah Howard's avatar
Hannah Howard committed
19
	requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
20
	"github.com/ipfs/go-graphsync/responsemanager"
21
	"github.com/ipfs/go-graphsync/responsemanager/allocator"
Hannah Howard's avatar
Hannah Howard committed
22
	responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
23
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
24
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
25
	"github.com/ipfs/go-graphsync/selectorvalidator"
26 27 28 29
)

// log is the package-level logger, created under the "graphsync" name.
var log = logging.Logger("graphsync")

30
// Defaults applied by New when no Option overrides them.
const (
	// maxRecursionDepth is the depth limit handed to the default selector
	// validator that New registers for incoming requests.
	maxRecursionDepth = 100
	// defaultTotalMaxMemory is the default total memory budget for queued
	// response messages (256 << 20; 256 MiB assuming the allocator counts
	// bytes -- TODO confirm units against the allocator package).
	defaultTotalMaxMemory = uint64(256 << 20)
	// defaultMaxMemoryPerPeer is the default per-peer memory budget for
	// queued response messages (16 << 20; same unit as above).
	defaultMaxMemoryPerPeer = uint64(16 << 20)
	// defaultMaxInProgressRequests is the default number of requests the
	// responder processes in parallel.
	defaultMaxInProgressRequests = uint64(6)
)
34

35 36 37
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
38 39 40 41 42 43 44 45 46 47 48 49
	network                     gsnet.GraphSyncNetwork
	loader                      ipld.Loader
	storer                      ipld.Storer
	requestManager              *requestmanager.RequestManager
	responseManager             *responsemanager.ResponseManager
	asyncLoader                 *asyncloader.AsyncLoader
	peerResponseManager         *peerresponsemanager.PeerResponseManager
	peerTaskQueue               *peertaskqueue.PeerTaskQueue
	peerManager                 *peermanager.PeerMessageManager
	incomingRequestHooks        *responderhooks.IncomingRequestHooks
	outgoingBlockHooks          *responderhooks.OutgoingBlockHooks
	requestUpdatedHooks         *responderhooks.RequestUpdatedHooks
50 51 52 53
	completedResponseListeners  *listeners.CompletedResponseListeners
	requestorCancelledListeners *listeners.RequestorCancelledListeners
	blockSentListeners          *listeners.BlockSentListeners
	networkErrorListeners       *listeners.NetworkErrorListeners
54 55 56 57 58 59 60
	incomingResponseHooks       *requestorhooks.IncomingResponseHooks
	outgoingRequestHooks        *requestorhooks.OutgoingRequestHooks
	incomingBlockHooks          *requestorhooks.IncomingBlockHooks
	persistenceOptions          *persistenceoptions.PersistenceOptions
	ctx                         context.Context
	cancel                      context.CancelFunc
	unregisterDefaultValidator  graphsync.UnregisterHookFunc
61 62 63
	allocator                   *allocator.Allocator
	totalMaxMemory              uint64
	maxMemoryPerPeer            uint64
64
	maxInProgressRequests       uint64
65 66 67 68 69 70 71 72 73 74 75 76
}

// Option defines the functional option type that can be used to configure
// graphsync instances. New applies the supplied options, in order, to the
// GraphSync value after its defaults have been filled in.
type Option func(*GraphSync)

// RejectAllRequestsByDefault means that without hooks registered
// that perform their own request validation, all requests are rejected
func RejectAllRequestsByDefault() Option {
	return func(gs *GraphSync) {
		gs.unregisterDefaultValidator()
	}
77 78
}

79 80
// MaxMemoryResponder defines the maximum amount of memory the responder
// may consume queueing up messages for a response in total
81 82 83 84 85 86
func MaxMemoryResponder(totalMaxMemory uint64) Option {
	return func(gs *GraphSync) {
		gs.totalMaxMemory = totalMaxMemory
	}
}

87 88
// MaxMemoryPerPeerResponder defines the maximum amount of memory a peer
// may consume queueing up messages for a response
89 90 91 92 93 94
func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
	return func(gs *GraphSync) {
		gs.maxMemoryPerPeer = maxMemoryPerPeer
	}
}

95 96 97 98 99 100 101 102
// MaxInProgressRequests returns an Option that overrides how many graphsync
// requests are processed in parallel. When the option is not supplied the
// default of 6 applies.
func MaxInProgressRequests(maxInProgressRequests uint64) Option {
	configure := func(target *GraphSync) {
		target.maxInProgressRequests = maxInProgressRequests
	}
	return configure
}

103
// New creates a new GraphSync Exchange on the given network,
104
// and the given link loader+storer.
105
func New(parent context.Context, network gsnet.GraphSyncNetwork,
106
	loader ipld.Loader, storer ipld.Storer, options ...Option) graphsync.GraphExchange {
107 108 109 110 111 112 113
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
	asyncLoader := asyncloader.New(ctx, loader, storer)
Hannah Howard's avatar
Hannah Howard committed
114 115
	incomingResponseHooks := requestorhooks.NewResponseHooks()
	outgoingRequestHooks := requestorhooks.NewRequestHooks()
Hannah Howard's avatar
Hannah Howard committed
116
	incomingBlockHooks := requestorhooks.NewBlockHooks()
117 118
	networkErrorListeners := listeners.NewNetworkErrorListeners()
	requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
119
	peerTaskQueue := peertaskqueue.New()
120

121
	persistenceOptions := persistenceoptions.New()
Hannah Howard's avatar
Hannah Howard committed
122 123 124
	incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
	outgoingBlockHooks := responderhooks.NewBlockHooks()
	requestUpdatedHooks := responderhooks.NewUpdateHooks()
125 126 127
	completedResponseListeners := listeners.NewCompletedResponseListeners()
	requestorCancelledListeners := listeners.NewRequestorCancelledListeners()
	blockSentListeners := listeners.NewBlockSentListeners()
128
	unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
129
	graphSync := &GraphSync{
130 131 132 133 134 135 136 137 138 139
		network:                     network,
		loader:                      loader,
		storer:                      storer,
		asyncLoader:                 asyncLoader,
		requestManager:              requestManager,
		peerManager:                 peerManager,
		persistenceOptions:          persistenceOptions,
		incomingRequestHooks:        incomingRequestHooks,
		outgoingBlockHooks:          outgoingBlockHooks,
		requestUpdatedHooks:         requestUpdatedHooks,
140
		completedResponseListeners:  completedResponseListeners,
141
		requestorCancelledListeners: requestorCancelledListeners,
142 143
		blockSentListeners:          blockSentListeners,
		networkErrorListeners:       networkErrorListeners,
144 145 146 147
		incomingResponseHooks:       incomingResponseHooks,
		outgoingRequestHooks:        outgoingRequestHooks,
		incomingBlockHooks:          incomingBlockHooks,
		peerTaskQueue:               peerTaskQueue,
148 149
		totalMaxMemory:              defaultTotalMaxMemory,
		maxMemoryPerPeer:            defaultMaxMemoryPerPeer,
150
		maxInProgressRequests:       defaultMaxInProgressRequests,
151 152 153
		ctx:                         ctx,
		cancel:                      cancel,
		unregisterDefaultValidator:  unregisterDefaultValidator,
154 155 156 157
	}

	for _, option := range options {
		option(graphSync)
158
	}
159 160 161 162 163 164 165
	allocator := allocator.NewAllocator(graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer)
	graphSync.allocator = allocator
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator)
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
	graphSync.peerResponseManager = peerResponseManager
166
	responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, graphSync.maxInProgressRequests)
167
	graphSync.responseManager = responseManager
168 169 170 171 172 173 174 175 176 177

	asyncLoader.Startup()
	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
	responseManager.Startup()
	network.SetDelegate((*graphSyncReceiver)(graphSync))
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
178 179
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
	return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
180 181
}

182
// RegisterIncomingRequestHook adds a hook that runs when a request is received
183 184 185
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
186
func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
187
	return gs.incomingRequestHooks.Register(hook)
188 189
}

190 191
// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
192
	return gs.incomingResponseHooks.Register(hook)
193 194 195 196
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
197
	return gs.outgoingRequestHooks.Register(hook)
198 199 200 201 202 203 204 205
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer)
	if err != nil {
		return err
	}
206 207 208
	return gs.persistenceOptions.Register(name, loader)
}

209 210 211 212 213 214 215 216 217
// UnregisterPersistenceOption removes a previously registered alternate
// loader/storer combo. The name is removed from the async loader first; if
// that fails its error is returned and the persistence options are left
// untouched. Otherwise the result of removing it from the persistence options
// is returned.
func (gs *GraphSync) UnregisterPersistenceOption(name string) error {
	if loaderErr := gs.asyncLoader.UnregisterPersistenceOption(name); loaderErr != nil {
		return loaderErr
	}
	options := gs.persistenceOptions
	return options.Unregister(name)
}

218 219 220 221 222
// RegisterOutgoingBlockHook installs a hook that runs after each block is
// sent in a response. Calling the returned function unregisters the hook.
func (gs *GraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
	blockHooks := gs.outgoingBlockHooks
	return blockHooks.Register(hook)
}

Hannah Howard's avatar
Hannah Howard committed
223 224 225 226 227
// RegisterRequestUpdatedHook installs a hook that runs when an update to a
// request is received. Calling the returned function unregisters the hook.
func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
	updateHooks := gs.requestUpdatedHooks
	return updateHooks.Register(hook)
}

228 229 230
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
	return gs.completedResponseListeners.Register(listener)
231 232
}

Hannah Howard's avatar
Hannah Howard committed
233 234 235 236 237
// RegisterIncomingBlockHook installs a hook that runs when a block is
// received and validated (put in block store). Calling the returned function
// unregisters the hook.
func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
	blockHooks := gs.incomingBlockHooks
	return blockHooks.Register(hook)
}

238 239 240 241 242 243
// RegisterRequestorCancelledListener installs a responder-side listener for
// responses that were cancelled by the requestor. Calling the returned
// function unregisters the listener.
func (gs *GraphSync) RegisterRequestorCancelledListener(listener graphsync.OnRequestorCancelledListener) graphsync.UnregisterHookFunc {
	cancelled := gs.requestorCancelledListeners
	return cancelled.Register(listener)
}

244 245 246 247 248 249 250 251 252 253
// RegisterBlockSentListener installs a listener that is notified when blocks
// are actually sent over the wire. Calling the returned function unregisters
// the listener.
func (gs *GraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
	sent := gs.blockSentListeners
	return sent.Register(listener)
}

// RegisterNetworkErrorListener installs a listener that is notified when
// errors occur sending data over the wire. Calling the returned function
// unregisters the listener.
func (gs *GraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
	netErrors := gs.networkErrorListeners
	return netErrors.Register(listener)
}

254 255 256 257 258 259 260 261 262 263 264
// UnpauseRequest resumes a request, identified by its request ID, that was
// paused in a block hook. Any extensions supplied are passed along with the
// unpause. The request manager's result is returned unchanged.
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	manager := gs.requestManager
	return manager.UnpauseRequest(requestID, extensions...)
}

// PauseRequest asks the request manager to pause an in-progress request
// (the pause may take 1 or more blocks to take effect).
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
	manager := gs.requestManager
	return manager.PauseRequest(requestID)
}

265
// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
266 267 268 269 270 271 272 273 274 275 276 277
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
}

// PauseResponse asks the response manager to pause the in-progress response
// identified by peer and request ID (the pause may take 1 or more blocks to
// take effect).
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
	manager := gs.responseManager
	return manager.PauseResponse(p, requestID)
}

// CancelResponse cancels an in progress response
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
	return gs.responseManager.CancelResponse(p, requestID)
278 279
}

280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
// graphSyncReceiver is a distinct named type over GraphSync. The network
// receiver callbacks (ReceiveMessage, ReceiveError, Connected, Disconnected)
// are declared on it rather than on GraphSync itself, and New hands a
// *graphSyncReceiver view of the exchange to the network as its delegate.
type graphSyncReceiver GraphSync

// graphSync converts the receiver back into the *GraphSync it is a view of.
func (gsr *graphSyncReceiver) graphSync() *GraphSync {
	underlying := (*GraphSync)(gsr)
	return underlying
}

// ReceiveMessage is part of the network's Receiver interface and receives
// incoming messages from the network. The message's requests are handed to
// the response manager first; its responses and blocks are then handed to
// the request manager.
func (gsr *graphSyncReceiver) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
	gs := gsr.graphSync()

	requests := incoming.Requests()
	gs.responseManager.ProcessRequests(ctx, sender, requests)

	responses, blocks := incoming.Responses(), incoming.Blocks()
	gs.requestManager.ProcessResponses(sender, responses, blocks)
}

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network. The error is only written to the package logger at
// Info level; nothing is returned or forwarded to the caller.
func (gsr *graphSyncReceiver) ReceiveError(err error) {
	log.Infof("Graphsync ReceiveError: %s", err)
	// TODO log the network error
	// TODO bubble the network error up to the parent context/error logger
}

// Connected is part of the network's Receiver interface and handles peers
// connecting on the network. The peer manager is notified first, then the
// peer response manager.
func (gsr *graphSyncReceiver) Connected(p peer.ID) {
	gs := gsr.graphSync()
	gs.peerManager.Connected(p)
	gs.peerResponseManager.Connected(p)
}

// Disconnected is part of the network's Receiver interface and handles peers
// disconnecting from the network. The peer manager is notified first, then
// the peer response manager.
func (gsr *graphSyncReceiver) Disconnected(p peer.ID) {
	gs := gsr.graphSync()
	gs.peerManager.Disconnected(p)
	gs.peerResponseManager.Disconnected(p)
}