message.go 10.2 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

7
	blocks "github.com/ipfs/go-block-format"
8
	"github.com/ipfs/go-graphsync"
9
	"github.com/ipld/go-ipld-prime"
10

11
	ggio "github.com/gogo/protobuf/io"
12
	cid "github.com/ipfs/go-cid"
13
	"github.com/ipfs/go-graphsync/ipldutil"
14
	pb "github.com/ipfs/go-graphsync/message/pb"
15
	"github.com/libp2p/go-libp2p-core/network"
16 17
)

18 19
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
20 21 22
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestCompletedFull ||
		status == graphsync.RequestCompletedPartial
23 24 25 26
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
27 28 29 30 31
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestFailedBusy ||
		status == graphsync.RequestFailedContentNotFound ||
		status == graphsync.RequestFailedLegal ||
		status == graphsync.RequestFailedUnknown
32 33 34 35
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
36
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
37 38 39
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

40 41 42 43 44 45 46
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

47 48
	Blocks() []blocks.Block

49
	AddRequest(graphSyncRequest GraphSyncRequest)
50

51
	AddResponse(graphSyncResponse GraphSyncResponse)
52 53

	AddBlock(blocks.Block)
54

55 56
	Empty() bool

57 58 59 60 61 62 63
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
64
	ToProto() (*pb.Message, error)
65
	ToNet(w io.Writer) error
66 67
}

68 69 70
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
71
	root       cid.Cid
72
	selector   ipld.Node
73 74
	priority   graphsync.Priority
	id         graphsync.RequestID
75 76
	extensions map[string][]byte
	isCancel   bool
77 78
}

79 80 81
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
82 83
	requestID  graphsync.RequestID
	status     graphsync.ResponseStatusCode
84
	extensions map[string][]byte
85 86 87
}

type graphSyncMessage struct {
88 89
	requests  map[graphsync.RequestID]GraphSyncRequest
	responses map[graphsync.RequestID]GraphSyncResponse
90
	blocks    map[cid.Cid]blocks.Block
91 92 93 94 95 96 97 98 99
}

// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
	return newMsg()
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
100 101
		requests:  make(map[graphsync.RequestID]GraphSyncRequest),
		responses: make(map[graphsync.RequestID]GraphSyncResponse),
102
		blocks:    make(map[cid.Cid]blocks.Block),
103 104 105
	}
}

106
// NewRequest builds a new Graphsync request
107
func NewRequest(id graphsync.RequestID,
108
	root cid.Cid,
109
	selector ipld.Node,
110 111
	priority graphsync.Priority,
	extensions ...graphsync.ExtensionData) GraphSyncRequest {
112 113

	return newRequest(id, root, selector, priority, false, toExtensionsMap(extensions))
114 115 116
}

// CancelRequest request generates a request to cancel an in progress request
117
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
118 119 120
	return newRequest(id, cid.Cid{}, nil, 0, true, nil)
}

121
func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
122 123 124 125 126 127 128
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
129 130
}

131
func newRequest(id graphsync.RequestID,
132
	root cid.Cid,
133
	selector ipld.Node,
134
	priority graphsync.Priority,
135 136
	isCancel bool,
	extensions map[string][]byte) GraphSyncRequest {
137
	return GraphSyncRequest{
138 139 140 141 142 143
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
		extensions: extensions,
144 145 146 147
	}
}

// NewResponse builds a new Graphsync response
148 149 150
func NewResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode,
	extensions ...graphsync.ExtensionData) GraphSyncResponse {
151 152 153
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

154 155
func newResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
156
	return GraphSyncResponse{
157 158 159
		requestID:  requestID,
		status:     status,
		extensions: extensions,
160 161
	}
}
162
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
163
	gsm := newMsg()
164
	for _, req := range pbm.Requests {
165 166 167 168
		root, err := cid.Cast(req.Root)
		if err != nil {
			return nil, err
		}
169
		selector, err := ipldutil.DecodeNode(req.Selector)
170 171 172 173
		if err != nil {
			return nil, err
		}
		gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.GetExtensions()))
