message.go 13 KB
Newer Older
1 2 3
package message

import (
4
	"errors"
5
	"fmt"
6
	"io"
7

8
	blocks "github.com/ipfs/go-block-format"
9

10
	ggio "github.com/gogo/protobuf/io"
11
	cid "github.com/ipfs/go-cid"
12
	pb "github.com/ipfs/go-graphsync/message/pb"
13
	"github.com/libp2p/go-libp2p-core/network"
14 15 16 17 18 19 20 21 22 23 24
)

// GraphSyncRequestID is a unique identifier for a GraphSync request.
type GraphSyncRequestID int32

// GraphSyncPriority a priority for a GraphSync request.
type GraphSyncPriority int32

// GraphSyncResponseStatusCode is a status returned for a GraphSync Request.
type GraphSyncResponseStatusCode int32

25 26 27 28 29 30 31 32 33
// GraphSyncExtensionName is a name for a GraphSync extension
type GraphSyncExtensionName string

// GraphSyncExtension is a name/data pair for a graphsync extension
type GraphSyncExtension struct {
	Name GraphSyncExtensionName
	Data []byte
}

34 35
const (

36 37 38 39 40 41 42 43 44 45 46 47
	// Known Graphsync Extensions

	// ExtensionMetadata provides response metadata for a Graphsync request and is
	// documented at
	// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
	ExtensionMetadata = GraphSyncExtensionName("graphsync/response-metadata")

	// ExtensionDoNotSendCIDs tells the responding peer not to send certain blocks if they
	// are encountered in a traversal and is documented at
	// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
	ExtensionDoNotSendCIDs = GraphSyncExtensionName("graphsync/do-not-send-cids")

48 49 50 51 52
	// GraphSync Response Status Codes

	// Informational Response Codes (partial)

	// RequestAcknowledged means the request was received and is being worked on.
53
	RequestAcknowledged = GraphSyncResponseStatusCode(10)
54 55
	// AdditionalPeers means additional peers were found that may be able
	// to satisfy the request and contained in the extra block of the response.
56
	AdditionalPeers = GraphSyncResponseStatusCode(11)
57
	// NotEnoughGas means fulfilling this request requires payment.
58
	NotEnoughGas = GraphSyncResponseStatusCode(12)
59 60
	// OtherProtocol means a different type of response than GraphSync is
	// contained in extra.
61
	OtherProtocol = GraphSyncResponseStatusCode(13)
62 63 64
	// PartialResponse may include blocks and metadata about the in progress response
	// in extra.
	PartialResponse = GraphSyncResponseStatusCode(14)
65 66 67 68 69

	// Success Response Codes (request terminated)

	// RequestCompletedFull means the entire fulfillment of the GraphSync request
	// was sent back.
70
	RequestCompletedFull = GraphSyncResponseStatusCode(20)
71 72
	// RequestCompletedPartial means the response is completed, and part of the
	// GraphSync request was sent back, but not the complete request.
73
	RequestCompletedPartial = GraphSyncResponseStatusCode(21)
74 75 76 77

	// Error Response Codes (request terminated)

	// RequestRejected means the node did not accept the incoming request.
78
	RequestRejected = GraphSyncResponseStatusCode(30)
79 80
	// RequestFailedBusy means the node is too busy, try again later. Backoff may
	// be contained in extra.
81
	RequestFailedBusy = GraphSyncResponseStatusCode(31)
82 83
	// RequestFailedUnknown means the request failed for an unspecified reason. May
	// contain data about why in extra.
84
	RequestFailedUnknown = GraphSyncResponseStatusCode(32)
85
	// RequestFailedLegal means the request failed for legal reasons.
86
	RequestFailedLegal = GraphSyncResponseStatusCode(33)
87
	// RequestFailedContentNotFound means the respondent does not have the content.
88
	RequestFailedContentNotFound = GraphSyncResponseStatusCode(34)
89 90
)

