message.go 12.9 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

Hannah Howard's avatar
Hannah Howard committed
7
	ggio "github.com/gogo/protobuf/io"
8
	blocks "github.com/ipfs/go-block-format"
Hannah Howard's avatar
Hannah Howard committed
9
	cid "github.com/ipfs/go-cid"
10
	"github.com/ipld/go-ipld-prime"
Hannah Howard's avatar
Hannah Howard committed
11
	"github.com/libp2p/go-libp2p-core/network"
12

Hannah Howard's avatar
Hannah Howard committed
13
	"github.com/ipfs/go-graphsync"
14
	"github.com/ipfs/go-graphsync/ipldutil"
15 16 17
	pb "github.com/ipfs/go-graphsync/message/pb"
)

18 19
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
20 21 22
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestCompletedFull ||
		status == graphsync.RequestCompletedPartial
23 24 25 26
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
27 28 29 30
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestFailedBusy ||
		status == graphsync.RequestFailedContentNotFound ||
		status == graphsync.RequestFailedLegal ||
31 32
		status == graphsync.RequestFailedUnknown ||
		status == graphsync.RequestCancelled
33 34 35 36
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
37
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
38 39 40
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

41 42 43 44 45 46 47
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

48 49
	Blocks() []blocks.Block

50
	AddRequest(graphSyncRequest GraphSyncRequest)
51

52
	AddResponse(graphSyncResponse GraphSyncResponse)
53 54

	AddBlock(blocks.Block)
55

56 57
	Empty() bool

58 59 60
	Exportable

	Loggable() map[string]interface{}
61 62

	Clone() GraphSyncMessage
63 64 65 66
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
67
	ToProto() (*pb.Message, error)
68
	ToNet(w io.Writer) error
69 70
}

71 72 73
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
74
	root       cid.Cid
75
	selector   ipld.Node
76 77
	priority   graphsync.Priority
	id         graphsync.RequestID
78 79
	extensions map[string][]byte
	isCancel   bool
Hannah Howard's avatar
Hannah Howard committed
80
	isUpdate   bool
81 82
}

83 84 85
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
86 87
	requestID  graphsync.RequestID
	status     graphsync.ResponseStatusCode
88
	extensions map[string][]byte
89 90 91
}

type graphSyncMessage struct {
92 93
	requests  map[graphsync.RequestID]GraphSyncRequest
	responses map[graphsync.RequestID]GraphSyncResponse
94
	blocks    map[cid.Cid]blocks.Block
95 96 97 98 99 100 101 102 103
}

// New returns a blank GraphSyncMessage with no requests, responses or blocks.
func New() GraphSyncMessage {
	msg := newMsg()
	return msg
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
104 105
		requests:  make(map[graphsync.RequestID]GraphSyncRequest),
		responses: make(map[graphsync.RequestID]GraphSyncResponse),
106
		blocks:    make(map[cid.Cid]blocks.Block),
107 108 109
	}
}

110
// NewRequest builds a new Graphsync request
111
func NewRequest(id graphsync.RequestID,
112
	root cid.Cid,
113
	selector ipld.Node,
114 115
	priority graphsync.Priority,
	extensions ...graphsync.ExtensionData) GraphSyncRequest {
116

Hannah Howard's avatar
Hannah Howard committed
117
	return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
118 119 120
}

// CancelRequest request generates a request to cancel an in progress request
121
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
Hannah Howard's avatar
Hannah Howard committed
122 123 124 125 126 127
	return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
}

// UpdateRequest generates a new request to update an in progress request with the given extensions
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
	return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
128 129
}

130
func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
131 132 133 134 135 136 137
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
138 139
}

140
func newRequest(id graphsync.RequestID,
141
	root cid.Cid,
142
	selector ipld.Node,
143
	priority graphsync.Priority,
144
	isCancel bool,
Hannah Howard's avatar
Hannah Howard committed
145
	isUpdate bool,
146
	extensions map[string][]byte) GraphSyncRequest {
147
	return GraphSyncRequest{
148 149 150 151 152
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
Hannah Howard's avatar
Hannah Howard committed
153
		isUpdate:   isUpdate,
154
		extensions: extensions,
155 156 157 158
	}
}

// NewResponse builds a new Graphsync response
159 160 161
func NewResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode,
	extensions ...graphsync.ExtensionData) GraphSyncResponse {
162 163 164
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

165 166
func newResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
167
	return GraphSyncResponse{
168 169 170
		requestID:  requestID,
		status:     status,
		extensions: extensions,
171 172
	}
}
173
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
174
	gsm := newMsg()
