message.go 13.7 KB
Newer Older
1 2 3
package message

import (
4
	"encoding/binary"
5
	"errors"
6
	"fmt"
7
	"io"
8

9
	blocks "github.com/ipfs/go-block-format"
Hannah Howard's avatar
Hannah Howard committed
10
	cid "github.com/ipfs/go-cid"
11
	"github.com/ipld/go-ipld-prime"
12
	pool "github.com/libp2p/go-buffer-pool"
Hannah Howard's avatar
Hannah Howard committed
13
	"github.com/libp2p/go-libp2p-core/network"
14 15
	"github.com/libp2p/go-msgio"
	"google.golang.org/protobuf/proto"
16

Hannah Howard's avatar
Hannah Howard committed
17
	"github.com/ipfs/go-graphsync"
18
	"github.com/ipfs/go-graphsync/ipldutil"
19 20 21
	pb "github.com/ipfs/go-graphsync/message/pb"
)

22 23
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
24 25 26
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestCompletedFull ||
		status == graphsync.RequestCompletedPartial
27 28 29 30
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
31 32 33 34
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestFailedBusy ||
		status == graphsync.RequestFailedContentNotFound ||
		status == graphsync.RequestFailedLegal ||
35 36
		status == graphsync.RequestFailedUnknown ||
		status == graphsync.RequestCancelled
37 38 39 40
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
41
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
42 43 44
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

45 46 47 48 49 50 51
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

52 53
	Blocks() []blocks.Block

54
	AddRequest(graphSyncRequest GraphSyncRequest)
55

56
	AddResponse(graphSyncResponse GraphSyncResponse)
57 58

	AddBlock(blocks.Block)
59

60 61
	Empty() bool

62 63 64
	Exportable

	Loggable() map[string]interface{}
65 66

	Clone() GraphSyncMessage
67 68 69 70
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
71
	ToProto() (*pb.Message, error)
72
	ToNet(w io.Writer) error
73 74
}

75 76 77
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
78
	root       cid.Cid
79
	selector   ipld.Node
80 81
	priority   graphsync.Priority
	id         graphsync.RequestID
82 83
	extensions map[string][]byte
	isCancel   bool
Hannah Howard's avatar
Hannah Howard committed
84
	isUpdate   bool
85 86
}

87 88 89
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
90 91
	requestID  graphsync.RequestID
	status     graphsync.ResponseStatusCode
92
	extensions map[string][]byte
93 94 95
}

type graphSyncMessage struct {
96 97
	requests  map[graphsync.RequestID]GraphSyncRequest
	responses map[graphsync.RequestID]GraphSyncResponse
98
	blocks    map[cid.Cid]blocks.Block
99 100 101 102 103 104 105 106 107
}

// New returns a blank GraphSyncMessage with no requests, responses or blocks.
func New() GraphSyncMessage {
	blank := newMsg()
	return blank
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
108 109
		requests:  make(map[graphsync.RequestID]GraphSyncRequest),
		responses: make(map[graphsync.RequestID]GraphSyncResponse),
110
		blocks:    make(map[cid.Cid]blocks.Block),
111 112 113
	}
}

114
// NewRequest builds a new Graphsync request
115
func NewRequest(id graphsync.RequestID,
116
	root cid.Cid,
117
	selector ipld.Node,
118 119
	priority graphsync.Priority,
	extensions ...graphsync.ExtensionData) GraphSyncRequest {
120

Hannah Howard's avatar
Hannah Howard committed
121
	return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
122 123 124
}

// CancelRequest request generates a request to cancel an in progress request
125
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
Hannah Howard's avatar
Hannah Howard committed
126 127 128 129 130 131
	return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
}

// UpdateRequest generates a new request to update an in progress request with the given extensions
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
	return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
132 133
}

134
func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
135 136 137 138 139 140 141
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
142 143
}

144
func newRequest(id graphsync.RequestID,
145
	root cid.Cid,
146
	selector ipld.Node,
147
	priority graphsync.Priority,
148
	isCancel bool,
Hannah Howard's avatar
Hannah Howard committed
149
	isUpdate bool,
150
	extensions map[string][]byte) GraphSyncRequest {
151
	return GraphSyncRequest{
152 153 154 155 156
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
Hannah Howard's avatar
Hannah Howard committed
157
		isUpdate:   isUpdate,
158
		extensions: extensions,
159 160 161 162
	}
}

