message.go 12.6 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

Hannah Howard's avatar
Hannah Howard committed
7
	ggio "github.com/gogo/protobuf/io"
8
	blocks "github.com/ipfs/go-block-format"
Hannah Howard's avatar
Hannah Howard committed
9
	cid "github.com/ipfs/go-cid"
10
	"github.com/ipld/go-ipld-prime"
Hannah Howard's avatar
Hannah Howard committed
11
	"github.com/libp2p/go-libp2p-core/network"
12

Hannah Howard's avatar
Hannah Howard committed
13
	"github.com/ipfs/go-graphsync"
14
	"github.com/ipfs/go-graphsync/ipldutil"
15 16 17
	pb "github.com/ipfs/go-graphsync/message/pb"
)

18 19
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
20 21 22
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestCompletedFull ||
		status == graphsync.RequestCompletedPartial
23 24 25 26
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
27 28 29 30
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestFailedBusy ||
		status == graphsync.RequestFailedContentNotFound ||
		status == graphsync.RequestFailedLegal ||
31 32
		status == graphsync.RequestFailedUnknown ||
		status == graphsync.RequestCancelled
33 34 35 36
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
37
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
38 39 40
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

41 42 43 44 45 46 47
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

48 49
	Blocks() []blocks.Block

50
	AddRequest(graphSyncRequest GraphSyncRequest)
51

52
	AddResponse(graphSyncResponse GraphSyncResponse)
53 54

	AddBlock(blocks.Block)
55

56 57
	Empty() bool

58 59 60 61 62 63 64
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
65
	ToProto() (*pb.Message, error)
66
	ToNet(w io.Writer) error
67 68
}

69 70 71
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
72
	root       cid.Cid
73
	selector   ipld.Node
74 75
	priority   graphsync.Priority
	id         graphsync.RequestID
76 77
	extensions map[string][]byte
	isCancel   bool
Hannah Howard's avatar
Hannah Howard committed
78
	isUpdate   bool
79 80
}

81 82 83
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
84 85
	requestID  graphsync.RequestID
	status     graphsync.ResponseStatusCode
86
	extensions map[string][]byte
87 88 89
}

type graphSyncMessage struct {
90 91
	requests  map[graphsync.RequestID]GraphSyncRequest
	responses map[graphsync.RequestID]GraphSyncResponse
92
	blocks    map[cid.Cid]blocks.Block
93 94 95 96 97 98 99 100 101
}

// New returns a blank GraphSyncMessage with no requests, responses or
// blocks in it.
func New() GraphSyncMessage {
	var msg GraphSyncMessage = newMsg()
	return msg
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
102 103
		requests:  make(map[graphsync.RequestID]GraphSyncRequest),
		responses: make(map[graphsync.RequestID]GraphSyncResponse),
104
		blocks:    make(map[cid.Cid]blocks.Block),
105 106 107
	}
}

108
// NewRequest builds a new Graphsync request
109
func NewRequest(id graphsync.RequestID,
110
	root cid.Cid,
111
	selector ipld.Node,
112 113
	priority graphsync.Priority,
	extensions ...graphsync.ExtensionData) GraphSyncRequest {
114

Hannah Howard's avatar
Hannah Howard committed
115
	return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
116 117 118
}

// CancelRequest request generates a request to cancel an in progress request
119
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
Hannah Howard's avatar
Hannah Howard committed
120 121 122 123 124 125
	return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
}

// UpdateRequest generates a new request to update an in progress request with the given extensions
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
	return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
126 127
}

128
func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
129 130 131 132 133 134 135
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
136 137
}

138
func newRequest(id graphsync.RequestID,
139
	root cid.Cid,
140
	selector ipld.Node,
141
	priority graphsync.Priority,
142
	isCancel bool,
Hannah Howard's avatar
Hannah Howard committed
143
	isUpdate bool,
144
	extensions map[string][]byte) GraphSyncRequest {
145
	return GraphSyncRequest{
146 147 148 149 150
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
Hannah Howard's avatar
Hannah Howard committed
151
		isUpdate:   isUpdate,
152
		extensions: extensions,
153 154 155 156
	}
}

