message.go 10.6 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

7
	blocks "github.com/ipfs/go-block-format"
8

9
	ggio "github.com/gogo/protobuf/io"
10
	cid "github.com/ipfs/go-cid"
11
	pb "github.com/ipfs/go-graphsync/message/pb"
12
	"github.com/libp2p/go-libp2p-core/network"
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
)

// GraphSyncRequestID is a unique identifier for a GraphSync request.
type GraphSyncRequestID int32

// GraphSyncPriority a priority for a GraphSync request.
type GraphSyncPriority int32

// GraphSyncResponseStatusCode is a status returned for a GraphSync Request.
type GraphSyncResponseStatusCode int32

const (

	// GraphSync Response Status Codes

	// Informational Response Codes (partial)

	// RequestAcknowledged means the request was received and is being worked on.
31
	RequestAcknowledged = GraphSyncResponseStatusCode(10)
32 33
	// AdditionalPeers means additional peers were found that may be able
	// to satisfy the request and contained in the extra block of the response.
34
	AdditionalPeers = GraphSyncResponseStatusCode(11)
35
	// NotEnoughGas means fulfilling this request requires payment.
36
	NotEnoughGas = GraphSyncResponseStatusCode(12)
37 38
	// OtherProtocol means a different type of response than GraphSync is
	// contained in extra.
39
	OtherProtocol = GraphSyncResponseStatusCode(13)
40 41 42
	// PartialResponse may include blocks and metadata about the in progress response
	// in extra.
	PartialResponse = GraphSyncResponseStatusCode(14)
43 44 45 46 47

	// Success Response Codes (request terminated)

	// RequestCompletedFull means the entire fulfillment of the GraphSync request
	// was sent back.
48
	RequestCompletedFull = GraphSyncResponseStatusCode(20)
49 50
	// RequestCompletedPartial means the response is completed, and part of the
	// GraphSync request was sent back, but not the complete request.
51
	RequestCompletedPartial = GraphSyncResponseStatusCode(21)
52 53 54 55

	// Error Response Codes (request terminated)

	// RequestRejected means the node did not accept the incoming request.
56
	RequestRejected = GraphSyncResponseStatusCode(30)
57 58
	// RequestFailedBusy means the node is too busy, try again later. Backoff may
	// be contained in extra.
59
	RequestFailedBusy = GraphSyncResponseStatusCode(31)
60 61
	// RequestFailedUnknown means the request failed for an unspecified reason. May
	// contain data about why in extra.
62
	RequestFailedUnknown = GraphSyncResponseStatusCode(32)
63
	// RequestFailedLegal means the request failed for legal reasons.
64
	RequestFailedLegal = GraphSyncResponseStatusCode(33)
65
	// RequestFailedContentNotFound means the respondent does not have the content.
66
	RequestFailedContentNotFound = GraphSyncResponseStatusCode(34)
67 68
)

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
func IsTerminalSuccessCode(status GraphSyncResponseStatusCode) bool {
	return status == RequestCompletedFull ||
		status == RequestCompletedPartial
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
func IsTerminalFailureCode(status GraphSyncResponseStatusCode) bool {
	return status == RequestFailedBusy ||
		status == RequestFailedContentNotFound ||
		status == RequestFailedLegal ||
		status == RequestFailedUnknown
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
func IsTerminalResponseCode(status GraphSyncResponseStatusCode) bool {
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

91 92 93 94 95 96 97
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

98 99
	Blocks() []blocks.Block

100
	AddRequest(graphSyncRequest GraphSyncRequest)
101

102
	AddResponse(graphSyncResponse GraphSyncResponse)
103 104

	AddBlock(blocks.Block)
105

106 107
	Empty() bool

108 109 110 111 112 113 114 115
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
	ToProto() *pb.Message
116
	ToNet(w io.Writer) error
117 118
}

119 120 121
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
122
	root     cid.Cid
123
	selector []byte
124 125 126 127 128
	priority GraphSyncPriority
	id       GraphSyncRequestID
	isCancel bool
}

129 130 131
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
132 133
	requestID GraphSyncRequestID
	status    GraphSyncResponseStatusCode
134
	extra     []byte
135 136 137
}

type graphSyncMessage struct {
138 139
	requests  map[GraphSyncRequestID]GraphSyncRequest
	responses map[GraphSyncRequestID]GraphSyncResponse
140
	blocks    map[cid.Cid]blocks.Block
141 142 143 144 145 146 147 148 149
}

// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
	return newMsg()
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
150 151
		requests:  make(map[GraphSyncRequestID]GraphSyncRequest),
		responses: make(map[GraphSyncRequestID]GraphSyncResponse),
152
		blocks:    make(map[cid.Cid]blocks.Block),
153 154 155
	}
}