// NewResponse builds a new Graphsync response
163 164 165
func NewResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode,
	extensions ...graphsync.ExtensionData) GraphSyncResponse {
166 167 168
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

169 170
func newResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
171
	return GraphSyncResponse{
172 173 174
		requestID:  requestID,
		status:     status,
		extensions: extensions,
175 176
	}
}
177
func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) {
178
	gsm := newMsg()
179
	for _, req := range pbm.Requests {
180 181 182
		if req == nil {
			return nil, errors.New("request is nil")
		}
Hannah Howard's avatar
Hannah Howard committed
183 184 185 186 187 188 189
		var root cid.Cid
		var err error
		if !req.Cancel && !req.Update {
			root, err = cid.Cast(req.Root)
			if err != nil {
				return nil, err
			}
190
		}
Hannah Howard's avatar
Hannah Howard committed
191 192 193 194 195 196 197

		var selector ipld.Node
		if !req.Cancel && !req.Update {
			selector, err = ipldutil.DecodeNode(req.Selector)
			if err != nil {
				return nil, err
			}
198
		}
199 200 201 202 203
		exts := req.GetExtensions()
		if exts == nil {
			exts = make(map[string][]byte)
		}
		gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, exts))
204 205 206
	}

	for _, res := range pbm.Responses {
207 208 209 210 211 212 213 214
		if res == nil {
			return nil, errors.New("response is nil")
		}
		exts := res.GetExtensions()
		if exts == nil {
			exts = make(map[string][]byte)
		}
		gsm.AddResponse(newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), exts))
215 216 217
	}

	for _, b := range pbm.GetData() {
218 219 220 221
		if b == nil {
			return nil, errors.New("block is nil")
		}

222
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
223
		if err != nil {
224 225 226 227 228 229 230 231 232 233 234
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
235
		}
236

237
		gsm.AddBlock(blk)
238 239 240 241 242
	}

	return gsm, nil
}

243 244 245 246
// Empty reports whether the message holds no blocks, requests or responses.
func (gsm *graphSyncMessage) Empty() bool {
	total := len(gsm.blocks) + len(gsm.requests) + len(gsm.responses)
	return total == 0
}

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
// Requests returns the requests held by the message as a newly allocated
// slice. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	out := make([]GraphSyncRequest, len(gsm.requests))
	next := 0
	for _, req := range gsm.requests {
		out[next] = req
		next++
	}
	return out
}

// Responses returns the responses held by the message as a newly allocated
// slice. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	out := make([]GraphSyncResponse, len(gsm.responses))
	next := 0
	for _, resp := range gsm.responses {
		out[next] = resp
		next++
	}
	return out
}

263 264 265 266 267 268 269 270
// Blocks returns the blocks held by the message as a newly allocated slice.
// The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	out := make([]blocks.Block, len(gsm.blocks))
	next := 0
	for _, blk := range gsm.blocks {
		out[next] = blk
		next++
	}
	return out
}

271 272
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
273 274
}

275 276
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
277 278
}

279 280 281 282
// AddBlock stores the block under its CID, replacing any block already
// stored with the same CID.
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	key := b.Cid()
	gsm.blocks[key] = b
}

283
// FromNet can read a network stream to deserialized a GraphSyncMessage
284
func FromNet(r io.Reader) (GraphSyncMessage, error) {
285 286
	reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
	return FromMsgReader(reader)
287 288
}

289 290 291 292 293 294 295 296 297 298 299
// FromMsgReader can deserialize a protobuf message into a GraphySyncMessage.
func FromMsgReader(r msgio.Reader) (GraphSyncMessage, error) {
	msg, err := r.ReadMsg()
	if err != nil {
		return nil, err
	}

	var pb pb.Message
	err = proto.Unmarshal(msg, &pb)
	r.ReleaseMsg(msg)
	if err != nil {
300 301 302
		return nil, err
	}

303
	return newMessageFromProto(&pb)
304 305
}

306
func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
307
	pbm := new(pb.Message)
308
	pbm.Requests = make([]*pb.Message_Request, 0, len(gsm.requests))
309
	for _, request := range gsm.requests {
310 311 312 313 314 315 316
		var selector []byte
		var err error
		if request.selector != nil {
			selector, err = ipldutil.EncodeNode(request.selector)
			if err != nil {
				return nil, err
			}
317
		}
318
		pbm.Requests = append(pbm.Requests, &pb.Message_Request{
319 320
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
321
			Selector:   selector,
322 323
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
Hannah Howard's avatar
Hannah Howard committed
324
			Update:     request.isUpdate,
325
			Extensions: request.extensions,
326 327 328
		})
	}

329
	pbm.Responses = make([]*pb.Message_Response, 0, len(gsm.responses))
330
	for _, response := range gsm.responses {
331
		pbm.Responses = append(pbm.Responses, &pb.Message_Response{
332 333 334
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
335 336 337 338
		})
	}

	blocks := gsm.Blocks()
339
	pbm.Data = make([]*pb.Message_Block, 0, len(blocks))
340
	for _, b := range blocks {
341
		pbm.Data = append(pbm.Data, &pb.Message_Block{
342 343
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
344 345
		})
	}
346
	return pbm, nil
347 348
}

349
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
350
	msg, err := gsm.ToProto()
