message.go 13.5 KB
Newer Older
1 2 3
package message

import (
4
	"errors"
5
	"fmt"
6
	"io"
7

Hannah Howard's avatar
Hannah Howard committed
8
	ggio "github.com/gogo/protobuf/io"
9
	blocks "github.com/ipfs/go-block-format"
Hannah Howard's avatar
Hannah Howard committed
10
	cid "github.com/ipfs/go-cid"
11
	"github.com/ipld/go-ipld-prime"
Hannah Howard's avatar
Hannah Howard committed
12
	"github.com/libp2p/go-libp2p-core/network"
13 14
	"github.com/libp2p/go-msgio"
	"google.golang.org/protobuf/proto"
15

Hannah Howard's avatar
Hannah Howard committed
16
	"github.com/ipfs/go-graphsync"
17
	"github.com/ipfs/go-graphsync/ipldutil"
18 19 20
	pb "github.com/ipfs/go-graphsync/message/pb"
)

21 22
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
23 24 25
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestCompletedFull ||
		status == graphsync.RequestCompletedPartial
26 27 28 29
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
30 31 32 33
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestFailedBusy ||
		status == graphsync.RequestFailedContentNotFound ||
		status == graphsync.RequestFailedLegal ||
34 35
		status == graphsync.RequestFailedUnknown ||
		status == graphsync.RequestCancelled
36 37 38 39
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
40
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
41 42 43
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

44 45 46 47 48 49 50
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

51 52
	Blocks() []blocks.Block

53
	AddRequest(graphSyncRequest GraphSyncRequest)
54

55
	AddResponse(graphSyncResponse GraphSyncResponse)
56 57

	AddBlock(blocks.Block)
58

59 60
	Empty() bool

61 62 63
	Exportable

	Loggable() map[string]interface{}
64 65

	Clone() GraphSyncMessage
66 67 68 69
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
70
	ToProto() (*pb.Message, error)
71
	ToNet(w io.Writer) error
72 73
}

74 75 76
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
77
	root       cid.Cid
78
	selector   ipld.Node
79 80
	priority   graphsync.Priority
	id         graphsync.RequestID
81 82
	extensions map[string][]byte
	isCancel   bool
Hannah Howard's avatar
Hannah Howard committed
83
	isUpdate   bool
84 85
}

86 87 88
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
89 90
	requestID  graphsync.RequestID
	status     graphsync.ResponseStatusCode
91
	extensions map[string][]byte
92 93 94
}

type graphSyncMessage struct {
95 96
	requests  map[graphsync.RequestID]GraphSyncRequest
	responses map[graphsync.RequestID]GraphSyncResponse
97
	blocks    map[cid.Cid]blocks.Block
98 99 100 101 102 103 104 105 106
}

// New returns an empty GraphSyncMessage ready to have requests, responses and
// blocks added to it.
func New() GraphSyncMessage {
	var msg GraphSyncMessage = newMsg()
	return msg
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
107 108
		requests:  make(map[graphsync.RequestID]GraphSyncRequest),
		responses: make(map[graphsync.RequestID]GraphSyncResponse),
109
		blocks:    make(map[cid.Cid]blocks.Block),
110 111 112
	}
}

113
// NewRequest builds a new Graphsync request
114
func NewRequest(id graphsync.RequestID,
115
	root cid.Cid,
116
	selector ipld.Node,
117 118
	priority graphsync.Priority,
	extensions ...graphsync.ExtensionData) GraphSyncRequest {
119

Hannah Howard's avatar
Hannah Howard committed
120
	return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
121 122 123
}

// CancelRequest request generates a request to cancel an in progress request
124
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
Hannah Howard's avatar
Hannah Howard committed
125 126 127 128 129 130
	return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
}

// UpdateRequest generates a new request to update an in progress request with the given extensions
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
	return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
131 132
}

133
func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
134 135 136 137 138 139 140
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
141 142
}

143
func newRequest(id graphsync.RequestID,
144
	root cid.Cid,
145
	selector ipld.Node,
146
	priority graphsync.Priority,
147
	isCancel bool,
Hannah Howard's avatar
Hannah Howard committed
148
	isUpdate bool,
149
	extensions map[string][]byte) GraphSyncRequest {
150
	return GraphSyncRequest{
151 152 153 154 155
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
Hannah Howard's avatar
Hannah Howard committed
156
		isUpdate:   isUpdate,
157
		extensions: extensions,
158 159 160 161
	}
}

