graphsync.go 14.2 KB
Newer Older
1 2 3 4
package graphsync

import (
	"context"
5
	"errors"
6

7
	"github.com/ipfs/go-cid"
8
	"github.com/ipld/go-ipld-prime"
9
	"github.com/ipld/go-ipld-prime/traversal"
Hannah Howard's avatar
Hannah Howard committed
10
	"github.com/libp2p/go-libp2p-core/peer"
11 12
)

13 14
// RequestID is a unique identifier for a GraphSync request.
// It is stored as a signed 32-bit integer.
type RequestID int32
15

16 17
// Priority is the priority of a GraphSync request.
// It is stored as a signed 32-bit integer.
type Priority int32
18

19 20
// ResponseStatusCode is a status returned for a GraphSync Request.
// The known codes are declared as constants in this file (for example
// RequestAcknowledged, RequestCompletedFull and RequestRejected).
type ResponseStatusCode int32
21

22 23
// ExtensionName is a name for a GraphSync extension.
// Known extension names are declared as constants in this file (for example
// ExtensionMetadata).
type ExtensionName string
24

25 26 27 28
// ExtensionData is a name/data pair for a graphsync extension
type ExtensionData struct {
	Name ExtensionName
	Data []byte
29
}
30

31 32 33 34 35 36 37 38 39 40 41 42 43 44
const (

	// Known Graphsync Extensions

	// ExtensionMetadata provides response metadata for a Graphsync request and is
	// documented at
	// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
	ExtensionMetadata = ExtensionName("graphsync/response-metadata")

	// ExtensionDoNotSendCIDs tells the responding peer not to send certain blocks if they
	// are encountered in a traversal and is documented at
	// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
	ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids")

Hannah Howard's avatar
Hannah Howard committed
45 46 47 48
	// ExtensionDeDupByKey tells the responding peer to only deduplicate block sending
	// for requests that have the same key. The data for the extension is a string key
	ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
	// GraphSync Response Status Codes

	// Informational Response Codes (partial)

	// RequestAcknowledged means the request was received and is being worked on.
	RequestAcknowledged = ResponseStatusCode(10)
	// AdditionalPeers means additional peers were found that may be able
	// to satisfy the request and contained in the extra block of the response.
	AdditionalPeers = ResponseStatusCode(11)
	// NotEnoughGas means fulfilling this request requires payment.
	NotEnoughGas = ResponseStatusCode(12)
	// OtherProtocol means a different type of response than GraphSync is
	// contained in extra.
	OtherProtocol = ResponseStatusCode(13)
	// PartialResponse may include blocks and metadata about the in progress response
	// in extra.
	PartialResponse = ResponseStatusCode(14)
Hannah Howard's avatar
Hannah Howard committed
66 67 68
	// RequestPaused indicates a request is paused and will not send any more data
	// until unpaused
	RequestPaused = ResponseStatusCode(15)
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92

	// Success Response Codes (request terminated)

	// RequestCompletedFull means the entire fulfillment of the GraphSync request
	// was sent back.
	RequestCompletedFull = ResponseStatusCode(20)
	// RequestCompletedPartial means the response is completed, and part of the
	// GraphSync request was sent back, but not the complete request.
	RequestCompletedPartial = ResponseStatusCode(21)

	// Error Response Codes (request terminated)

	// RequestRejected means the node did not accept the incoming request.
	RequestRejected = ResponseStatusCode(30)
	// RequestFailedBusy means the node is too busy, try again later. Backoff may
	// be contained in extra.
	RequestFailedBusy = ResponseStatusCode(31)
	// RequestFailedUnknown means the request failed for an unspecified reason. May
	// contain data about why in extra.
	RequestFailedUnknown = ResponseStatusCode(32)
	// RequestFailedLegal means the request failed for legal reasons.
	RequestFailedLegal = ResponseStatusCode(33)
	// RequestFailedContentNotFound means the respondent does not have the content.
	RequestFailedContentNotFound = ResponseStatusCode(34)
93 94
	// RequestCancelled means the responder was processing the request but decided to top, for whatever reason
	RequestCancelled = ResponseStatusCode(35)
95
)
96

