message.go 10.9 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

7
	blocks "github.com/ipfs/go-block-format"
8
	"github.com/ipfs/go-graphsync"
9
	"github.com/ipld/go-ipld-prime"
10

11
	ggio "github.com/gogo/protobuf/io"
12
	cid "github.com/ipfs/go-cid"
13
	"github.com/ipfs/go-graphsync/ipldutil"
14
	pb "github.com/ipfs/go-graphsync/message/pb"
15
	"github.com/libp2p/go-libp2p-core/network"
16 17
)

18 19
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
20 21 22
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestCompletedFull ||
		status == graphsync.RequestCompletedPartial
23 24 25 26
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
27 28 29 30 31
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestFailedBusy ||
		status == graphsync.RequestFailedContentNotFound ||
		status == graphsync.RequestFailedLegal ||
		status == graphsync.RequestFailedUnknown
32 33 34 35
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
36
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
37 38 39
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

40 41 42 43 44 45 46
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

47 48
	Blocks() []blocks.Block

49
	AddRequest(graphSyncRequest GraphSyncRequest)
50

51
	AddResponse(graphSyncResponse GraphSyncResponse)
52 53

	AddBlock(blocks.Block)
54

55 56
	Empty() bool

57 58 59 60 61 62 63
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
64
	ToProto() (*pb.Message, error)
65
	ToNet(w io.Writer) error
66 67
}

68 69 70
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
71
	root       cid.Cid
72
	selector   ipld.Node
73 74
	priority   graphsync.Priority
	id         graphsync.RequestID
75 76
	extensions map[string][]byte
	isCancel   bool
Hannah Howard's avatar
Hannah Howard committed
77
	isUpdate   bool
78 79
}

80 81 82
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
83 84
	requestID  graphsync.RequestID
	status     graphsync.ResponseStatusCode
85
	extensions map[string][]byte
86 87 88
}

type graphSyncMessage struct {
89 90
	requests  map[graphsync.RequestID]GraphSyncRequest
	responses map[graphsync.RequestID]GraphSyncResponse
91
	blocks    map[cid.Cid]blocks.Block
92 93 94 95 96 97 98 99 100
}

// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
	return newMsg()
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
101 102
		requests:  make(map[graphsync.RequestID]GraphSyncRequest),
		responses: make(map[graphsync.RequestID]GraphSyncResponse),
103
		blocks:    make(map[cid.Cid]blocks.Block),
104 105 106
	}
}

107
// NewRequest builds a new Graphsync request
108
func NewRequest(id graphsync.RequestID,
109
	root cid.Cid,
110
	selector ipld.Node,
111 112
	priority graphsync.Priority,
	extensions ...graphsync.ExtensionData) GraphSyncRequest {
113

Hannah Howard's avatar
Hannah Howard committed
114
	return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
115 116 117
}

// CancelRequest request generates a request to cancel an in progress request
118
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
Hannah Howard's avatar
Hannah Howard committed
119 120 121 122 123 124
	return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
}

// UpdateRequest generates a new request to update an in progress request with the given extensions
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
	return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
125 126
}

127
func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
128 129 130 131 132 133 134
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
135 136
}

137
func newRequest(id graphsync.RequestID,
138
	root cid.Cid,
139
	selector ipld.Node,
140
	priority graphsync.Priority,
141
	isCancel bool,
Hannah Howard's avatar
Hannah Howard committed
142
	isUpdate bool,
143
	extensions map[string][]byte) GraphSyncRequest {
144
	return GraphSyncRequest{
145 146 147 148 149
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
Hannah Howard's avatar
Hannah Howard committed
150
		isUpdate:   isUpdate,
151
		extensions: extensions,
152 153 154 155
	}
}

// NewResponse builds a new Graphsync response
156 157 158
func NewResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode,
	extensions ...graphsync.ExtensionData) GraphSyncResponse {
159 160 161
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

162 163
func newResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
164
	return GraphSyncResponse{
165 166 167
		requestID:  requestID,
		status:     status,
		extensions: extensions,
168 169
	}
}
170
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
171
	gsm := newMsg()