351 352 353 354 355 356 357
	size := proto.Size(msg)
	buf := pool.Get(size + binary.MaxVarintLen64)
	defer pool.Put(buf)

	n := binary.PutUvarint(buf, uint64(size))

	out, err := proto.MarshalOptions{}.MarshalAppend(buf[:n], msg)
358 359 360
	if err != nil {
		return err
	}
361 362
	_, err = w.Write(out)
	return err
363 364
}

365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
// Loggable returns a map for logging with two entries, "requests" and
// "responses", each a slice of the corresponding request IDs formatted as
// decimal strings.
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requestIDs := make([]string, 0, len(gsm.requests))
	for _, req := range gsm.requests {
		requestIDs = append(requestIDs, fmt.Sprintf("%d", req.id))
	}

	responseIDs := make([]string, 0, len(gsm.responses))
	for _, resp := range gsm.responses {
		responseIDs = append(responseIDs, fmt.Sprintf("%d", resp.requestID))
	}

	summary := make(map[string]interface{}, 2)
	summary["requests"] = requestIDs
	summary["responses"] = responseIDs
	return summary
}

380 381 382 383 384 385 386 387 388 389 390 391 392 393
// Clone returns a new message containing the same requests, responses and
// blocks. The maps are copied, but the entries themselves are shared with
// the original.
func (gsm *graphSyncMessage) Clone() GraphSyncMessage {
	dup := newMsg()
	for key, req := range gsm.requests {
		dup.requests[key] = req
	}
	for key, resp := range gsm.responses {
		dup.responses[key] = resp
	}
	for key, blk := range gsm.blocks {
		dup.blocks[key] = blk
	}
	return dup
}

394
// ID Returns the request ID for this Request
395
func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id }
396

397 398 399
// Root returns the CID of the root block of this request.
func (gsr GraphSyncRequest) Root() cid.Cid {
	return gsr.root
}

400
// Selector returns the byte representation of the selector for this request
401
func (gsr GraphSyncRequest) Selector() ipld.Node { return gsr.selector }
402 403

// Priority returns the priority of this request
404
func (gsr GraphSyncRequest) Priority() graphsync.Priority { return gsr.priority }
405

406 407
// Extension returns the content for an extension on a response, or errors
// if extension is not present
408
func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) {
409
	if gsr.extensions == nil {
410
		return nil, false
411 412 413
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
414
		return nil, false
415
	}
416
	return val, true
417 418
}

419 420 421
// IsCancel reports whether this request cancels an in-progress request.
func (gsr GraphSyncRequest) IsCancel() bool {
	return gsr.isCancel
}

Hannah Howard's avatar
Hannah Howard committed
422 423 424
// IsUpdate reports whether this request updates an in-progress request.
func (gsr GraphSyncRequest) IsUpdate() bool {
	return gsr.isUpdate
}

425
// RequestID returns the request ID for this response
426
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }
427 428

// Status returns the status for a response
429
func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.status }
430

431 432
// Extension returns the content for an extension on a response, or errors
// if extension is not present
433
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) {
434
	if gsr.extensions == nil {
435
		return nil, false
436 437 438
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
439
		return nil, false
440
	}
441
	return val, true
442 443

}
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484

// ReplaceExtensions merges the given extensions into the request to create a
// new request. When an extension is present both on the request and in the
// given list, the new data always wins.
func (gsr GraphSyncRequest) ReplaceExtensions(extensions []graphsync.ExtensionData) GraphSyncRequest {
	preferNew := func(_ graphsync.ExtensionName, _ []byte, newData []byte) ([]byte, error) {
		return newData, nil
	}
	// preferNew never returns an error, so the merge error is discarded.
	replaced, _ := gsr.MergeExtensions(extensions, preferNew)
	return replaced
}

// MergeExtensions produces a new request that combines this request's
// extensions with the given list. When an extension is present in both,
// mergeFunc is called with the old and new data to produce the stored value.
// If mergeFunc returns an error, that error is returned along with a
// zero-value request. The receiver is not modified.
func (gsr GraphSyncRequest) MergeExtensions(extensions []graphsync.ExtensionData, mergeFunc func(name graphsync.ExtensionName, oldData []byte, newData []byte) ([]byte, error)) (GraphSyncRequest, error) {
	incoming := toExtensionsMap(extensions)
	if gsr.extensions == nil {
		// Nothing to merge against: the new extensions are used as-is.
		return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, incoming), nil
	}

	// Start from the existing extensions, then overlay the incoming ones.
	merged := make(map[string][]byte)
	for name, existing := range gsr.extensions {
		merged[name] = existing
	}
	for name, fresh := range incoming {
		existing, present := gsr.extensions[name]
		if present {
			resolved, err := mergeFunc(graphsync.ExtensionName(name), existing, fresh)
			if err != nil {
				return GraphSyncRequest{}, err
			}
			merged[name] = resolved
			continue
		}
		merged[name] = fresh
	}

	return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, merged), nil
}