174 175 176
	}

	for _, res := range pbm.Responses {
177
		gsm.AddResponse(newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), res.GetExtensions()))
178 179 180 181
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
182
		if err != nil {
183 184 185 186 187 188 189 190 191 192 193
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
194
		}
195

196
		gsm.AddBlock(blk)
197 198 199 200 201
	}

	return gsm, nil
}

202 203 204 205
func (gsm *graphSyncMessage) Empty() bool {
	return len(gsm.blocks) == 0 && len(gsm.requests) == 0 && len(gsm.responses) == 0
}

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	requests := make([]GraphSyncRequest, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, request)
	}
	return requests
}

func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	responses := make([]GraphSyncResponse, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, response)
	}
	return responses
}

222 223 224 225 226 227 228 229
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(gsm.blocks))
	for _, block := range gsm.blocks {
		bs = append(bs, block)
	}
	return bs
}

230 231
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
232 233
}

234 235
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
236 237
}

238 239 240 241
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	gsm.blocks[b.Cid()] = b
}

242
// FromNet can read a network stream to deserialized a GraphSyncMessage
243
func FromNet(r io.Reader) (GraphSyncMessage, error) {
244
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
245
	return FromPBReader(pbr)
246 247
}

248
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
249
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
250 251 252 253 254
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

255
	return newMessageFromProto(*pb)
256 257
}

258
func (gsm *graphSyncMessage) ToProto() (*pb.Message, error) {
259
	pbm := new(pb.Message)
260
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
261
	for _, request := range gsm.requests {
262 263 264 265 266 267 268
		var selector []byte
		var err error
		if request.selector != nil {
			selector, err = ipldutil.EncodeNode(request.selector)
			if err != nil {
				return nil, err
			}
269
		}
270
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
271 272
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
273
			Selector:   selector,
274 275 276
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
			Extensions: request.extensions,
277 278 279
		})
	}

280
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
281
	for _, response := range gsm.responses {
282
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
283 284 285
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
286 287 288 289 290 291 292 293 294
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
295 296
		})
	}
297
	return pbm, nil
298 299
}

300 301
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)
302 303 304 305 306
	msg, err := gsm.ToProto()
	if err != nil {
		return err
	}
	return pbw.WriteMsg(msg)
307 308
}

309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requests := make([]string, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, fmt.Sprintf("%d", request.id))
	}
	responses := make([]string, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, fmt.Sprintf("%d", response.requestID))
	}
	return map[string]interface{}{
		"requests":  requests,
		"responses": responses,
	}
}

324
// ID Returns the request ID for this Request
325
func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id }
326

327 328 329
// Root returns the CID to the root block of this request
func (gsr GraphSyncRequest) Root() cid.Cid { return gsr.root }

330
// Selector returns the byte representation of the selector for this request
331
func (gsr GraphSyncRequest) Selector() ipld.Node { return gsr.selector }
332 333

// Priority returns the priority of this request
334
func (gsr GraphSyncRequest) Priority() graphsync.Priority { return gsr.priority }
335

336 337
// Extension returns the content for an extension on a response, or errors
// if extension is not present
338
func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) {
339
	if gsr.extensions == nil {
340
		return nil, false
341 342 343
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
344
		return nil, false
345
	}
346
	return val, true
347 348
}

349 350 351 352
// IsCancel returns true if this particular request is being cancelled
func (gsr GraphSyncRequest) IsCancel() bool { return gsr.isCancel }

// RequestID returns the request ID for this response
353
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }
354 355

// Status returns the status for a response
356
func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.status }
357

358 359
// Extension returns the content for an extension on a response, or errors
// if extension is not present
360
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) {
361
	if gsr.extensions == nil {
362
		return nil, false
363 364 365
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
366
		return nil, false
367
	}
368
	return val, true
369 370

}