97 98 99 100 101 102 103
// RequestContextCancelledErr is the error delivered on the error channel when
// the request context supplied by the user is cancelled or times out.
type RequestContextCancelledErr struct{}

// Error implements the error interface; the message is fixed.
func (RequestContextCancelledErr) Error() string {
	const msg = "Request Context Cancelled"
	return msg
}

104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
// RequestFailedBusyErr is the error delivered on the error channel when the
// peer is busy.
type RequestFailedBusyErr struct{}

// Error implements the error interface; the message is fixed.
func (RequestFailedBusyErr) Error() string {
	const msg = "Request Failed - Peer Is Busy"
	return msg
}

// RequestFailedContentNotFoundErr is the error delivered on the error channel
// when the content is not found.
type RequestFailedContentNotFoundErr struct{}

// Error implements the error interface; the message is fixed.
func (RequestFailedContentNotFoundErr) Error() string {
	const msg = "Request Failed - Content Not Found"
	return msg
}

// RequestFailedLegalErr is the error delivered on the error channel when the
// request fails for legal reasons.
type RequestFailedLegalErr struct{}

// Error implements the error interface; the message is fixed.
func (RequestFailedLegalErr) Error() string {
	const msg = "Request Failed - For Legal Reasons"
	return msg
}

// RequestFailedUnknownErr is the error delivered on the error channel when the
// request fails for unknown reasons.
type RequestFailedUnknownErr struct{}

// Error implements the error interface; the message is fixed.
func (RequestFailedUnknownErr) Error() string {
	const msg = "Request Failed - Unknown Reason"
	return msg
}

// RequestCancelledErr is the error delivered on the error channel to indicate
// that the responder cancelled a request.
type RequestCancelledErr struct{}

// Error implements the error interface; the message is fixed.
func (RequestCancelledErr) Error() string {
	const msg = "Request Failed - Responder Cancelled"
	return msg
}

139 140 141 142 143
// ErrExtensionAlreadyRegistered is the sentinel error signalling that a user
// extension can be registered only once.
var ErrExtensionAlreadyRegistered = errors.New("extension already registered")

144 145 146 147 148 149 150 151
// ResponseProgress is the fundamental unit of responses making progress in Graphsync.
type ResponseProgress struct {
	Node      ipld.Node // a node which matched the graphsync query
	Path      ipld.Path // the path of that node relative to the traversal start
	LastBlock struct {  // LastBlock stores the Path and Link of the last block edge we had to load.
		Path ipld.Path
		Link ipld.Link
	}
152 153
}

154 155 156 157 158 159 160 161 162
// RequestData describes a received graphsync request.
type RequestData interface {
	// ID Returns the request ID for this Request
	ID() RequestID

	// Root returns the CID to the root block of this request
	Root() cid.Cid

	// Selector returns the byte representation of the selector for this request
163
	Selector() ipld.Node
164 165 166 167 168 169 170 171 172 173 174 175

	// Priority returns the priority of this request
	Priority() Priority

	// Extension returns the content for an extension on a response, or errors
	// if extension is not present
	Extension(name ExtensionName) ([]byte, bool)

	// IsCancel returns true if this particular request is being cancelled
	IsCancel() bool
}

176 177 178 179 180 181 182 183 184 185 186 187 188
// ResponseData describes a received Graphsync response.
type ResponseData interface {
	// RequestID returns the request ID for this response.
	RequestID() RequestID

	// Status returns the status for a response.
	Status() ResponseStatusCode

	// Extension returns the content for an extension on a response. The boolean
	// result presumably reports whether the extension is present (confirm
	// against the implementation).
	Extension(name ExtensionName) ([]byte, bool)
}

189 190 191 192 193 194 195 196 197 198 199 200
// BlockData gives information about a block included in a graphsync response.
type BlockData interface {
	// Link is the link/cid for the block.
	Link() ipld.Link

	// BlockSize specifies the size of the block.
	BlockSize() uint64

	// BlockSizeOnWire specifies the amount of data actually transmitted over
	// the network.
	BlockSizeOnWire() uint64
}