// NewResponse builds a new Graphsync response
157 158 159
func NewResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode,
	extensions ...graphsync.ExtensionData) GraphSyncResponse {
160 161 162
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

163 164
func newResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
165
	return GraphSyncResponse{
166 167 168
		requestID:  requestID,
		status:     status,
		extensions: extensions,
169 170
	}
}
171
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
172
	gsm := newMsg()
173
	for _, req := range pbm.Requests {
Hannah Howard's avatar
Hannah Howard committed
174 175 176 177 178 179 180
		var root cid.Cid
		var err error
		if !req.Cancel && !req.Update {
			root, err = cid.Cast(req.Root)
			if err != nil {
				return nil, err
			}
181
		}
Hannah Howard's avatar
Hannah Howard committed
182 183 184 185 186 187 188

		var selector ipld.Node
		if !req.Cancel && !req.Update {
			selector, err = ipldutil.DecodeNode(req.Selector)
			if err != nil {
				return nil, err
			}
189
		}
Hannah Howard's avatar
Hannah Howard committed
190
		gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, req.GetExtensions()))
191 192 193
	}

	for _, res := range pbm.Responses {
194
		gsm.AddResponse(newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), res.GetExtensions()))
195 196 197 198
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
199
		if err != nil {
200 201 202 203 204 205 206 207 208 209 210
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
211
		}
212

213
		gsm.AddBlock(blk)
214 215 216 217 218
	}

	return gsm, nil
}

219 220 221 222
// Empty reports whether the message holds no requests, no responses and no
// blocks.
func (gsm *graphSyncMessage) Empty() bool {
	if len(gsm.requests) > 0 || len(gsm.responses) > 0 {
		return false
	}
	return len(gsm.blocks) == 0
}

223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
// Requests returns a newly allocated slice with every request in the
// message. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	out := make([]GraphSyncRequest, len(gsm.requests))
	i := 0
	for _, req := range gsm.requests {
		out[i] = req
		i++
	}
	return out
}

// Responses returns a newly allocated slice with every response in the
// message. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	out := make([]GraphSyncResponse, len(gsm.responses))
	i := 0
	for _, res := range gsm.responses {
		out[i] = res
		i++
	}
	return out
}

239 240 241 242 243 244 245 246
// Blocks returns a newly allocated slice with every block in the message.
// The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	out := make([]blocks.Block, len(gsm.blocks))
	i := 0
	for _, blk := range gsm.blocks {
		out[i] = blk
		i++
	}
	return out
}

247 248
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
249 250
}

251 252
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
253 254
}

255 256 257 258
// AddBlock stores the block in the message keyed by its CID, replacing any
// block previously added under the same CID.
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	key := b.Cid()
	gsm.blocks[key] = b
}

259
// FromNet can read a network stream to deserialized a GraphSyncMessage
260
func FromNet(r io.Reader) (GraphSyncMessage, error) {
261
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
262
	return FromPBReader(pbr)
263 264
}

265
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
266
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
267 268 269 270 271
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

272
	return newMessageFromProto(*pb)
273 274
}

275
func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
276
	pbm := new(pb.Message)
277
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
278
	for _, request := range gsm.requests {
279 280 281 282 283 284 285
		var selector []byte
		var err error
		if request.selector != nil {
			selector, err = ipldutil.EncodeNode(request.selector)
			if err != nil {
				return nil, err
			}
286
		}
287
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
288 289
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
290
			Selector:   selector,
291 292
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
Hannah Howard's avatar
Hannah Howard committed
293
			Update:     request.isUpdate,
294
			Extensions: request.extensions,
295 296 297
		})
	}

298
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
299
	for _, response := range gsm.responses {
300
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
301 302 303
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
304 305 306 307 308 309 310 311 312
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
313 314
		})
	}
315
	return pbm, nil
316 317
}

318 319
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)
320 321 322 323 324
	msg, err := gsm.ToProto()
	if err != nil {
		return err
	}
	return pbw.WriteMsg(msg)
