message.go 10.3 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

7 8
	"github.com/ipfs/go-block-format"

9
	ggio "github.com/gogo/protobuf/io"
10
	cid "github.com/ipfs/go-cid"
11
	pb "github.com/ipfs/go-graphsync/message/pb"
12
	inet "github.com/libp2p/go-libp2p-net"
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
)

// GraphSyncRequestID is a unique identifier for a GraphSync request. It is
// converted to and from a plain int32 when a message is moved to or from its
// protobuf form.
type GraphSyncRequestID int32

// GraphSyncPriority is the priority of a GraphSync request. It is converted
// to and from a plain int32 when a message is moved to or from its protobuf
// form.
type GraphSyncPriority int32

// GraphSyncResponseStatusCode is a status returned for a GraphSync request.
// It is converted to and from a plain int32 when a message is moved to or
// from its protobuf form.
type GraphSyncResponseStatusCode int32

const (

	// GraphSync Response Status Codes

	// Informational Response Codes (partial)

	// RequestAcknowledged means the request was received and is being worked on.
31
	RequestAcknowledged = GraphSyncResponseStatusCode(10)
32 33
	// AdditionalPeers means additional peers were found that may be able
	// to satisfy the request and contained in the extra block of the response.
34
	AdditionalPeers = GraphSyncResponseStatusCode(11)
35
	// NotEnoughGas means fulfilling this request requires payment.
36
	NotEnoughGas = GraphSyncResponseStatusCode(12)
37 38
	// OtherProtocol means a different type of response than GraphSync is
	// contained in extra.
39
	OtherProtocol = GraphSyncResponseStatusCode(13)
40 41 42
	// PartialResponse may include blocks and metadata about the in progress response
	// in extra.
	PartialResponse = GraphSyncResponseStatusCode(14)
43 44 45 46 47

	// Success Response Codes (request terminated)

	// RequestCompletedFull means the entire fulfillment of the GraphSync request
	// was sent back.
48
	RequestCompletedFull = GraphSyncResponseStatusCode(20)
49 50
	// RequestCompletedPartial means the response is completed, and part of the
	// GraphSync request was sent back, but not the complete request.
51
	RequestCompletedPartial = GraphSyncResponseStatusCode(21)
52 53 54 55

	// Error Response Codes (request terminated)

	// RequestRejected means the node did not accept the incoming request.
56
	RequestRejected = GraphSyncResponseStatusCode(30)
57 58
	// RequestFailedBusy means the node is too busy, try again later. Backoff may
	// be contained in extra.
59
	RequestFailedBusy = GraphSyncResponseStatusCode(31)
60 61
	// RequestFailedUnknown means the request failed for an unspecified reason. May
	// contain data about why in extra.
62
	RequestFailedUnknown = GraphSyncResponseStatusCode(32)
63
	// RequestFailedLegal means the request failed for legal reasons.
64
	RequestFailedLegal = GraphSyncResponseStatusCode(33)
65
	// RequestFailedContentNotFound means the respondent does not have the content.
66
	RequestFailedContentNotFound = GraphSyncResponseStatusCode(34)
67 68
)

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// IsTerminalSuccessCode reports whether status is one of the success codes
// that end a request: RequestCompletedFull or RequestCompletedPartial.
func IsTerminalSuccessCode(status GraphSyncResponseStatusCode) bool {
	switch status {
	case RequestCompletedFull, RequestCompletedPartial:
		return true
	default:
		return false
	}
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
//
// Every code declared under "Error Response Codes (request terminated)" is
// treated as terminal, including RequestRejected: a rejected request will
// never receive further responses, so callers must be able to stop waiting
// on it.
func IsTerminalFailureCode(status GraphSyncResponseStatusCode) bool {
	switch status {
	case RequestRejected,
		RequestFailedBusy,
		RequestFailedUnknown,
		RequestFailedLegal,
		RequestFailedContentNotFound:
		return true
	default:
		return false
	}
}

// IsTerminalResponseCode reports whether status ends the request, i.e. it is
// either a terminal success code or a terminal failure code.
func IsTerminalResponseCode(status GraphSyncResponseStatusCode) bool {
	if IsTerminalSuccessCode(status) {
		return true
	}
	return IsTerminalFailureCode(status)
}

91 92 93 94 95 96 97
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

98 99
	Blocks() []blocks.Block

100
	AddRequest(graphSyncRequest GraphSyncRequest)
101

102
	AddResponse(graphSyncResponse GraphSyncResponse)
103 104

	AddBlock(blocks.Block)
105

106 107
	Empty() bool

108 109 110 111 112 113 114 115
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
	ToProto() *pb.Message
116
	ToNet(w io.Writer) error
117 118
}