// NewResponse builds a new Graphsync response
162 163 164
func NewResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode,
	extensions ...graphsync.ExtensionData) GraphSyncResponse {
165 166 167
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

168 169
func newResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
170
	return GraphSyncResponse{
171 172 173
		requestID:  requestID,
		status:     status,
		extensions: extensions,
174 175
	}
}
176
func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) {
177
	gsm := newMsg()
178
	for _, req := range pbm.Requests {
179 180 181
		if req == nil {
			return nil, errors.New("request is nil")
		}
Hannah Howard's avatar
Hannah Howard committed
182 183 184 185 186 187 188
		var root cid.Cid
		var err error
		if !req.Cancel && !req.Update {
			root, err = cid.Cast(req.Root)
			if err != nil {
				return nil, err
			}
189
		}
Hannah Howard's avatar
Hannah Howard committed
190 191 192 193 194 195 196

		var selector ipld.Node
		if !req.Cancel && !req.Update {
			selector, err = ipldutil.DecodeNode(req.Selector)
			if err != nil {
				return nil, err
			}
197
		}
198 199 200 201 202
		exts := req.GetExtensions()
		if exts == nil {
			exts = make(map[string][]byte)
		}
		gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, exts))
203 204 205
	}

	for _, res := range pbm.Responses {
206 207 208 209 210 211 212 213
		if res == nil {
			return nil, errors.New("response is nil")
		}
		exts := res.GetExtensions()
		if exts == nil {
			exts = make(map[string][]byte)
		}
		gsm.AddResponse(newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), exts))
214 215 216
	}

	for _, b := range pbm.GetData() {
217 218 219 220
		if b == nil {
			return nil, errors.New("block is nil")
		}

221
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
222
		if err != nil {
223 224 225 226 227 228 229 230 231 232 233
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
234
		}
235

236
		gsm.AddBlock(blk)
237 238 239 240 241
	}

	return gsm, nil
}

242 243 244 245
// Empty reports whether the message holds no blocks, requests or responses.
func (gsm *graphSyncMessage) Empty() bool {
	entries := len(gsm.blocks) + len(gsm.requests) + len(gsm.responses)
	return entries == 0
}

246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
// Requests returns a newly allocated slice with every request in the message.
// The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	out := make([]GraphSyncRequest, len(gsm.requests))
	next := 0
	for _, req := range gsm.requests {
		out[next] = req
		next++
	}
	return out
}

// Responses returns a newly allocated slice with every response in the
// message. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	out := make([]GraphSyncResponse, len(gsm.responses))
	next := 0
	for _, resp := range gsm.responses {
		out[next] = resp
		next++
	}
	return out
}

262 263 264 265 266 267 268 269
// Blocks returns a newly allocated slice with every block in the message.
// The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	out := make([]blocks.Block, len(gsm.blocks))
	next := 0
	for _, blk := range gsm.blocks {
		out[next] = blk
		next++
	}
	return out
}

270 271
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
272 273
}

274 275
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
276 277
}

278 279 280 281
// AddBlock stores the block under its CID, replacing any block already stored
// with the same CID.
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	key := b.Cid()
	gsm.blocks[key] = b
}

282
// FromNet can read a network stream to deserialized a GraphSyncMessage
283
func FromNet(r io.Reader) (GraphSyncMessage, error) {
284 285
	reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
	return FromMsgReader(reader)
286 287
}

288 289 290 291 292 293 294 295 296 297 298
// FromMsgReader can deserialize a protobuf message into a GraphySyncMessage.
func FromMsgReader(r msgio.Reader) (GraphSyncMessage, error) {
	msg, err := r.ReadMsg()
	if err != nil {
		return nil, err
	}

	var pb pb.Message
	err = proto.Unmarshal(msg, &pb)
	r.ReleaseMsg(msg)
	if err != nil {
299 300 301
		return nil, err
	}

302
	return newMessageFromProto(&pb)
303 304
}

305
func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
306
	pbm := new(pb.Message)
307
	pbm.Requests = make([]*pb.Message_Request, 0, len(gsm.requests))
308
	for _, request := range gsm.requests {
309 310 311 312 313 314 315
		var selector []byte
		var err error
		if request.selector != nil {
			selector, err = ipldutil.EncodeNode(request.selector)
			if err != nil {
				return nil, err
			}
316
		}
317
		pbm.Requests = append(pbm.Requests, &pb.Message_Request{
318 319
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
320
			Selector:   selector,
321 322
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
Hannah Howard's avatar
Hannah Howard committed
323
			Update:     request.isUpdate,
324
			Extensions: request.extensions,
325 326 327
		})
	}

328
	pbm.Responses = make([]*pb.Message_Response, 0, len(gsm.responses))
329
	for _, response := range gsm.responses {
330
		pbm.Responses = append(pbm.Responses, &pb.Message_Response{
331 332 333
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
334 335 336 337
		})
	}

	blocks := gsm.Blocks()
338
	pbm.Data = make([]*pb.Message_Block, 0, len(blocks))
339
	for _, b := range blocks {
340
		pbm.Data = append(pbm.Data, &pb.Message_Block{
341 342
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
343 344
		})
	}
345
	return pbm, nil
346 347
}

348 349
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)
350 351 352 353 354
	msg, err := gsm.ToProto()
	if err != nil {
		return err
	}
	return pbw.WriteMsg(msg)
