message.go 9.96 KB
Newer Older
1 2 3 4
package message

import (
	"fmt"
5
	"io"
6

7 8
	"github.com/ipfs/go-block-format"

9
	ggio "github.com/gogo/protobuf/io"
10
	cid "github.com/ipfs/go-cid"
11
	pb "github.com/ipfs/go-graphsync/message/pb"
12
	inet "github.com/libp2p/go-libp2p-net"
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
)

// GraphSyncRequestID is a unique identifier for a GraphSync request.
type GraphSyncRequestID int32

// GraphSyncPriority a priority for a GraphSync request.
type GraphSyncPriority int32

// GraphSyncResponseStatusCode is a status returned for a GraphSync Request.
type GraphSyncResponseStatusCode int32

const (

	// GraphSync Response Status Codes

	// Informational Response Codes (partial)

	// RequestAcknowledged means the request was received and is being worked on.
31
	RequestAcknowledged = GraphSyncResponseStatusCode(10)
32 33
	// AdditionalPeers means additional peers were found that may be able
	// to satisfy the request and contained in the extra block of the response.
34
	AdditionalPeers = GraphSyncResponseStatusCode(11)
35
	// NotEnoughGas means fulfilling this request requires payment.
36
	NotEnoughGas = GraphSyncResponseStatusCode(12)
37 38
	// OtherProtocol means a different type of response than GraphSync is
	// contained in extra.
39
	OtherProtocol = GraphSyncResponseStatusCode(13)
40 41 42
	// PartialResponse may include blocks and metadata about the in progress response
	// in extra.
	PartialResponse = GraphSyncResponseStatusCode(14)
43 44 45 46 47

	// Success Response Codes (request terminated)

	// RequestCompletedFull means the entire fulfillment of the GraphSync request
	// was sent back.
48
	RequestCompletedFull = GraphSyncResponseStatusCode(20)
49 50
	// RequestCompletedPartial means the response is completed, and part of the
	// GraphSync request was sent back, but not the complete request.
51
	RequestCompletedPartial = GraphSyncResponseStatusCode(21)
52 53 54 55

	// Error Response Codes (request terminated)

	// RequestRejected means the node did not accept the incoming request.
56
	RequestRejected = GraphSyncResponseStatusCode(30)
57 58
	// RequestFailedBusy means the node is too busy, try again later. Backoff may
	// be contained in extra.
59
	RequestFailedBusy = GraphSyncResponseStatusCode(31)
60 61
	// RequestFailedUnknown means the request failed for an unspecified reason. May
	// contain data about why in extra.
62
	RequestFailedUnknown = GraphSyncResponseStatusCode(32)
63
	// RequestFailedLegal means the request failed for legal reasons.
64
	RequestFailedLegal = GraphSyncResponseStatusCode(33)
65
	// RequestFailedContentNotFound means the respondent does not have the content.
66
	RequestFailedContentNotFound = GraphSyncResponseStatusCode(34)
67 68
)

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
func IsTerminalSuccessCode(status GraphSyncResponseStatusCode) bool {
	return status == RequestCompletedFull ||
		status == RequestCompletedPartial
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
func IsTerminalFailureCode(status GraphSyncResponseStatusCode) bool {
	return status == RequestFailedBusy ||
		status == RequestFailedContentNotFound ||
		status == RequestFailedLegal ||
		status == RequestFailedUnknown
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
func IsTerminalResponseCode(status GraphSyncResponseStatusCode) bool {
	return IsTerminalSuccessCode(status) || IsTerminalFailureCode(status)
}

91 92 93
// GraphSyncRequest is an interface for accessing data on request contained in a
// GraphSyncMessage.
type GraphSyncRequest interface {
94
	Selector() []byte
95 96 97 98 99 100 101 102 103 104
	Priority() GraphSyncPriority
	ID() GraphSyncRequestID
	IsCancel() bool
}

// GraphSyncResponse is an interface for accessing data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse interface {
	RequestID() GraphSyncRequestID
	Status() GraphSyncResponseStatusCode
105
	Extra() []byte
106 107 108 109 110 111 112 113 114
}

// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
	Requests() []GraphSyncRequest

	Responses() []GraphSyncResponse

115 116
	Blocks() []blocks.Block

117
	AddRequest(id GraphSyncRequestID,
118
		selector []byte,
119 120 121 122 123 124 125
		priority GraphSyncPriority)

	Cancel(id GraphSyncRequestID)

	AddResponse(
		requestID GraphSyncRequestID,
		status GraphSyncResponseStatusCode,
126 127 128
		extra []byte)

	AddBlock(blocks.Block)
129

130 131
	Empty() bool

132 133 134 135 136 137 138 139
	Exportable

	Loggable() map[string]interface{}
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
	ToProto() *pb.Message
140
	ToNet(w io.Writer) error
141 142 143
}

type graphSyncRequest struct {
144
	selector []byte
145 146 147 148 149 150 151 152
	priority GraphSyncPriority
	id       GraphSyncRequestID
	isCancel bool
}

type graphSyncResponse struct {
	requestID GraphSyncRequestID
	status    GraphSyncResponseStatusCode
153
	extra     []byte
154 155 156 157 158
}

type graphSyncMessage struct {
	requests  map[GraphSyncRequestID]*graphSyncRequest
	responses map[GraphSyncRequestID]*graphSyncResponse
159
	blocks    map[cid.Cid]blocks.Block
160 161 162 163 164 165 166 167 168 169 170
}

// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
	return newMsg()
}

