graphsync.go 14.5 KB
Newer Older
1 2 3 4 5
package graphsync

import (
	"context"

Hannah Howard's avatar
Hannah Howard committed
6 7 8 9 10
	logging "github.com/ipfs/go-log"
	"github.com/ipfs/go-peertaskqueue"
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/libp2p/go-libp2p-core/peer"

11
	"github.com/ipfs/go-graphsync"
12
	"github.com/ipfs/go-graphsync/listeners"
13 14 15 16 17
	gsmsg "github.com/ipfs/go-graphsync/message"
	"github.com/ipfs/go-graphsync/messagequeue"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/peermanager"
	"github.com/ipfs/go-graphsync/requestmanager"
18
	"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
Hannah Howard's avatar
Hannah Howard committed
19
	requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
20
	"github.com/ipfs/go-graphsync/responsemanager"
21
	"github.com/ipfs/go-graphsync/responsemanager/allocator"
Hannah Howard's avatar
Hannah Howard committed
22
	responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
23
	"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
24
	"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
25
	"github.com/ipfs/go-graphsync/selectorvalidator"
26 27 28 29
)

var log = logging.Logger("graphsync")

30
const maxRecursionDepth = 100
31 32
const defaultTotalMaxMemory = uint64(256 << 20)
const defaultMaxMemoryPerPeer = uint64(16 << 20)
33
const defaultMaxInProgressRequests = uint64(6)
34

35 36 37
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
38 39 40 41 42 43 44 45 46 47 48 49
	network                     gsnet.GraphSyncNetwork
	loader                      ipld.Loader
	storer                      ipld.Storer
	requestManager              *requestmanager.RequestManager
	responseManager             *responsemanager.ResponseManager
	asyncLoader                 *asyncloader.AsyncLoader
	peerResponseManager         *peerresponsemanager.PeerResponseManager
	peerTaskQueue               *peertaskqueue.PeerTaskQueue
	peerManager                 *peermanager.PeerMessageManager
	incomingRequestHooks        *responderhooks.IncomingRequestHooks
	outgoingBlockHooks          *responderhooks.OutgoingBlockHooks
	requestUpdatedHooks         *responderhooks.RequestUpdatedHooks
50 51 52 53
	completedResponseListeners  *listeners.CompletedResponseListeners
	requestorCancelledListeners *listeners.RequestorCancelledListeners
	blockSentListeners          *listeners.BlockSentListeners
	networkErrorListeners       *listeners.NetworkErrorListeners
54
	receiverErrorListeners      *listeners.NetworkReceiverErrorListeners
55 56 57 58 59 60 61
	incomingResponseHooks       *requestorhooks.IncomingResponseHooks
	outgoingRequestHooks        *requestorhooks.OutgoingRequestHooks
	incomingBlockHooks          *requestorhooks.IncomingBlockHooks
	persistenceOptions          *persistenceoptions.PersistenceOptions
	ctx                         context.Context
	cancel                      context.CancelFunc
	unregisterDefaultValidator  graphsync.UnregisterHookFunc
62 63 64
	allocator                   *allocator.Allocator
	totalMaxMemory              uint64
	maxMemoryPerPeer            uint64
65
	maxInProgressRequests       uint64
66 67 68 69 70 71 72 73 74 75 76 77
}

// Option defines the functional option type that can be used to configure
// graphsync instances
type Option func(*GraphSync)

// RejectAllRequestsByDefault means that without hooks registered
// that perform their own request validation, all requests are rejected
func RejectAllRequestsByDefault() Option {
	return func(gs *GraphSync) {
		gs.unregisterDefaultValidator()
	}
78 79
}

80 81
// MaxMemoryResponder defines the maximum amount of memory the responder
// may consume queueing up messages for a response in total
82 83 84 85 86 87
func MaxMemoryResponder(totalMaxMemory uint64) Option {
	return func(gs *GraphSync) {
		gs.totalMaxMemory = totalMaxMemory
	}
}

88 89
// MaxMemoryPerPeerResponder defines the maximum amount of memory a peer
// may consume queueing up messages for a response
90 91 92 93 94 95
func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
	return func(gs *GraphSync) {
		gs.maxMemoryPerPeer = maxMemoryPerPeer
	}
}

96 97 98 99 100 101 102 103
// MaxInProgressRequests changes the maximum number of
// graphsync requests that are processed in parallel (default 6)
func MaxInProgressRequests(maxInProgressRequests uint64) Option {
	return func(gs *GraphSync) {
		gs.maxInProgressRequests = maxInProgressRequests
	}
}