175
	for _, req := range pbm.Requests {
Hannah Howard's avatar
Hannah Howard committed
176 177 178 179 180 181 182
		var root cid.Cid
		var err error
		if !req.Cancel && !req.Update {
			root, err = cid.Cast(req.Root)
			if err != nil {
				return nil, err
			}
183
		}
Hannah Howard's avatar
Hannah Howard committed
184 185 186 187 188 189 190

		var selector ipld.Node
		if !req.Cancel && !req.Update {
			selector, err = ipldutil.DecodeNode(req.Selector)
			if err != nil {
				return nil, err
			}
191
		}
Hannah Howard's avatar
Hannah Howard committed
192
		gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, req.GetExtensions()))
193 194 195
	}

	for _, res := range pbm.Responses {
196
		gsm.AddResponse(newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), res.GetExtensions()))
197 198 199 200
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
201
		if err != nil {
202 203 204 205 206 207 208 209 210 211 212
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
213
		}
214

215
		gsm.AddBlock(blk)
216 217 218 219 220
	}

	return gsm, nil
}

221 222 223 224
// Empty reports whether the message contains no requests, no responses and
// no blocks.
func (gsm *graphSyncMessage) Empty() bool {
	if len(gsm.blocks) > 0 || len(gsm.requests) > 0 {
		return false
	}
	return len(gsm.responses) == 0
}

225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
// Requests returns the requests held by the message as a newly allocated
// slice. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	out := make([]GraphSyncRequest, len(gsm.requests))
	next := 0
	for _, req := range gsm.requests {
		out[next] = req
		next++
	}
	return out
}

// Responses returns the responses held by the message as a newly allocated
// slice. The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	out := make([]GraphSyncResponse, len(gsm.responses))
	next := 0
	for _, resp := range gsm.responses {
		out[next] = resp
		next++
	}
	return out
}

241 242 243 244 245 246 247 248
// Blocks returns the blocks held by the message as a newly allocated slice.
// The order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	out := make([]blocks.Block, len(gsm.blocks))
	next := 0
	for _, blk := range gsm.blocks {
		out[next] = blk
		next++
	}
	return out
}

249 250
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
251 252
}

253 254
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
255 256
}

257 258 259 260
// AddBlock stores the block in the message under its CID, replacing any
// block already stored under the same CID.
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	key := b.Cid()
	gsm.blocks[key] = b
}

261
// FromNet can read a network stream to deserialized a GraphSyncMessage
262
func FromNet(r io.Reader) (GraphSyncMessage, error) {
263
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
264
	return FromPBReader(pbr)
265 266
}

267
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
268
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
269 270 271 272 273
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

274
	return newMessageFromProto(*pb)
275 276
}

277
func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
278
	pbm := new(pb.Message)
279
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
280
	for _, request := range gsm.requests {
281 282 283 284 285 286 287
		var selector []byte
		var err error
		if request.selector != nil {
			selector, err = ipldutil.EncodeNode(request.selector)
			if err != nil {
				return nil, err
			}
288
		}
289
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
290 291
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
292
			Selector:   selector,
293 294
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
Hannah Howard's avatar
Hannah Howard committed
295
			Update:     request.isUpdate,
296
			Extensions: request.extensions,
297 298 299
		})
	}

300
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
301
	for _, response := range gsm.responses {
302
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
303 304 305
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
306 307 308 309 310 311 312 313 314
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
315 316
		})
	}
317
	return pbm, nil
318 319
}

320 321
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)
322 323 324 325 326
	msg, err := gsm.ToProto()
	if err != nil {
		return err
	}
	return pbw.WriteMsg(msg)
327 328
}

329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
// Loggable returns a summary of the message for logging: the decimal request
// IDs of all requests under "requests" and of all responses under
// "responses". Blocks are not included.
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	var (
		requestIDs  = make([]string, 0, len(gsm.requests))
		responseIDs = make([]string, 0, len(gsm.responses))
	)
	for _, req := range gsm.requests {
		requestIDs = append(requestIDs, fmt.Sprintf("%d", req.id))
	}
	for _, resp := range gsm.responses {
		responseIDs = append(responseIDs, fmt.Sprintf("%d", resp.requestID))
	}

	summary := make(map[string]interface{}, 2)
	summary["requests"] = requestIDs
	summary["responses"] = responseIDs
	return summary
}

