// Package message defines the GraphSync network message types and their
// conversion to and from protobuf.
package message

import (
	"fmt"
5
	"io"
6

7
	blocks "github.com/ipfs/go-block-format"
8
	"github.com/ipfs/go-graphsync"
9

10
	ggio "github.com/gogo/protobuf/io"
11
	cid "github.com/ipfs/go-cid"
12
	pb "github.com/ipfs/go-graphsync/message/pb"
13
	"github.com/libp2p/go-libp2p-core/network"
14 15
)

16 17
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
18 19 20
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestCompletedFull ||
		status == graphsync.RequestCompletedPartial
21 22 23 24
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
25 26 27 28 29
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
	return status == graphsync.RequestFailedBusy ||
		status == graphsync.RequestFailedContentNotFound ||
		status == graphsync.RequestFailedLegal ||
		status == graphsync.RequestFailedUnknown
30 31 32 33
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
34
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
35 36 37
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

38 39 40 41 42 43 44
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

45 46
	Blocks() []blocks.Block

47
	AddRequest(graphSyncRequest GraphSyncRequest)
48

49
	AddResponse(graphSyncResponse GraphSyncResponse)
50 51

	AddBlock(blocks.Block)
52

53 54
	Empty() bool

55 56 57 58 59 60 61 62
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
	ToProto() *pb.Message
63
	ToNet(w io.Writer) error
64 65
}

66 67 68
// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
69 70
	root       cid.Cid
	selector   []byte
71 72
	priority   graphsync.Priority
	id         graphsync.RequestID
73 74
	extensions map[string][]byte
	isCancel   bool
75 76
}

77 78 79
// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
80 81
	requestID  graphsync.RequestID
	status     graphsync.ResponseStatusCode
82
	extensions map[string][]byte
83 84 85
}

type graphSyncMessage struct {
86 87
	requests  map[graphsync.RequestID]GraphSyncRequest
	responses map[graphsync.RequestID]GraphSyncResponse
88
	blocks    map[cid.Cid]blocks.Block
89 90 91 92 93 94 95 96 97
}

// New returns a new GraphSyncMessage with no requests, responses or blocks.
func New() GraphSyncMessage {
	var msg GraphSyncMessage = newMsg()
	return msg
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
98 99
		requests:  make(map[graphsync.RequestID]GraphSyncRequest),
		responses: make(map[graphsync.RequestID]GraphSyncResponse),
100
		blocks:    make(map[cid.Cid]blocks.Block),
101 102 103
	}
}

104
// NewRequest builds a new Graphsync request
105
func NewRequest(id graphsync.RequestID,
106
	root cid.Cid,
107
	selector []byte,
108 109
	priority graphsync.Priority,
	extensions ...graphsync.ExtensionData) GraphSyncRequest {
110 111

	return newRequest(id, root, selector, priority, false, toExtensionsMap(extensions))
112 113 114
}

// CancelRequest request generates a request to cancel an in progress request
115
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
116 117 118
	return newRequest(id, cid.Cid{}, nil, 0, true, nil)
}

119
func toExtensionsMap(extensions []graphsync.ExtensionData) (extensionsMap map[string][]byte) {
120 121 122 123 124 125 126
	if len(extensions) > 0 {
		extensionsMap = make(map[string][]byte, len(extensions))
		for _, extension := range extensions {
			extensionsMap[string(extension.Name)] = extension.Data
		}
	}
	return
127 128
}

129
func newRequest(id graphsync.RequestID,
130
	root cid.Cid,
131
	selector []byte,
132
	priority graphsync.Priority,
133 134
	isCancel bool,
	extensions map[string][]byte) GraphSyncRequest {
135
	return GraphSyncRequest{
136 137 138 139 140 141
		id:         id,
		root:       root,
		selector:   selector,
		priority:   priority,
		isCancel:   isCancel,
		extensions: extensions,
142 143 144 145
	}
}

// NewResponse builds a new Graphsync response
146 147 148
func NewResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode,
	extensions ...graphsync.ExtensionData) GraphSyncResponse {
149 150 151
	return newResponse(requestID, status, toExtensionsMap(extensions))
}

152 153
func newResponse(requestID graphsync.RequestID,
	status graphsync.ResponseStatusCode, extensions map[string][]byte) GraphSyncResponse {
154
	return GraphSyncResponse{
155 156 157
		requestID:  requestID,
		status:     status,
		extensions: extensions,
158 159
	}
}
160
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
161
	gsm := newMsg()
162
	for _, req := range pbm.Requests {
163 164 165 166
		root, err := cid.Cast(req.Root)
		if err != nil {
			return nil, err
		}
167
		gsm.AddRequest(newRequest(graphsync.RequestID(req.Id), root, req.Selector, graphsync.Priority(req.Priority), req.Cancel, req.GetExtensions()))
168 169 170
	}

	for _, res := range pbm.Responses {
171
		gsm.AddResponse(newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), res.GetExtensions()))
172 173 174 175
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
176
		if err != nil {
177 178 179 180 181 182 183 184 185 186 187
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
188
		}
189

190
		gsm.AddBlock(blk)
191 192 193 194 195
	}

	return gsm, nil
}