119 120 121
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
122
	selector []byte
123 124 125 126 127
	priority GraphSyncPriority
	id       GraphSyncRequestID
	isCancel bool
}

128 129 130
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
131 132
	requestID GraphSyncRequestID
	status    GraphSyncResponseStatusCode
133
	extra     []byte
134 135 136
}

type graphSyncMessage struct {
137 138
	requests  map[GraphSyncRequestID]GraphSyncRequest
	responses map[GraphSyncRequestID]GraphSyncResponse
139
	blocks    map[cid.Cid]blocks.Block
140 141 142 143 144 145 146 147 148
}

// New initializes a new blank GraphSyncMessage, i.e. one with no requests,
// responses, or blocks.
func New() GraphSyncMessage {
	return newMsg()
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
149 150
		requests:  make(map[GraphSyncRequestID]GraphSyncRequest),
		responses: make(map[GraphSyncRequestID]GraphSyncResponse),
151
		blocks:    make(map[cid.Cid]blocks.Block),
152 153 154
	}
}

155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
// NewRequest builds a new GraphSync request with the given ID, selector bytes,
// and priority. The resulting request is not a cancellation.
func NewRequest(id GraphSyncRequestID,
	selector []byte,
	priority GraphSyncPriority) GraphSyncRequest {
	return newRequest(id, selector, priority, false)
}

// CancelRequest generates a request to cancel the in-progress request with
// the given ID. The returned request carries no selector and a zero priority.
func CancelRequest(id GraphSyncRequestID) GraphSyncRequest {
	return newRequest(id, nil, 0, true)
}

// newRequest assembles a GraphSyncRequest from its four fields. The selector
// slice is stored as given, without copying.
func newRequest(id GraphSyncRequestID, selector []byte, priority GraphSyncPriority, isCancel bool) GraphSyncRequest {
	var req GraphSyncRequest
	req.id = id
	req.selector = selector
	req.priority = priority
	req.isCancel = isCancel
	return req
}

// NewResponse builds a new GraphSync response for the request identified by
// requestID, carrying the given status code and extra metadata bytes. The
// extra slice is stored as given, without copying.
func NewResponse(requestID GraphSyncRequestID, status GraphSyncResponseStatusCode, extra []byte) GraphSyncResponse {
	var resp GraphSyncResponse
	resp.requestID = requestID
	resp.status = status
	resp.extra = extra
	return resp
}

190
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
191
	gsm := newMsg()
192
	for _, req := range pbm.Requests {
193
		gsm.AddRequest(newRequest(GraphSyncRequestID(req.Id), req.Selector, GraphSyncPriority(req.Priority), req.Cancel))
194 195 196
	}

	for _, res := range pbm.Responses {
197
		gsm.AddResponse(NewResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra))
198 199 200 201
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
202
		if err != nil {
203 204 205 206 207 208 209 210 211 212 213
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
214
		}
215

216
		gsm.AddBlock(blk)
217 218 219 220 221
	}

	return gsm, nil
}