91 92 93 94 95
var (
	// ErrExtensionNotPresent means the looked up extension was not found
	ErrExtensionNotPresent = errors.New("Extension is missing from this message")
)

96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
func IsTerminalSuccessCode(status GraphSyncResponseStatusCode) bool {
	return status == RequestCompletedFull ||
		status == RequestCompletedPartial
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
func IsTerminalFailureCode(status GraphSyncResponseStatusCode) bool {
	return status == RequestFailedBusy ||
		status == RequestFailedContentNotFound ||
		status == RequestFailedLegal ||
		status == RequestFailedUnknown
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
func IsTerminalResponseCode(status GraphSyncResponseStatusCode) bool {
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

118 119 120 121 122 123 124
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

125 126
	Blocks() []blocks.Block

127
	AddRequest(graphSyncRequest GraphSyncRequest)
128

129
	AddResponse(graphSyncResponse GraphSyncResponse)
130 131

	AddBlock(blocks.Block)
132

133 134
	Empty() bool

135 136 137 138 139 140 141 142
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
	ToProto() *pb.Message
143
	ToNet(w io.Writer) error
144 145
}

146 147 148
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
149 150 151 152 153 154
	root       cid.Cid
	selector   []byte
	priority   GraphSyncPriority
	id         GraphSyncRequestID
	extensions map[string][]byte
	isCancel   bool
155 156
}

157 158 159
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
160 161 162
	requestID  GraphSyncRequestID
	status     GraphSyncResponseStatusCode
	extensions map[string][]byte
163 164 165
}

type graphSyncMessage struct {
166 167
	requests  map[GraphSyncRequestID]GraphSyncRequest
	responses map[GraphSyncRequestID]GraphSyncResponse
168
	blocks    map[cid.Cid]blocks.Block
169 170 171 172 173 174 175 176 177
}

// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
	return newMsg()
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
178 179
		requests:  make(map[GraphSyncRequestID]GraphSyncRequest),
		responses: make(map[GraphSyncRequestID]GraphSyncResponse),
180
		blocks:    make(map[cid.Cid]blocks.Block),
181 182 183
	}
}

184 185
// NewRequest builds a new Graphsync request
func NewRequest(id GraphSyncRequestID,
186
	root cid.Cid,
187
	selector []byte,
188 189 190 191
	priority GraphSyncPriority,
	extensions ...GraphSyncExtension) GraphSyncRequest {

	return newRequest(id, root, selector, priority, false, toExtensionsMap(extensions))
192 193 194 195
}

// CancelRequest request generates a request to cancel an in progress request
func CancelRequest(id GraphSyncRequestID) GraphSyncRequest {
196 197 198 199 200 201 202 203 204 205 206
	return newRequest(id, cid.Cid{}, nil, 0, true, nil)
}

func toExtensionsMap(extensions []GraphSyncExtension) (extensionsMap map[string][]byte) {
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
207 208 209
}

func newRequest(id GraphSyncRequestID,
210
	root cid.Cid,
211 212
	selector []byte,
	priority GraphSyncPriority,
213 214
	isCancel bool,
	extensions map[string][]byte) GraphSyncRequest {
215
	return GraphSyncRequest{
216 217 218 219 220 221
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
		extensions: extensions,
222 223 224 225 226 227
	}
}

// NewResponse builds a new Graphsync response
func NewResponse(requestID GraphSyncRequestID,
	status GraphSyncResponseStatusCode,
228 229 230 231 232 233
	extensions ...GraphSyncExtension) GraphSyncResponse {
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

func newResponse(requestID GraphSyncRequestID,
	status GraphSyncResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
234
	return GraphSyncResponse{
235 236 237
		requestID:  requestID,
		status:     status,
		extensions: extensions,
238 239
	}
}
240
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
241
	gsm := newMsg()
242
	for _, req := range pbm.Requests {
243 244 245 246
		root, err := cid.Cast(req.Root)
		if err != nil {
			return nil, err
		}
247
		gsm.AddRequest(newRequest(GraphSyncRequestID(req.Id), root, req.Selector, GraphSyncPriority(req.Priority), req.Cancel, req.GetExtensions()))
248 249 250
	}

	for _, res := range pbm.Responses {
251
		gsm.AddResponse(newResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.GetExtensions()))
252 253 254 255
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
256
		if err != nil {
257 258 259 260 261 262 263 264 265 266 267
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
268
		}
269

270
		gsm.AddBlock(blk)
271 272 273 274 275
	}

	return gsm, nil
}

