message.go 12.8 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

7
	blocks "github.com/ipfs/go-block-format"
8

9
	ggio "github.com/gogo/protobuf/io"
10
	cid "github.com/ipfs/go-cid"
11
	pb "github.com/ipfs/go-graphsync/message/pb"
12
	"github.com/libp2p/go-libp2p-core/network"
13 14 15 16 17 18 19 20 21 22 23
)

// GraphSyncRequestID is a unique identifier for a GraphSync request.
// Its underlying type is int32, the type used for the Id field of the
// protobuf request and response messages.
type GraphSyncRequestID int32

// GraphSyncPriority is a priority for a GraphSync request.
// Its underlying type is int32, the type used for the Priority field of the
// protobuf request message.
type GraphSyncPriority int32

// GraphSyncResponseStatusCode is a status returned for a GraphSync Request.
// The known codes are declared as constants in this package.
type GraphSyncResponseStatusCode int32

24 25 26 27 28 29 30 31 32
// GraphSyncExtensionName is a name for a GraphSync extension.
// Requests and responses store their extension data keyed by this name,
// converted to a plain string.
type GraphSyncExtensionName string

// GraphSyncExtension is a name/data pair for a graphsync extension.
type GraphSyncExtension struct {
	// Name identifies the extension.
	Name GraphSyncExtensionName
	// Data is the extension payload as raw bytes.
	Data []byte
}

33 34
const (

35 36 37 38 39 40 41 42 43 44 45 46
	// Known Graphsync Extensions

	// ExtensionMetadata provides response metadata for a Graphsync request and is
	// documented at
	// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
	ExtensionMetadata = GraphSyncExtensionName("graphsync/response-metadata")

	// ExtensionDoNotSendCIDs tells the responding peer not to send certain blocks if they
	// are encountered in a traversal and is documented at
	// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
	ExtensionDoNotSendCIDs = GraphSyncExtensionName("graphsync/do-not-send-cids")

47 48 49 50 51
	// GraphSync Response Status Codes

	// Informational Response Codes (partial)

	// RequestAcknowledged means the request was received and is being worked on.
52
	RequestAcknowledged = GraphSyncResponseStatusCode(10)
53 54
	// AdditionalPeers means additional peers were found that may be able
	// to satisfy the request and contained in the extra block of the response.
55
	AdditionalPeers = GraphSyncResponseStatusCode(11)
56
	// NotEnoughGas means fulfilling this request requires payment.
57
	NotEnoughGas = GraphSyncResponseStatusCode(12)
58 59
	// OtherProtocol means a different type of response than GraphSync is
	// contained in extra.
60
	OtherProtocol = GraphSyncResponseStatusCode(13)
61 62 63
	// PartialResponse may include blocks and metadata about the in progress response
	// in extra.
	PartialResponse = GraphSyncResponseStatusCode(14)
64 65 66 67 68

	// Success Response Codes (request terminated)

	// RequestCompletedFull means the entire fulfillment of the GraphSync request
	// was sent back.
69
	RequestCompletedFull = GraphSyncResponseStatusCode(20)
70 71
	// RequestCompletedPartial means the response is completed, and part of the
	// GraphSync request was sent back, but not the complete request.
72
	RequestCompletedPartial = GraphSyncResponseStatusCode(21)
73 74 75 76

	// Error Response Codes (request terminated)

	// RequestRejected means the node did not accept the incoming request.
77
	RequestRejected = GraphSyncResponseStatusCode(30)
78 79
	// RequestFailedBusy means the node is too busy, try again later. Backoff may
	// be contained in extra.
80
	RequestFailedBusy = GraphSyncResponseStatusCode(31)
81 82
	// RequestFailedUnknown means the request failed for an unspecified reason. May
	// contain data about why in extra.
83
	RequestFailedUnknown = GraphSyncResponseStatusCode(32)
84
	// RequestFailedLegal means the request failed for legal reasons.
85
	RequestFailedLegal = GraphSyncResponseStatusCode(33)
86
	// RequestFailedContentNotFound means the respondent does not have the content.
87
	RequestFailedContentNotFound = GraphSyncResponseStatusCode(34)
88 89
)