201
// IncomingRequestHookActions are actions that a request hook can take to change
202
// behavior for the response
203
type IncomingRequestHookActions interface {
204
	SendExtensionData(ExtensionData)
205
	UsePersistenceOption(name string)
Hannah Howard's avatar
Hannah Howard committed
206
	UseLinkTargetNodeStyleChooser(traversal.LinkTargetNodeStyleChooser)
207 208
	TerminateWithError(error)
	ValidateRequest()
209
	PauseResponse()
210 211
}

Hannah Howard's avatar
Hannah Howard committed
212 213 214 215 216 217 218 219
// OutgoingBlockHookActions are actions that an outgoing block hook can take to
// change the execution of a request.
// An implementation is passed to OnOutgoingBlockHook callbacks as the
// hookActions argument.
type OutgoingBlockHookActions interface {
	SendExtensionData(ExtensionData)
	TerminateWithError(error)
	PauseResponse()
}

220
// OutgoingRequestHookActions are actions that an outgoing request hook can take
Hannah Howard's avatar
Hannah Howard committed
221
// to change the execution of a request
222 223
type OutgoingRequestHookActions interface {
	UsePersistenceOption(name string)
Hannah Howard's avatar
Hannah Howard committed
224
	UseLinkTargetNodeStyleChooser(traversal.LinkTargetNodeStyleChooser)
225 226
}

Hannah Howard's avatar
Hannah Howard committed
227 228 229
// IncomingResponseHookActions are actions that incoming response hook can take
// to change the execution of a request
type IncomingResponseHookActions interface {
230
	TerminateWithError(error)
Hannah Howard's avatar
Hannah Howard committed
231 232 233
	UpdateRequestWithExtensions(...ExtensionData)
}

Hannah Howard's avatar
Hannah Howard committed
234 235 236 237 238
// IncomingBlockHookActions are actions that incoming block hook can take
// to change the execution of a request
type IncomingBlockHookActions interface {
	TerminateWithError(error)
	UpdateRequestWithExtensions(...ExtensionData)
239
	PauseRequest()
Hannah Howard's avatar
Hannah Howard committed
240 241
}

Hannah Howard's avatar
Hannah Howard committed
242 243 244 245 246 247
// RequestUpdatedHookActions are actions that can be taken in a request updated hook to
// change execution of the response
type RequestUpdatedHookActions interface {
	TerminateWithError(error)
	SendExtensionData(ExtensionData)
	UnpauseResponse()
248 249
}

250
// OnIncomingRequestHook is a hook that runs each time a new request is received.
251
// It receives the peer that sent the request and all data about the request.
252 253
// It receives an interface for customizing the response to this request
type OnIncomingRequestHook func(p peer.ID, request RequestData, hookActions IncomingRequestHookActions)
254

255
// OnIncomingResponseHook is a hook that runs each time a new response is received.
256
// It receives the peer that sent the response and all data about the response.
Hannah Howard's avatar
Hannah Howard committed
257
// It receives an interface for customizing how we handle the ongoing execution of the request
Hannah Howard's avatar
Hannah Howard committed
258
type OnIncomingResponseHook func(p peer.ID, responseData ResponseData, hookActions IncomingResponseHookActions)
259

Hannah Howard's avatar
Hannah Howard committed
260 261 262 263 264 265 266 267 268
// OnIncomingBlockHook is a hook that runs each time a new block is validated as
// part of the response, regardless of whether it came locally or over the network.
// It receives the peer that sent the response, the most recent response, and
// block data giving the link and sizes of the block received.
// The difference between BlockSize & BlockSizeOnWire can be used to determine
// where the block came from (local vs remote).
// It receives an interface for customizing how we handle the ongoing execution of the request.
type OnIncomingBlockHook func(p peer.ID, responseData ResponseData, blockData BlockData, hookActions IncomingBlockHookActions)