222 223 224 225
// Empty reports whether the message contains no requests, no responses, and
// no blocks.
func (gsm *graphSyncMessage) Empty() bool {
	if len(gsm.requests) > 0 || len(gsm.responses) > 0 {
		return false
	}
	return len(gsm.blocks) == 0
}

226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
// Requests returns a newly allocated slice holding every request in the
// message. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	out := make([]GraphSyncRequest, len(gsm.requests))
	i := 0
	for _, req := range gsm.requests {
		out[i] = req
		i++
	}
	return out
}

// Responses returns a newly allocated slice holding every response in the
// message. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	out := make([]GraphSyncResponse, len(gsm.responses))
	i := 0
	for _, resp := range gsm.responses {
		out[i] = resp
		i++
	}
	return out
}

242 243 244 245 246 247 248 249
// Blocks returns a newly allocated slice holding every block in the message.
// The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	out := make([]blocks.Block, len(gsm.blocks))
	i := 0
	for _, blk := range gsm.blocks {
		out[i] = blk
		i++
	}
	return out
}

250 251
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
252 253
}

254 255
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
256 257
}

258 259 260 261
// AddBlock stores b in the message keyed by its CID. A block already stored
// under the same CID is replaced.
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	gsm.blocks[b.Cid()] = b
}

262
// FromNet can read a network stream to deserialized a GraphSyncMessage
263
func FromNet(r io.Reader) (GraphSyncMessage, error) {
264
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
265
	return FromPBReader(pbr)
266 267
}

268
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
269
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
270 271 272 273 274
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

275
	return newMessageFromProto(*pb)
276 277 278 279
}

func (gsm *graphSyncMessage) ToProto() *pb.Message {
	pbm := new(pb.Message)
280
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
281
	for _, request := range gsm.requests {
282
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
283
			Id:       int32(request.id),
284
			Selector: request.selector,
285 286 287 288 289
			Priority: int32(request.priority),
			Cancel:   request.isCancel,
		})
	}

290
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
291
	for _, response := range gsm.responses {
292
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
293 294
			Id:     int32(response.requestID),
			Status: int32(response.status),
295 296 297 298 299 300 301 302 303 304
			Extra:  response.extra,
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
305 306 307 308 309
		})
	}
	return pbm
}

310 311 312 313 314 315
// ToNet converts the message to its protobuf form and writes it to w through
// a delimited protobuf writer. It returns the error, if any, reported by
// that writer.
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

	return pbw.WriteMsg(gsm.ToProto())
}

316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
// Loggable summarizes the message for logging. The returned map has two
// entries, "requests" and "responses", each a []string holding the decimal
// request IDs of the requests and responses in the message. Blocks are not
// included.
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requestIDs := make([]string, 0, len(gsm.requests))
	for _, req := range gsm.requests {
		requestIDs = append(requestIDs, fmt.Sprintf("%d", req.id))
	}

	responseIDs := make([]string, 0, len(gsm.responses))
	for _, resp := range gsm.responses {
		responseIDs = append(responseIDs, fmt.Sprintf("%d", resp.requestID))
	}

	summary := make(map[string]interface{}, 2)
	summary["requests"] = requestIDs
	summary["responses"] = responseIDs
	return summary
}

331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
// ID returns the request ID for this request.
func (gsr GraphSyncRequest) ID() GraphSyncRequestID { return gsr.id }

// Selector returns the byte representation of the selector for this request.
// The returned slice is the one held by the request, not a copy.
func (gsr GraphSyncRequest) Selector() []byte { return gsr.selector }

// Priority returns the priority of this request.
func (gsr GraphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }

// IsCancel returns true if this particular request is being cancelled.
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

// RequestID returns the ID of the request that this response answers.
func (gsr GraphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }

// Status returns the status code of this response.
func (gsr GraphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
348

349 350
// Extra returns any metadata attached to this response. The returned slice is
// the one held by the response, not a copy.
func (gsr GraphSyncResponse) Extra() []byte { return gsr.extra }