276 277 278 279
func (gsm *graphSyncMessage) Empty() bool {
	return len(gsm.blocks) == 0 && len(gsm.requests) == 0 && len(gsm.responses) == 0
}

280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	requests := make([]GraphSyncRequest, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, request)
	}
	return requests
}

func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	responses := make([]GraphSyncResponse, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, response)
	}
	return responses
}

296 297 298 299 300 301 302 303
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(gsm.blocks))
	for _, block := range gsm.blocks {
		bs = append(bs, block)
	}
	return bs
}

304 305
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
306 307
}

308 309
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
310 311
}

312 313 314 315
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	gsm.blocks[b.Cid()] = b
}

316
// FromNet can read a network stream to deserialized a GraphSyncMessage
317
func FromNet(r io.Reader) (GraphSyncMessage, error) {
318
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
319
	return FromPBReader(pbr)
320 321
}

322
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
323
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
324 325 326 327 328
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

329
	return newMessageFromProto(*pb)
330 331 332 333
}

func (gsm *graphSyncMessage) ToProto() *pb.Message {
	pbm := new(pb.Message)
334
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
335
	for _, request := range gsm.requests {
336
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
337 338 339 340 341 342
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
			Selector:   request.selector,
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
			Extensions: request.extensions,
343 344 345
		})
	}

346
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
347
	for _, response := range gsm.responses {
348
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
349 350 351
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
352 353 354 355 356 357 358 359 360
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
361 362 363 364 365
		})
	}
	return pbm
}

366 367 368 369 370 371
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

	return pbw.WriteMsg(gsm.ToProto())
}

372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requests := make([]string, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, fmt.Sprintf("%d", request.id))
	}
	responses := make([]string, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, fmt.Sprintf("%d", response.requestID))
	}
	return map[string]interface{}{
		"requests":  requests,
		"responses": responses,
	}
}

387 388 389
// ID Returns the request ID for this Request
func (gsr GraphSyncRequest) ID() GraphSyncRequestID { return gsr.id }

390 391 392
// Root returns the CID to the root block of this request
func (gsr GraphSyncRequest) Root() cid.Cid { return gsr.root }

393 394 395 396 397 398
// Selector returns the byte representation of the selector for this request
func (gsr GraphSyncRequest) Selector() []byte { return gsr.selector }

// Priority returns the priority of this request
func (gsr GraphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }

399 400 401 402 403 404 405 406 407 408 409 410 411
// Extension returns the content for an extension on a response, or errors
// if extension is not present
func (gsr GraphSyncRequest) Extension(name GraphSyncExtensionName) ([]byte, error) {
	if gsr.extensions == nil {
		return nil, ErrExtensionNotPresent
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
		return nil, ErrExtensionNotPresent
	}
	return val, nil
}

412 413 414 415 416 417 418 419
// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

// RequestID returns the request ID for this response
func (gsr GraphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }

// Status returns the status for a response
func (gsr GraphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
420

421 422 423 424 425 426 427 428 429 430 431 432 433
// Extension returns the content for an extension on a response, or errors
// if extension is not present
func (gsr GraphSyncResponse) Extension(name GraphSyncExtensionName) ([]byte, error) {
	if gsr.extensions == nil {
		return nil, ErrExtensionNotPresent
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
		return nil, ErrExtensionNotPresent
	}
	return val, nil

}