90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
// IsTerminalSuccessCode reports whether status is one of the codes that end
// a request successfully: RequestCompletedFull or RequestCompletedPartial.
func IsTerminalSuccessCode(status GraphSyncResponseStatusCode) bool {
	switch status {
	case RequestCompletedFull, RequestCompletedPartial:
		return true
	default:
		return false
	}
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
//
// Every code in the "Error Response Codes (request terminated)" group counts,
// including RequestRejected: a rejected request will never receive further
// responses, so callers waiting on a terminal status must see it as one.
func IsTerminalFailureCode(status GraphSyncResponseStatusCode) bool {
	switch status {
	case RequestRejected,
		RequestFailedBusy,
		RequestFailedUnknown,
		RequestFailedLegal,
		RequestFailedContentNotFound:
		return true
	default:
		return false
	}
}

// IsTerminalResponseCode reports whether status signals the end of a
// request, either as a terminal success or as a terminal failure.
func IsTerminalResponseCode(status GraphSyncResponseStatusCode) bool {
	if IsTerminalSuccessCode(status) {
		return true
	}
	return IsTerminalFailureCode(status)
}

112 113 114 115 116 117 118
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

119 120
	Blocks() []blocks.Block

121
	AddRequest(graphSyncRequest GraphSyncRequest)
122

123
	AddResponse(graphSyncResponse GraphSyncResponse)
124 125

	AddBlock(blocks.Block)
126

127 128
	Empty() bool

129 130 131 132 133 134 135 136
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
	ToProto() *pb.Message
137
	ToNet(w io.Writer) error
138 139
}

140 141 142
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
143 144 145 146 147 148
	root       cid.Cid
	selector   []byte
	priority   GraphSyncPriority
	id         GraphSyncRequestID
	extensions map[string][]byte
	isCancel   bool
149 150
}

151 152 153
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
154 155 156
	requestID  GraphSyncRequestID
	status     GraphSyncResponseStatusCode
	extensions map[string][]byte
157 158 159
}

type graphSyncMessage struct {
160 161
	requests  map[GraphSyncRequestID]GraphSyncRequest
	responses map[GraphSyncRequestID]GraphSyncResponse
162
	blocks    map[cid.Cid]blocks.Block
163 164 165 166 167 168 169 170 171
}

// New creates a blank GraphSyncMessage with no requests, responses or blocks.
func New() GraphSyncMessage {
	var msg GraphSyncMessage = newMsg()
	return msg
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
172 173
		requests:  make(map[GraphSyncRequestID]GraphSyncRequest),
		responses: make(map[GraphSyncRequestID]GraphSyncResponse),
174
		blocks:    make(map[cid.Cid]blocks.Block),
175 176 177
	}
}

178 179
// NewRequest builds a new Graphsync request
func NewRequest(id GraphSyncRequestID,
180
	root cid.Cid,
181
	selector []byte,
182 183 184 185
	priority GraphSyncPriority,
	extensions ...GraphSyncExtension) GraphSyncRequest {

	return newRequest(id, root, selector, priority, false, toExtensionsMap(extensions))
186 187 188 189
}

// CancelRequest request generates a request to cancel an in progress request
func CancelRequest(id GraphSyncRequestID) GraphSyncRequest {
190 191 192 193 194 195 196 197 198 199 200
	return newRequest(id, cid.Cid{}, nil, 0, true, nil)
}

func toExtensionsMap(extensions []GraphSyncExtension) (extensionsMap map[string][]byte) {
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
201 202 203
}

func newRequest(id GraphSyncRequestID,
204
	root cid.Cid,
205 206
	selector []byte,
	priority GraphSyncPriority,
207 208
	isCancel bool,
	extensions map[string][]byte) GraphSyncRequest {
209
	return GraphSyncRequest{
210 211 212 213 214 215
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
		extensions: extensions,
216 217 218 219 220 221
	}
}

// NewResponse builds a new Graphsync response
func NewResponse(requestID GraphSyncRequestID,
	status GraphSyncResponseStatusCode,
222 223 224 225 226 227
	extensions ...GraphSyncExtension) GraphSyncResponse {
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

func newResponse(requestID GraphSyncRequestID,
	status GraphSyncResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
228
	return GraphSyncResponse{
229 230 231
		requestID:  requestID,
		status:     status,
		extensions: extensions,
232 233
	}
}
234
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
235
	gsm := newMsg()
236
	for _, req := range pbm.Requests {
237 238 239 240
		root, err := cid.Cast(req.Root)
		if err != nil {
			return nil, err
		}
241
		gsm.AddRequest(newRequest(GraphSyncRequestID(req.Id), root, req.Selector, GraphSyncPriority(req.Priority), req.Cancel, req.GetExtensions()))
242 243 244
	}

	for _, res := range pbm.Responses {
245
		gsm.AddResponse(newResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.GetExtensions()))
246 247 248 249
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
250
		if err != nil {
251 252 253 254 255 256 257 258 259 260 261
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
262
		}
263

264
		gsm.AddBlock(blk)
265 266 267 268 269
	}

	return gsm, nil
}

270 271 272 273
// Empty reports whether the message holds no requests, no responses and no
// blocks.
func (gsm *graphSyncMessage) Empty() bool {
	if len(gsm.blocks) > 0 {
		return false
	}
	if len(gsm.requests) > 0 {
		return false
	}
	return len(gsm.responses) == 0
}