104
// New creates a new GraphSync Exchange on the given network,
105
// and the given link loader+storer.
106
func New(parent context.Context, network gsnet.GraphSyncNetwork,
107
	loader ipld.Loader, storer ipld.Storer, options ...Option) graphsync.GraphExchange {
108 109 110 111 112 113 114
	ctx, cancel := context.WithCancel(parent)

	createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
		return messagequeue.New(ctx, p, network)
	}
	peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
	asyncLoader := asyncloader.New(ctx, loader, storer)
Hannah Howard's avatar
Hannah Howard committed
115 116
	incomingResponseHooks := requestorhooks.NewResponseHooks()
	outgoingRequestHooks := requestorhooks.NewRequestHooks()
Hannah Howard's avatar
Hannah Howard committed
117
	incomingBlockHooks := requestorhooks.NewBlockHooks()
118
	networkErrorListeners := listeners.NewNetworkErrorListeners()
119
	receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners()
120
	requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
121
	peerTaskQueue := peertaskqueue.New()
122

123
	persistenceOptions := persistenceoptions.New()
Hannah Howard's avatar
Hannah Howard committed
124 125 126
	incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
	outgoingBlockHooks := responderhooks.NewBlockHooks()
	requestUpdatedHooks := responderhooks.NewUpdateHooks()
127 128 129
	completedResponseListeners := listeners.NewCompletedResponseListeners()
	requestorCancelledListeners := listeners.NewRequestorCancelledListeners()
	blockSentListeners := listeners.NewBlockSentListeners()
130
	unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
131
	graphSync := &GraphSync{
132 133 134 135 136 137 138 139 140 141
		network:                     network,
		loader:                      loader,
		storer:                      storer,
		asyncLoader:                 asyncLoader,
		requestManager:              requestManager,
		peerManager:                 peerManager,
		persistenceOptions:          persistenceOptions,
		incomingRequestHooks:        incomingRequestHooks,
		outgoingBlockHooks:          outgoingBlockHooks,
		requestUpdatedHooks:         requestUpdatedHooks,
142
		completedResponseListeners:  completedResponseListeners,
143
		requestorCancelledListeners: requestorCancelledListeners,
144 145
		blockSentListeners:          blockSentListeners,
		networkErrorListeners:       networkErrorListeners,
146
		receiverErrorListeners:      receiverErrorListeners,
147 148 149 150
		incomingResponseHooks:       incomingResponseHooks,
		outgoingRequestHooks:        outgoingRequestHooks,
		incomingBlockHooks:          incomingBlockHooks,
		peerTaskQueue:               peerTaskQueue,
151 152
		totalMaxMemory:              defaultTotalMaxMemory,
		maxMemoryPerPeer:            defaultMaxMemoryPerPeer,
153
		maxInProgressRequests:       defaultMaxInProgressRequests,
154 155 156
		ctx:                         ctx,
		cancel:                      cancel,
		unregisterDefaultValidator:  unregisterDefaultValidator,
157 158 159 160
	}

	for _, option := range options {
		option(graphSync)
161
	}
162 163 164 165 166 167 168
	allocator := allocator.NewAllocator(graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer)
	graphSync.allocator = allocator
	createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
		return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator)
	}
	peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
	graphSync.peerResponseManager = peerResponseManager
169
	responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, graphSync.maxInProgressRequests)
170
	graphSync.responseManager = responseManager
171 172 173 174 175 176 177 178 179 180

	asyncLoader.Startup()
	requestManager.SetDelegate(peerManager)
	requestManager.Startup()
	responseManager.Startup()
	network.SetDelegate((*graphSyncReceiver)(graphSync))
	return graphSync
}

// Request initiates a new GraphSync request to the given peer using the given selector spec.
181 182
func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) {
	return gs.requestManager.SendRequest(ctx, p, root, selector, extensions...)
183 184
}

185
// RegisterIncomingRequestHook adds a hook that runs when a request is received
186 187 188
// If overrideDefaultValidation is set to true, then if the hook does not error,
// it is considered to have "validated" the request -- and that validation supersedes
// the normal validation of requests Graphsync does (i.e. all selectors can be accepted)
189
func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingRequestHook) graphsync.UnregisterHookFunc {
190
	return gs.incomingRequestHooks.Register(hook)
191 192
}

193 194
// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
195
	return gs.incomingResponseHooks.Register(hook)
196 197 198 199
}

// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
func (gs *GraphSync) RegisterOutgoingRequestHook(hook graphsync.OnOutgoingRequestHook) graphsync.UnregisterHookFunc {
Hannah Howard's avatar
Hannah Howard committed
200
	return gs.outgoingRequestHooks.Register(hook)
201 202 203 204 205 206 207 208
}

// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
func (gs *GraphSync) RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error {
	err := gs.asyncLoader.RegisterPersistenceOption(name, loader, storer)
	if err != nil {
		return err
	}
209 210 211
	return gs.persistenceOptions.Register(name, loader)
}

212 213 214 215 216 217 218 219 220
// UnregisterPersistenceOption unregisters an alternate loader/storer combo
func (gs *GraphSync) UnregisterPersistenceOption(name string) error {
	err := gs.asyncLoader.UnregisterPersistenceOption(name)
	if err != nil {
		return err
	}
	return gs.persistenceOptions.Unregister(name)
}

221 222 223 224 225
// RegisterOutgoingBlockHook registers a hook that runs after each block is sent in a response
func (gs *GraphSync) RegisterOutgoingBlockHook(hook graphsync.OnOutgoingBlockHook) graphsync.UnregisterHookFunc {
	return gs.outgoingBlockHooks.Register(hook)
}

Hannah Howard's avatar
Hannah Howard committed
226 227 228 229 230
// RegisterRequestUpdatedHook registers a hook that runs when an update to a request is received
func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedHook) graphsync.UnregisterHookFunc {
	return gs.requestUpdatedHooks.Register(hook)
}

231 232 233
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc {
	return gs.completedResponseListeners.Register(listener)
234 235
}

Hannah Howard's avatar
Hannah Howard committed
236 237 238 239 240
// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHook) graphsync.UnregisterHookFunc {
	return gs.incomingBlockHooks.Register(hook)
}

241 242 243 244 245 246
// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
func (gs *GraphSync) RegisterRequestorCancelledListener(listener graphsync.OnRequestorCancelledListener) graphsync.UnregisterHookFunc {
	return gs.requestorCancelledListeners.Register(listener)
}

247 248 249 250 251 252 253 254 255 256
// RegisterBlockSentListener adds a listener for when blocks are actually sent over the wire
func (gs *GraphSync) RegisterBlockSentListener(listener graphsync.OnBlockSentListener) graphsync.UnregisterHookFunc {
	return gs.blockSentListeners.Register(listener)
}

// RegisterNetworkErrorListener adds a listener for when errors occur sending data over the wire
func (gs *GraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetworkErrorListener) graphsync.UnregisterHookFunc {
	return gs.networkErrorListeners.Register(listener)
}

257 258 259 260 261
// RegisterReceiverNetworkErrorListener adds a listener for when errors occur receiving data over the wire
func (gs *GraphSync) RegisterReceiverNetworkErrorListener(listener graphsync.OnReceiverNetworkErrorListener) graphsync.UnregisterHookFunc {
	return gs.receiverErrorListeners.Register(listener)
}

262 263 264 265 266 267 268 269 270 271 272
// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	return gs.requestManager.UnpauseRequest(requestID, extensions...)
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
	return gs.requestManager.PauseRequest(requestID)
}

273
// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
274 275 276 277 278 279 280 281 282 283 284 285
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
	return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
	return gs.responseManager.PauseResponse(p, requestID)
}

// CancelResponse cancels an in progress response
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
	return gs.responseManager.CancelResponse(p, requestID)
286 287
}

288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
type graphSyncReceiver GraphSync

func (gsr *graphSyncReceiver) graphSync() *GraphSync {
	return (*GraphSync)(gsr)
}

// ReceiveMessage is part of the networks Receiver interface and receives
// incoming messages from the network
func (gsr *graphSyncReceiver) ReceiveMessage(
	ctx context.Context,
	sender peer.ID,
	incoming gsmsg.GraphSyncMessage) {
	gsr.graphSync().responseManager.ProcessRequests(ctx, sender, incoming.Requests())
	gsr.graphSync().requestManager.ProcessResponses(sender, incoming.Responses(), incoming.Blocks())
}

// ReceiveError is part of the network's Receiver interface and handles incoming
// errors from the network.
306 307 308
func (gsr *graphSyncReceiver) ReceiveError(p peer.ID, err error) {
	log.Infof("Graphsync ReceiveError from %s: %s", p, err)
	gsr.receiverErrorListeners.NotifyNetworkErrorListeners(p, err)
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
}

// Connected is part of the networks 's Receiver interface and handles peers connecting
// on the network
func (gsr *graphSyncReceiver) Connected(p peer.ID) {
	gsr.graphSync().peerManager.Connected(p)
	gsr.graphSync().peerResponseManager.Connected(p)
}

// Connected is part of the networks 's Receiver interface and handles peers connecting
// on the network
func (gsr *graphSyncReceiver) Disconnected(p peer.ID) {
	gsr.graphSync().peerManager.Disconnected(p)
	gsr.graphSync().peerResponseManager.Disconnected(p)
}