355 356
}

357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
// Loggable returns a summary of the message for logging: a map whose
// "requests" and "responses" entries list the request IDs involved, each
// formatted as a decimal string. Blocks are not included.
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requestIDs := make([]string, len(gsm.requests))
	next := 0
	for _, req := range gsm.requests {
		requestIDs[next] = fmt.Sprintf("%d", req.id)
		next++
	}

	responseIDs := make([]string, len(gsm.responses))
	next = 0
	for _, resp := range gsm.responses {
		responseIDs[next] = fmt.Sprintf("%d", resp.requestID)
		next++
	}

	summary := make(map[string]interface{}, 2)
	summary["requests"] = requestIDs
	summary["responses"] = responseIDs
	return summary
}

372 373 374 375 376 377 378 379 380 381 382 383 384 385
// Clone returns a new message with the same requests, responses and blocks.
// The maps are copied, so adding entries to the clone leaves the original
// untouched. The entries themselves are copied by assignment only.
func (gsm *graphSyncMessage) Clone() GraphSyncMessage {
	dup := newMsg()
	for c, blk := range gsm.blocks {
		dup.blocks[c] = blk
	}
	for id, req := range gsm.requests {
		dup.requests[id] = req
	}
	for id, resp := range gsm.responses {
		dup.responses[id] = resp
	}
	return dup
}

386
// ID Returns the request ID for this Request
387
func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id }
388

389 390 391
// Root returns the CID of the root block of this request.
func (gsr GraphSyncRequest) Root() cid.Cid {
	return gsr.root
}

392
// Selector returns the byte representation of the selector for this request
393
func (gsr GraphSyncRequest) Selector() ipld.Node { return gsr.selector }
394 395

// Priority returns the priority of this request
396
func (gsr GraphSyncRequest) Priority() graphsync.Priority { return gsr.priority }
397

398 399
// Extension returns the content for an extension on a response, or errors
// if extension is not present
400
func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) {
401
	if gsr.extensions == nil {
402
		return nil, false
403 404 405
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
406
		return nil, false
407
	}
408
	return val, true
409 410
}

411 412 413
// IsCancel reports whether this request cancels an in-progress request.
func (gsr GraphSyncRequest) IsCancel() bool {
	return gsr.isCancel
}

Hannah Howard's avatar
Hannah Howard committed
414 415 416
// IsUpdate reports whether this request updates an in-progress request.
func (gsr GraphSyncRequest) IsUpdate() bool {
	return gsr.isUpdate
}

417
// RequestID returns the request ID for this response
418
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }
419 420

// Status returns the status for a response
421
func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.status }
422

423 424
// Extension returns the content for an extension on a response, or errors
// if extension is not present
425
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) {
426
	if gsr.extensions == nil {
427
		return nil, false
428 429 430
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
431
		return nil, false
432
	}
433
	return val, true
434 435

}
436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476

// ReplaceExtensions returns a new request that combines this request's
// extensions with the given ones. Where both carry an extension of the same
// name, the given (new) data replaces the old data.
func (gsr GraphSyncRequest) ReplaceExtensions(extensions []graphsync.ExtensionData) GraphSyncRequest {
	preferNew := func(_ graphsync.ExtensionName, _ []byte, newData []byte) ([]byte, error) {
		return newData, nil
	}
	// MergeExtensions only fails when the merge function does, and preferNew
	// never returns an error, so the error result is safe to discard.
	merged, _ := gsr.MergeExtensions(extensions, preferNew)
	return merged
}

// MergeExtensions returns a new request whose extensions combine this
// request's extensions with the given ones. The receiver is not modified.
//
// Extensions present on only one side are carried over unchanged. When a name
// is present on both sides, mergeFunc is called with the old and new data and
// its result is stored. If mergeFunc returns an error, that error is returned
// together with a zero-value GraphSyncRequest. When the receiver has no
// extensions map, mergeFunc is never called and the given extensions are used
// as they are.
func (gsr GraphSyncRequest) MergeExtensions(extensions []graphsync.ExtensionData, mergeFunc func(name graphsync.ExtensionName, oldData []byte, newData []byte) ([]byte, error)) (GraphSyncRequest, error) {
	incoming := toExtensionsMap(extensions)
	if gsr.extensions == nil {
		return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, incoming), nil
	}

	// Start from the existing extensions, then overlay the incoming ones.
	merged := make(map[string][]byte, len(gsr.extensions))
	for name, existing := range gsr.extensions {
		merged[name] = existing
	}
	for name, newData := range incoming {
		existing, present := gsr.extensions[name]
		if !present {
			merged[name] = newData
			continue
		}
		resolved, err := mergeFunc(graphsync.ExtensionName(name), existing, newData)
		if err != nil {
			return GraphSyncRequest{}, err
		}
		merged[name] = resolved
	}

	return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, merged), nil
}