325 326
}

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
// Loggable returns a map for logging with two entries: "requests" holds the
// decimal IDs of the requests in the message, and "responses" holds the
// decimal request IDs of the responses in the message.
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requestIDs := make([]string, len(gsm.requests))
	i := 0
	for _, req := range gsm.requests {
		requestIDs[i] = fmt.Sprintf("%d", req.id)
		i++
	}

	responseIDs := make([]string, len(gsm.responses))
	j := 0
	for _, res := range gsm.responses {
		responseIDs[j] = fmt.Sprintf("%d", res.requestID)
		j++
	}

	summary := make(map[string]interface{}, 2)
	summary["requests"] = requestIDs
	summary["responses"] = responseIDs
	return summary
}

342
// ID Returns the request ID for this Request
343
func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id }
344

345 346 347
// Root returns the CID of the root block this request refers to.
func (gsr GraphSyncRequest) Root() cid.Cid {
	return gsr.root
}

348
// Selector returns the byte representation of the selector for this request
349
func (gsr GraphSyncRequest) Selector() ipld.Node { return gsr.selector }
350 351

// Priority returns the priority of this request
352
func (gsr GraphSyncRequest) Priority() graphsync.Priority { return gsr.priority }
353

354 355
// Extension returns the content for an extension on a response, or errors
// if extension is not present
356
func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) {
357
	if gsr.extensions == nil {
358
		return nil, false
359 360 361
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
362
		return nil, false
363
	}
364
	return val, true
365 366
}

367 368 369
// IsCancel reports whether this request cancels an in-progress request.
func (gsr GraphSyncRequest) IsCancel() bool {
	return gsr.isCancel
}

Hannah Howard's avatar
Hannah Howard committed
370 371 372
// IsUpdate reports whether this request updates an in-progress request.
func (gsr GraphSyncRequest) IsUpdate() bool {
	return gsr.isUpdate
}

373
// RequestID returns the request ID for this response
374
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }
375 376

// Status returns the status for a response
377
func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.status }
378

379 380
// Extension returns the content for an extension on a response, or errors
// if extension is not present
381
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) {
382
	if gsr.extensions == nil {
383
		return nil, false
384 385 386
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
387
		return nil, false
388
	}
389
	return val, true
390 391

}
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432

// ReplaceExtensions merges the given extensions into the request to create a
// new request. Where an extension name is present both on the request and in
// the given list, the new data always replaces the old.
func (gsr GraphSyncRequest) ReplaceExtensions(extensions []graphsync.ExtensionData) GraphSyncRequest {
	preferNew := func(_ graphsync.ExtensionName, _ []byte, newData []byte) ([]byte, error) {
		return newData, nil
	}
	// preferNew never returns an error, so MergeExtensions cannot fail here
	// and its error result is deliberately dropped.
	updated, _ := gsr.MergeExtensions(extensions, preferNew)
	return updated
}

// MergeExtensions produces a new request that combines the extensions already
// on this request with the given list. Names present on only one side are
// carried over unchanged. For a name present on both sides, mergeFunc is
// called with the old and new data and its result is stored. If mergeFunc
// returns an error, that error is returned together with a zero
// GraphSyncRequest. The receiver is not modified.
func (gsr GraphSyncRequest) MergeExtensions(extensions []graphsync.ExtensionData, mergeFunc func(name graphsync.ExtensionName, oldData []byte, newData []byte) ([]byte, error)) (GraphSyncRequest, error) {
	incoming := toExtensionsMap(extensions)
	if gsr.extensions == nil {
		// Nothing to merge with: the new extensions are used as they are.
		return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, incoming), nil
	}

	// Start from the existing extensions, then overlay the incoming ones.
	merged := make(map[string][]byte)
	for name, existing := range gsr.extensions {
		merged[name] = existing
	}
	for name, fresh := range incoming {
		existing, found := gsr.extensions[name]
		if !found {
			merged[name] = fresh
			continue
		}
		resolved, err := mergeFunc(graphsync.ExtensionName(name), existing, fresh)
		if err != nil {
			return GraphSyncRequest{}, err
		}
		merged[name] = resolved
	}
	return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, merged), nil
}