196 197 198 199
// Empty reports whether the message holds no requests, no responses and no
// blocks.
func (gsm *graphSyncMessage) Empty() bool {
	if len(gsm.requests) > 0 || len(gsm.responses) > 0 {
		return false
	}
	return len(gsm.blocks) == 0
}

200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
// Requests returns the message's requests as a newly allocated slice. The
// order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	out := make([]GraphSyncRequest, len(gsm.requests))
	next := 0
	for _, req := range gsm.requests {
		out[next] = req
		next++
	}
	return out
}

// Responses returns the message's responses as a newly allocated slice. The
// order follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	out := make([]GraphSyncResponse, len(gsm.responses))
	next := 0
	for _, res := range gsm.responses {
		out[next] = res
		next++
	}
	return out
}

216 217 218 219 220 221 222 223
// Blocks returns the message's blocks as a newly allocated slice. The order
// follows map iteration and is therefore unspecified.
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	out := make([]blocks.Block, len(gsm.blocks))
	next := 0
	for _, blk := range gsm.blocks {
		out[next] = blk
		next++
	}
	return out
}

224 225
func (gsm *graphSyncMessage) AddRequest(graphSyncRequest GraphSyncRequest) {
	gsm.requests[graphSyncRequest.id] = graphSyncRequest
226 227
}

228 229
func (gsm *graphSyncMessage) AddResponse(graphSyncResponse GraphSyncResponse) {
	gsm.responses[graphSyncResponse.requestID] = graphSyncResponse
230 231
}

232 233 234 235
// AddBlock stores the block in the message under its CID, replacing any
// block already stored under that CID.
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	key := b.Cid()
	gsm.blocks[key] = b
}

236
// FromNet can read a network stream to deserialized a GraphSyncMessage
237
func FromNet(r io.Reader) (GraphSyncMessage, error) {
238
	pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
239
	return FromPBReader(pbr)
240 241
}

242
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
243
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
244 245 246 247 248
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

249
	return newMessageFromProto(*pb)
250 251 252 253
}

func (gsm *graphSyncMessage) ToProto() *pb.Message {
	pbm := new(pb.Message)
254
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
255
	for _, request := range gsm.requests {
256
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
257 258 259 260 261 262
			Id:         int32(request.id),
			Root:       request.root.Bytes(),
			Selector:   request.selector,
			Priority:   int32(request.priority),
			Cancel:     request.isCancel,
			Extensions: request.extensions,
263 264 265
		})
	}

266
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
267
	for _, response := range gsm.responses {
268
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
269 270 271
			Id:         int32(response.requestID),
			Status:     int32(response.status),
			Extensions: response.extensions,
272 273 274 275 276 277 278 279 280
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
281 282 283 284 285
		})
	}
	return pbm
}

286 287 288 289 290 291
// ToNet converts the message to protobuf and writes it to w as a delimited
// message, returning any error from the write.
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	writer := ggio.NewDelimitedWriter(w)
	msg := gsm.ToProto()
	return writer.WriteMsg(msg)
}

292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
// Loggable returns a map describing the message for logging. The "requests"
// key holds the IDs of the message's requests and the "responses" key holds
// the request IDs of its responses, each formatted as a decimal string.
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requestIDs := make([]string, 0, len(gsm.requests))
	for _, req := range gsm.requests {
		requestIDs = append(requestIDs, fmt.Sprintf("%d", req.id))
	}

	responseIDs := make([]string, 0, len(gsm.responses))
	for _, res := range gsm.responses {
		responseIDs = append(responseIDs, fmt.Sprintf("%d", res.requestID))
	}

	summary := make(map[string]interface{}, 2)
	summary["requests"] = requestIDs
	summary["responses"] = responseIDs
	return summary
}

307
// ID Returns the request ID for this Request
308
func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id }
309

310 311 312
// Root returns the CID of the root block of this request.
func (gsr GraphSyncRequest) Root() cid.Cid {
	return gsr.root
}

313 314 315 316
// Selector returns the selector of this request in its byte representation.
// The returned slice is the request's own slice, not a copy.
func (gsr GraphSyncRequest) Selector() []byte {
	return gsr.selector
}

// Priority returns the priority of this request
317
func (gsr GraphSyncRequest) Priority() graphsync.Priority { return gsr.priority }
318

319 320
// Extension returns the content for an extension on a response, or errors
// if extension is not present
321
func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) ([]byte, bool) {
322
	if gsr.extensions == nil {
323
		return nil, false
324 325 326
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
327
		return nil, false
328
	}
329
	return val, true
330 331
}

332 333 334 335
// IsCancel reports whether this request is a cancellation.
func (gsr GraphSyncRequest) IsCancel() bool {
	return gsr.isCancel
}

// RequestID returns the request ID for this response
336
func (gsr GraphSyncResponse) RequestID() graphsync.RequestID { return gsr.requestID }
337 338

// Status returns the status for a response
339
func (gsr GraphSyncResponse) Status() graphsync.ResponseStatusCode { return gsr.status }
340

341 342
// Extension returns the content for an extension on a response, or errors
// if extension is not present
343
func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) ([]byte, bool) {
344
	if gsr.extensions == nil {
345
		return nil, false
346 347 348
	}
	val, ok := gsr.extensions[string(name)]
	if !ok {
349
		return nil, false
350
	}
351
	return val, true
352 353

}