156 157
// NewRequest builds a new Graphsync request
func NewRequest(id GraphSyncRequestID,
158
	root cid.Cid,
159 160
	selector []byte,
	priority GraphSyncPriority) GraphSyncRequest {
161
	return newRequest(id, root, selector, priority, false)
162 163 164 165
}

// CancelRequest request generates a request to cancel an in progress request
func CancelRequest(id GraphSyncRequestID) GraphSyncRequest {
166
	return newRequest(id, cid.Cid{}, nil, 0, true)
167 168 169
}

func newRequest(id GraphSyncRequestID,
170
	root cid.Cid,
171 172 173 174 175
	selector []byte,
	priority GraphSyncPriority,
	isCancel bool) GraphSyncRequest {
	return GraphSyncRequest{
		id:       id,
176
		root:     root,
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
		selector: selector,
		priority: priority,
		isCancel: isCancel,
	}
}

// NewResponse builds a new Graphsync response
func NewResponse(requestID GraphSyncRequestID,
	status GraphSyncResponseStatusCode,
	extra []byte) GraphSyncResponse {
	return GraphSyncResponse{
		requestID: requestID,
		status:    status,
		extra:     extra,
	}
}

194
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
195
	gsm := newMsg()
196
	for _, req := range pbm.Requests {
197 198 199 200 201
		root, err := cid.Cast(req.Root)
		if err != nil {
			return nil, err
		}
		gsm.AddRequest(newRequest(GraphSyncRequestID(req.Id), root, req.Selector, GraphSyncPriority(req.Priority), req.Cancel))
202 203 204
	}

	for _, res := range pbm.Responses {
205
		gsm.AddResponse(NewResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra))
206 207 208 209
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
210
		if err != nil {
211 212 213 214 215 216 217 218 219 220 221
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
222
		}
223

224
		gsm.AddBlock(blk)
225 226 227 228 229
	}

	return gsm, nil
}

230 231 232 233
func (gsm *graphSyncMessage) Empty() bool {
	return len(gsm.blocks) == 0 && len(gsm.requests) == 0 && len(gsm.responses) == 0
}

234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	requests := make([]GraphSyncRequest, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, request)
	}
	return requests
}

func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	responses := make([]GraphSyncResponse, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, response)
	}
	return responses
}

250 251 252 253 254 255 256 257
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(gsm.blocks))
	for _, block := range gsm.blocks {
		bs = append(bs, block)
	}
	return bs
}

258 259
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
260 261
}

262 263
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
264 265
}

266 267 268 269
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	gsm.blocks[b.Cid()] = b
}

270
// FromNet can read a network stream to deserialized a GraphSyncMessage
271
func FromNet(r io.Reader) (GraphSyncMessage, error) {
272
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
273
	return FromPBReader(pbr)
274 275
}

276
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
277
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
278 279 280 281 282
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

283
	return newMessageFromProto(*pb)
284 285 286 287
}

func (gsm *graphSyncMessage) ToProto() *pb.Message {
	pbm := new(pb.Message)
288
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
289
	for _, request := range gsm.requests {
290
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
291
			Id:       int32(request.id),
292
			Root:     request.root.Bytes(),
293
			Selector: request.selector,
294 295 296 297 298
			Priority: int32(request.priority),
			Cancel:   request.isCancel,
		})
	}

299
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
300
	for _, response := range gsm.responses {
301
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
302 303
			Id:     int32(response.requestID),
			Status: int32(response.status),
304 305 306 307 308 309 310 311 312 313
			Extra:  response.extra,
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
314 315 316 317 318
		})
	}
	return pbm
}

319 320 321 322 323 324
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

	return pbw.WriteMsg(gsm.ToProto())
}

325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requests := make([]string, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, fmt.Sprintf("%d", request.id))
	}
	responses := make([]string, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, fmt.Sprintf("%d", response.requestID))
	}
	return map[string]interface{}{
		"requests":  requests,
		"responses": responses,
	}
}

340 341 342
// ID Returns the request ID for this Request
func (gsr GraphSyncRequest) ID() GraphSyncRequestID { return gsr.id }

343 344 345
// Root returns the CID to the root block of this request
func (gsr GraphSyncRequest) Root() cid.Cid { return gsr.root }

346 347 348 349 350 351 352 353 354 355 356 357 358 359
// Selector returns the byte representation of the selector for this request
func (gsr GraphSyncRequest) Selector() []byte { return gsr.selector }

// Priority returns the priority of this request
func (gsr GraphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }

// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

// RequestID returns the request ID for this response
func (gsr GraphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }

// Status returns the status for a response
func (gsr GraphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
360

361 362
// Extra returns any metadata on a response
func (gsr GraphSyncResponse) Extra() []byte { return gsr.extra }