func newMsg() *graphSyncMessage {
	return &graphSyncMessage{
		requests:  make(map[GraphSyncRequestID]*graphSyncRequest),
		responses: make(map[GraphSyncRequestID]*graphSyncResponse),
171
		blocks:    make(map[cid.Cid]blocks.Block),
172 173 174
	}
}

175
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
176
	gsm := newMsg()
177 178 179 180 181 182 183 184 185 186
	for _, req := range pbm.Requests {
		gsm.addRequest(GraphSyncRequestID(req.Id), req.Selector, GraphSyncPriority(req.Priority), req.Cancel)
	}

	for _, res := range pbm.Responses {
		gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra)
	}

	for _, b := range pbm.GetData() {
		pref, err := cid.PrefixFromBytes(b.GetPrefix())
187
		if err != nil {
188 189 190 191 192 193 194 195 196 197 198
			return nil, err
		}

		c, err := pref.Sum(b.GetData())
		if err != nil {
			return nil, err
		}

		blk, err := blocks.NewBlockWithCid(b.GetData(), c)
		if err != nil {
			return nil, err
199
		}
200

201
		gsm.AddBlock(blk)
202 203 204 205 206
	}

	return gsm, nil
}

207 208 209 210
func (gsm *graphSyncMessage) Empty() bool {
	return len(gsm.blocks) == 0 && len(gsm.requests) == 0 && len(gsm.responses) == 0
}

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
	requests := make([]GraphSyncRequest, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, request)
	}
	return requests
}

func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
	responses := make([]GraphSyncResponse, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, response)
	}
	return responses
}

227 228 229 230 231 232 233 234
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
	bs := make([]blocks.Block, 0, len(gsm.blocks))
	for _, block := range gsm.blocks {
		bs = append(bs, block)
	}
	return bs
}

235 236
func (gsm *graphSyncMessage) Cancel(id GraphSyncRequestID) {
	delete(gsm.requests, id)
237
	gsm.addRequest(id, nil, 0, true)
238 239 240
}

func (gsm *graphSyncMessage) AddRequest(id GraphSyncRequestID,
241
	selector []byte,
242 243
	priority GraphSyncPriority,
) {
244
	gsm.addRequest(id, selector, priority, false)
245 246 247
}

func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
248
	selector []byte,
249 250 251 252 253 254 255 256 257 258 259 260
	priority GraphSyncPriority,
	isCancel bool) {
	gsm.requests[id] = &graphSyncRequest{
		id:       id,
		selector: selector,
		priority: priority,
		isCancel: isCancel,
	}
}

func (gsm *graphSyncMessage) AddResponse(requestID GraphSyncRequestID,
	status GraphSyncResponseStatusCode,
261
	extra []byte) {
262 263 264
	gsm.responses[requestID] = &graphSyncResponse{
		requestID: requestID,
		status:    status,
265
		extra:     extra,
266 267 268
	}
}

269 270 271 272
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
	gsm.blocks[b.Cid()] = b
}

273
// FromNet can read a network stream to deserialized a GraphSyncMessage
274
func FromNet(r io.Reader) (GraphSyncMessage, error) {
275
	pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
276
	return FromPBReader(pbr)
277 278
}

279
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
280
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
281 282 283 284 285
	pb := new(pb.Message)
	if err := pbr.ReadMsg(pb); err != nil {
		return nil, err
	}

286
	return newMessageFromProto(*pb)
287 288 289 290
}

func (gsm *graphSyncMessage) ToProto() *pb.Message {
	pbm := new(pb.Message)
291
	pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
292
	for _, request := range gsm.requests {
293
		pbm.Requests = append(pbm.Requests, pb.Message_Request{
294
			Id:       int32(request.id),
295
			Selector: request.selector,
296 297 298 299 300
			Priority: int32(request.priority),
			Cancel:   request.isCancel,
		})
	}

301
	pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
302
	for _, response := range gsm.responses {
303
		pbm.Responses = append(pbm.Responses, pb.Message_Response{
304 305
			Id:     int32(response.requestID),
			Status: int32(response.status),
306 307 308 309 310 311 312 313 314 315
			Extra:  response.extra,
		})
	}

	blocks := gsm.Blocks()
	pbm.Data = make([]pb.Message_Block, 0, len(blocks))
	for _, b := range blocks {
		pbm.Data = append(pbm.Data, pb.Message_Block{
			Data:   b.RawData(),
			Prefix: b.Cid().Prefix().Bytes(),
316 317 318 319 320
		})
	}
	return pbm
}

321 322 323 324 325 326
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
	pbw := ggio.NewDelimitedWriter(w)

	return pbw.WriteMsg(gsm.ToProto())
}

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
	requests := make([]string, 0, len(gsm.requests))
	for _, request := range gsm.requests {
		requests = append(requests, fmt.Sprintf("%d", request.id))
	}
	responses := make([]string, 0, len(gsm.responses))
	for _, response := range gsm.responses {
		responses = append(responses, fmt.Sprintf("%d", response.requestID))
	}
	return map[string]interface{}{
		"requests":  requests,
		"responses": responses,
	}
}

342 343 344 345
func (gsr *graphSyncRequest) ID() GraphSyncRequestID      { return gsr.id }
func (gsr *graphSyncRequest) Selector() []byte            { return gsr.selector }
func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
func (gsr *graphSyncRequest) IsCancel() bool              { return gsr.isCancel }
346

347 348 349
func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID       { return gsr.requestID }
func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
func (gsr *graphSyncResponse) Extra() []byte                       { return gsr.extra }