269 270 271 272
// OnOutgoingRequestHook is a hook that runs immediately prior to sending a request.
// It receives the peer we're sending a request to and all the data about the request.
// It receives an interface for customizing how we handle executing this request.
type OnOutgoingRequestHook func(p peer.ID, request RequestData, hookActions OutgoingRequestHookActions)
273

274 275 276 277 278 279 280
// OnOutgoingBlockHook is a hook that runs immediately after a responder sends a
// new block on a response.
// It receives the peer the block is being sent to, all the data about the
// request, and block data giving the link and sizes of the block sent.
// It receives an interface for taking further action on the response.
type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, hookActions OutgoingBlockHookActions)

Hannah Howard's avatar
Hannah Howard committed
281 282 283 284 285
// OnRequestUpdatedHook is a hook that runs when an update to a request is received.
// It receives the peer we're sending to, the original request, and the request update.
// It receives an interface for taking further action on the response.
type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions)

286 287
// OnResponseCompletedListener provides a way to listen for when a responder has
// finished serving a response.
// It receives a peer ID, the request data, and a response status code.
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)
288

289 290 291
// OnRequestorCancelledListener provides a way to listen for responses the
// requestor cancels.
// It receives a peer ID and the request data.
type OnRequestorCancelledListener func(p peer.ID, request RequestData)

292 293 294
// UnregisterHookFunc is a function call to unregister a hook that was previously registered.
// It is the return type of the Register*Hook and Register*Listener methods on GraphExchange.
type UnregisterHookFunc func()

295 296
// GraphExchange is a protocol that can exchange IPLD graphs based on a selector
type GraphExchange interface {
297 298
	// Request initiates a new GraphSync request to the given peer using the given selector spec.
	Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...ExtensionData) (<-chan ResponseProgress, <-chan error)
299

300 301 302
	// RegisterPersistenceOption registers an alternate loader/storer combo that can be substituted for the default
	RegisterPersistenceOption(name string, loader ipld.Loader, storer ipld.Storer) error

303 304 305
	// UnregisterPersistenceOption unregisters an alternate loader/storer combo
	UnregisterPersistenceOption(name string) error

306 307 308 309 310
	// RegisterIncomingRequestHook adds a hook that runs when a request is received
	RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc

	// RegisterIncomingResponseHook adds a hook that runs when a response is received
	RegisterIncomingResponseHook(OnIncomingResponseHook) UnregisterHookFunc
311

Hannah Howard's avatar
Hannah Howard committed
312 313 314
	// RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store)
	RegisterIncomingBlockHook(OnIncomingBlockHook) UnregisterHookFunc

315 316
	// RegisterOutgoingRequestHook adds a hook that runs immediately prior to sending a new request
	RegisterOutgoingRequestHook(hook OnOutgoingRequestHook) UnregisterHookFunc
317 318 319 320

	// RegisterOutgoingBlockHook adds a hook that runs every time a block is sent from a responder
	RegisterOutgoingBlockHook(hook OnOutgoingBlockHook) UnregisterHookFunc

Hannah Howard's avatar
Hannah Howard committed
321 322 323
	// RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received
	RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc

324 325
	// RegisterCompletedResponseListener adds a listener on the responder for completed responses
	RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc
326

327 328 329 330 331 332 333 334 335 336 337
	// RegisterRequestorCancelledListener adds a listener on the responder for
	// responses cancelled by the requestor
	RegisterRequestorCancelledListener(listener OnRequestorCancelledListener) UnregisterHookFunc

	// UnpauseRequest unpauses a request that was paused in a block hook based request ID
	// Can also send extensions with unpause
	UnpauseRequest(RequestID, ...ExtensionData) error

	// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
	PauseRequest(RequestID) error

338
	// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
339 340 341 342 343 344 345 346
	// Can also send extensions with unpause
	UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error

	// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
	PauseResponse(peer.ID, RequestID) error

	// CancelResponse cancels an in progress response
	CancelResponse(peer.ID, RequestID) error
347
}