274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
// Requests returns a newly allocated slice holding every request in the
// message. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	out := make([]GraphSyncRequest, len(gsm.requests))
	i := 0
	for _, req := range gsm.requests {
		out[i] = req
		i++
	}
	return out
}

// Responses returns a newly allocated slice holding every response in the
// message. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	out := make([]GraphSyncResponse, len(gsm.responses))
	i := 0
	for _, res := range gsm.responses {
		out[i] = res
		i++
	}
	return out
}

290 291 292 293 294 295 296 297
// Blocks returns a newly allocated slice holding every block in the message.
// The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	out := make([]blocks.Block, len(gsm.blocks))
	i := 0
	for _, blk := range gsm.blocks {
		out[i] = blk
		i++
	}
	return out
}

298 299
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
300 301
}

302 303
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
304 305
}

306 307 308 309
// AddBlock stores the block in the message under its CID, replacing any
// block previously added with the same CID.
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	key := b.Cid()
	gsm.blocks[key] = b
}

310
// FromNet can read a network stream to deserialized a GraphSyncMessage
311
func FromNet(r io.Reader) (GraphSyncMessage, error) {
312
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
313
	return FromPBReader(pbr)
314 315
}

316
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
317
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
318 319 320 321 322
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

323
	return newMessageFromProto(*pb)
324 325 326 327
}

func (gsm *graphSyncMessage) ToProto() *pb.Message {
	pbm := new(pb.Message)
328
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
329
	for _, request := range gsm.requests {
330
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
331 332 333 334 335 336
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
			Selector:   request.selector,
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
			Extensions: request.extensions,
337 338 339
		})
	}

340
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
341
	for _, response := range gsm.responses {
342
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
343 344 345
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
346 347 348 349 350 351 352 353 354
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
355 356 357 358 359
		})
	}
	return pbm
}

360 361 362 363 364 365
// ToNet serializes the message to protobuf and writes it to w through a
// delimited writer, returning any error from the write.
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	return ggio.NewDelimitedWriter(w).WriteMsg(gsm.ToProto())
}

366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
// Loggable returns a map for logging with two entries: "requests" lists the
// IDs of the requests in the message and "responses" lists the request IDs
// of its responses, each formatted as a decimal string.
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requestIDs := make([]string, len(gsm.requests))
	i := 0
	for _, req := range gsm.requests {
		requestIDs[i] = fmt.Sprintf("%d", req.id)
		i++
	}

	responseIDs := make([]string, len(gsm.responses))
	j := 0
	for _, res := range gsm.responses {
		responseIDs[j] = fmt.Sprintf("%d", res.requestID)
		j++
	}

	summary := make(map[string]interface{}, 2)
	summary["requests"] = requestIDs
	summary["responses"] = responseIDs
	return summary
}

381 382 383
// ID returns the identifier of this request.
func (gsr GraphSyncRequest) ID() GraphSyncRequestID {
	return gsr.id
}

384 385 386
// Root returns the CID of the root block of this request.
func (gsr GraphSyncRequest) Root() cid.Cid {
	return gsr.root
}

387 388 389 390 391 392
// Selector returns the selector of this request in its byte representation.
// The returned slice is the one stored on the request, not a copy.
func (gsr GraphSyncRequest) Selector() []byte {
	return gsr.selector
}

// Priority returns the priority assigned to this request.
func (gsr GraphSyncRequest) Priority() GraphSyncPriority {
	return gsr.priority
}

393 394
// Extension returns the content for an extension on a response, or errors
// if extension is not present
395
func (gsr GraphSyncRequest) Extension(name GraphSyncExtensionName) ([]byte, bool) {
396
	if gsr.extensions == nil {
397
		return nil, false
398 399 400
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
401
		return nil, false
402
	}
403
	return val, true
404 405
}

406 407 408 409 410 411 412 413
// IsCancel reports whether this request cancels an earlier request.
func (gsr GraphSyncRequest) IsCancel() bool {
	return gsr.isCancel
}

// RequestID returns the ID of the request this response belongs to.
func (gsr GraphSyncResponse) RequestID() GraphSyncRequestID {
	return gsr.requestID
}

// Status returns the status code carried by this response.
func (gsr GraphSyncResponse) Status() GraphSyncResponseStatusCode {
	return gsr.status
}
414

415 416
// Extension returns the content for an extension on a response, or errors
// if extension is not present
417
func (gsr GraphSyncResponse) Extension(name GraphSyncExtensionName) ([]byte, bool) {
418
	if gsr.extensions == nil {
419
		return nil, false
420 421 422
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
423
		return nil, false
424
	}
425
	return val, true
426 427

}