344 345 346 347 348 349 350 351 352 353 354 355 356 357
// Clone returns a new message holding the same requests, responses and
// blocks as gsm. The copy is shallow: the maps are new, so adding to the
// clone does not affect gsm, but extension maps and block values are shared
// with the original.
func (gsm *graphSyncMessage) Clone() GraphSyncMessage {
	// Size the destination maps up front since the final sizes are known.
	clone := &graphSyncMessage{
		requests:  make(map[graphsync.RequestID]GraphSyncRequest, len(gsm.requests)),
		responses: make(map[graphsync.RequestID]GraphSyncResponse, len(gsm.responses)),
		blocks:    make(map[cid.Cid]blocks.Block, len(gsm.blocks)),
	}
	for id, request := range gsm.requests {
		clone.requests[id] = request
	}
	for id, response := range gsm.responses {
		clone.responses[id] = response
	}
	// The key is named c, not cid, so it does not shadow the cid package.
	for c, block := range gsm.blocks {
		clone.blocks[c] = block
	}
	return clone
}

358
// ID Returns the request ID for this Request
359
func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id }
360

361 362 363
// Root returns the CID of the root block of this request.
func (gsr GraphSyncRequest) Root() cid.Cid {
	return gsr.root
}

364
// Selector returns the byte representation of the selector for this request
365
func (gsr GraphSyncRequest) Selector() ipld.Node { return gsr.selector }
366 367

// Priority returns the priority of this request
368
func (gsr GraphSyncRequest) Priority() graphsync.Priority { return gsr.priority }
369

370 371
// Extension returns the content for an extension on a response, or errors
// if extension is not present
372
func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) {
373
	if gsr.extensions == nil {
374
		return nil, false
375 376 377
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
378
		return nil, false
379
	}
380
	return val, true
381 382
}

383 384 385
// IsCancel reports whether this request cancels an in-progress request.
func (gsr GraphSyncRequest) IsCancel() bool {
	return gsr.isCancel
}

Hannah Howard's avatar
Hannah Howard committed
386 387 388
// IsUpdate reports whether this request updates an in-progress request.
func (gsr GraphSyncRequest) IsUpdate() bool {
	return gsr.isUpdate
}

389
// RequestID returns the request ID for this response
390
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }
391 392

// Status returns the status for a response
393
func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.status }
394

395 396
// Extension returns the content for an extension on a response, or errors
// if extension is not present
397
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) {
398
	if gsr.extensions == nil {
399
		return nil, false
400 401 402
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
403
		return nil, false
404
	}
405
	return val, true
406 407

}
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448

// ReplaceExtensions merges the given extensions into the request to create a
// new request. When an extension name is present both on the request and in
// the given list, the new data always wins.
func (gsr GraphSyncRequest) ReplaceExtensions(extensions []graphsync.ExtensionData) GraphSyncRequest {
	preferNew := func(name graphsync.ExtensionName, oldData []byte, newData []byte) ([]byte, error) {
		return newData, nil
	}
	// preferNew never fails, so MergeExtensions cannot return an error here.
	replaced, _ := gsr.MergeExtensions(extensions, preferNew)
	return replaced
}

// MergeExtensions produces a new request that combines the extensions already
// on this request with the given list of extensions. When an extension name
// is present in both, mergeFunc is called with the old and new data to
// produce the stored result. If mergeFunc returns an error, that error is
// returned together with a zero-value request. The receiver is not modified.
func (gsr GraphSyncRequest) MergeExtensions(extensions []graphsync.ExtensionData, mergeFunc func(name graphsync.ExtensionName, oldData []byte, newData []byte) ([]byte, error)) (GraphSyncRequest, error) {
	incoming := toExtensionsMap(extensions)

	// Nothing to merge against: the new extensions are used as they are.
	if gsr.extensions == nil {
		return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, incoming), nil
	}

	merged := make(map[string][]byte)

	// First pass: place every incoming extension, resolving conflicts with
	// mergeFunc.
	for name, newData := range incoming {
		if oldData, exists := gsr.extensions[name]; exists {
			resolved, err := mergeFunc(graphsync.ExtensionName(name), oldData, newData)
			if err != nil {
				return GraphSyncRequest{}, err
			}
			merged[name] = resolved
		} else {
			merged[name] = newData
		}
	}

	// Second pass: carry over the existing extensions the first pass did not
	// touch.
	for name, oldData := range gsr.extensions {
		if _, handled := merged[name]; !handled {
			merged[name] = oldData
		}
	}

	return newRequest(gsr.id, gsr.root, gsr.selector, gsr.priority, gsr.isCancel, gsr.isUpdate, merged), nil
}