172
	for _, req := range pbm.Requests {
Hannah Howard's avatar
Hannah Howard committed
173 174 175 176 177 178 179
		var root cid.Cid
		var err error
		if !req.Cancel && !req.Update {
			root, err = cid.Cast(req.Root)
			if err != nil {
				return nil, err
			}
180
		}
Hannah Howard's avatar
Hannah Howard committed
181 182 183 184 185 186 187

		var selector ipld.Node
		if !req.Cancel && !req.Update {
			selector, err = ipldutil.DecodeNode(req.Selector)
			if err != nil {
				return nil, err
			}
188
		}
Hannah Howard's avatar
Hannah Howard committed
189
		gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, req.GetExtensions()))
190 191 192
	}

	for _, res := range pbm.Responses {
193
		gsm.AddResponse(newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), res.GetExtensions()))
194 195 196 197
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
198
		if err != nil {
199 200 201 202 203 204 205 206 207 208 209
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
210
		}
211

212
		gsm.AddBlock(blk)
213 214 215 216 217
	}

	return gsm, nil
}

218 219 220 221
func (gsm *graphSyncMessage) Empty() bool {
	return len(gsm.blocks) == 0 && len(gsm.requests) == 0 && len(gsm.responses) == 0
}

222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	requests := make([]GraphSyncRequest, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, request)
	}
	return requests
}

func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	responses := make([]GraphSyncResponse, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, response)
	}
	return responses
}

238 239 240 241 242 243 244 245
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(gsm.blocks))
	for _, block := range gsm.blocks {
		bs = append(bs, block)
	}
	return bs
}

246 247
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
248 249
}

250 251
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
252 253
}

254 255 256 257
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	gsm.blocks[b.Cid()] = b
}

258
// FromNet can read a network stream to deserialized a GraphSyncMessage
259
func FromNet(r io.Reader) (GraphSyncMessage, error) {
260
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
261
	return FromPBReader(pbr)
262 263
}

264
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
265
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
266 267 268 269 270
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

271
	return newMessageFromProto(*pb)
272 273
}

274
func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
275
	pbm := new(pb.Message)
276
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
277
	for _, request := range gsm.requests {
278 279 280 281 282 283 284
		var selector []byte
		var err error
		if request.selector != nil {
			selector, err = ipldutil.EncodeNode(request.selector)
			if err != nil {
				return nil, err
			}
285
		}
286
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
287 288
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
289
			Selector:   selector,
290 291
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
Hannah Howard's avatar
Hannah Howard committed
292
			Update:     request.isUpdate,
293
			Extensions: request.extensions,
294 295 296
		})
	}

297
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
298
	for _, response := range gsm.responses {
299
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
300 301 302
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
303 304 305 306 307 308 309 310 311
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
312 313
		})
	}
314
	return pbm, nil
315 316
}

317 318
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)
319 320 321 322 323
	msg, err := gsm.ToProto()
	if err != nil {
		return err
	}
	return pbw.WriteMsg(msg)
324 325
}

326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requests := make([]string, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, fmt.Sprintf("%d", request.id))
	}
	responses := make([]string, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, fmt.Sprintf("%d", response.requestID))
	}
	return map[string]interface{}{
		"requests":  requests,
		"responses": responses,
	}
}

341
// ID Returns the request ID for this Request
342
func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id }
343

344 345 346
// Root returns the CID to the root block of this request
func (gsr GraphSyncRequest) Root() cid.Cid { return gsr.root }

347
// Selector returns the byte representation of the selector for this request
348
func (gsr GraphSyncRequest) Selector() ipld.Node { return gsr.selector }
349 350

// Priority returns the priority of this request
351
func (gsr GraphSyncRequest) Priority() graphsync.Priority { return gsr.priority }
352

353 354
// Extension returns the content for an extension on a response, or errors
// if extension is not present
355
func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) {
356
	if gsr.extensions == nil {
357
		return nil, false
358 359 360
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
361
		return nil, false
362
	}
363
	return val, true
364 365
}

366 367 368
// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

Hannah Howard's avatar
Hannah Howard committed
369 370 371
// IsUpdate returns true if this particular request is being updated
func (gsr GraphSyncRequest) IsUpdate() bool { return gsr.isUpdate }

372
// RequestID returns the request ID for this response
373
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }
374 375

// Status returns the status for a response
376
func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.status }
377

378 379
// Extension returns the content for an extension on a response, or errors
// if extension is not present
380
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) {
381
	if gsr.extensions == nil {
382
		return nil, false
383 384 385
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
386
		return nil, false
387
	}
388
	return val, true
389 390

}