Commit e714b058 authored by hannahhoward's avatar hannahhoward

feat(GraphSyncMessage): implement message serialiation

Initial implementation of graph sync messages including protobuf struct and
serialization/deserialization

fix #2 fix #3
parent fc91e90b
package message
import (
"fmt"
ggio "github.com/gogo/protobuf/io"
pb "github.com/ipfs/go-graphsync/message/pb"
gsselector "github.com/ipfs/go-graphsync/selector"
ipld "github.com/ipfs/go-ipld-format"
)
// GraphSyncRequestID is a unique identifier for a GraphSync request.
type GraphSyncRequestID int32
// GraphSyncPriority a priority for a GraphSync request.
type GraphSyncPriority int32
// GraphSyncResponseStatusCode is a status returned for a GraphSync Request.
type GraphSyncResponseStatusCode int32
const (
// GraphSync Response Status Codes
// Informational Response Codes (partial)
// RequestAcknowledged means the request was received and is being worked on.
RequestAcknowledged = 10
// AdditionalPeers means additional peers were found that may be able
// to satisfy the request and contained in the extra block of the response.
AdditionalPeers = 11
// NotEnoughGas means fulfilling this request requires payment.
NotEnoughGas = 12
// OtherProtocol means a different type of response than GraphSync is
// contained in extra.
OtherProtocol = 13
// Success Response Codes (request terminated)
// RequestCompletedFull means the entire fulfillment of the GraphSync request
// was sent back.
RequestCompletedFull = 20
// RequestCompletedPartial means the response is completed, and part of the
// GraphSync request was sent back, but not the complete request.
RequestCompletedPartial = 21
// Error Response Codes (request terminated)
// RequestRejected means the node did not accept the incoming request.
RequestRejected = 30
// RequestFailedBusy means the node is too busy, try again later. Backoff may
// be contained in extra.
RequestFailedBusy = 31
// RequestFailedUnknown means the request failed for an unspecified reason. May
// contain data about why in extra.
RequestFailedUnknown = 32
// RequestFailedLegal means the request failed for legal reasons.
RequestFailedLegal = 33
// RequestFailedContentNotFound means the respondent does not have the content.
RequestFailedContentNotFound = 34
)
// GraphSyncRequest is an interface for accessing data on request contained in a
// GraphSyncMessage.
type GraphSyncRequest interface {
Selector() gsselector.Selector
Root() ipld.Node
Priority() GraphSyncPriority
ID() GraphSyncRequestID
IsCancel() bool
}
// GraphSyncResponse is an interface for accessing data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse interface {
RequestID() GraphSyncRequestID
Status() GraphSyncResponseStatusCode
Response() gsselector.SelectionResponse
}
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
Requests() []GraphSyncRequest
Responses() []GraphSyncResponse
AddRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root ipld.Node,
priority GraphSyncPriority)
Cancel(id GraphSyncRequestID)
AddResponse(
requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse)
Exportable
Loggable() map[string]interface{}
}
// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
ToProto() *pb.Message
}
type graphSyncRequest struct {
selector gsselector.Selector
root ipld.Node
priority GraphSyncPriority
id GraphSyncRequestID
isCancel bool
}
type graphSyncResponse struct {
requestID GraphSyncRequestID
status GraphSyncResponseStatusCode
response gsselector.SelectionResponse
}
type graphSyncMessage struct {
requests map[GraphSyncRequestID]*graphSyncRequest
responses map[GraphSyncRequestID]*graphSyncResponse
}
// DecodeSelectorFunc is a function that can build a type that satisfies
// the Selector interface from a raw byte array.
type DecodeSelectorFunc func([]byte) gsselector.Selector
// DecodeRootNodeFunc is a function that can build a type that satisfies
// the ipld.Node interface from a raw byte array.
type DecodeRootNodeFunc func([]byte) ipld.Node
// DecodeSelectionResponseFunc is a function that can build a type that satisfies
// the SelectionResponse interface from a raw byte array.
type DecodeSelectionResponseFunc func([]byte) gsselector.SelectionResponse
// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
return newMsg()
}
func newMsg() *graphSyncMessage {
return &graphSyncMessage{
requests: make(map[GraphSyncRequestID]*graphSyncRequest),
responses: make(map[GraphSyncRequestID]*graphSyncResponse),
}
}
func newMessageFromProto(pbm pb.Message,
decodeRootNode DecodeRootNodeFunc,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
gsm := newMsg()
for _, req := range pbm.Reqlist {
selector := decodeSelector(req.Selector)
root := decodeRootNode(req.Root)
gsm.addRequest(GraphSyncRequestID(req.Id), selector, root, GraphSyncPriority(req.Priority), req.Cancel)
}
for _, res := range pbm.Reslist {
selectionResponse := decodeSelectionResponse(res.Data)
gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), selectionResponse)
}
return gsm, nil
}
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
requests := make([]GraphSyncRequest, 0, len(gsm.requests))
for _, request := range gsm.requests {
requests = append(requests, request)
}
return requests
}
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
responses := make([]GraphSyncResponse, 0, len(gsm.responses))
for _, response := range gsm.responses {
responses = append(responses, response)
}
return responses
}
func (gsm *graphSyncMessage) Cancel(id GraphSyncRequestID) {
delete(gsm.requests, id)
gsm.addRequest(id, nil, nil, 0, true)
}
func (gsm *graphSyncMessage) AddRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root ipld.Node,
priority GraphSyncPriority,
) {
gsm.addRequest(id, selector, root, priority, false)
}
func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root ipld.Node,
priority GraphSyncPriority,
isCancel bool) {
gsm.requests[id] = &graphSyncRequest{
id: id,
selector: selector,
root: root,
priority: priority,
isCancel: isCancel,
}
}
func (gsm *graphSyncMessage) AddResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse) {
gsm.responses[requestID] = &graphSyncResponse{
requestID: requestID,
status: status,
response: response,
}
}
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
func FromPBReader(pbr ggio.Reader,
decodeRootNode DecodeRootNodeFunc,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
pb := new(pb.Message)
if err := pbr.ReadMsg(pb); err != nil {
return nil, err
}
return newMessageFromProto(*pb, decodeRootNode, decodeSelector, decodeSelectionResponse)
}
func (gsm *graphSyncMessage) ToProto() *pb.Message {
pbm := new(pb.Message)
pbm.Reqlist = make([]pb.Message_Request, 0, len(gsm.requests))
for _, request := range gsm.requests {
pbm.Reqlist = append(pbm.Reqlist, pb.Message_Request{
Id: int32(request.id),
Root: request.root.RawData(),
Selector: request.selector.RawData(),
Priority: int32(request.priority),
Cancel: request.isCancel,
})
}
pbm.Reslist = make([]pb.Message_Response, 0, len(gsm.responses))
for _, response := range gsm.responses {
pbm.Reslist = append(pbm.Reslist, pb.Message_Response{
Id: int32(response.requestID),
Status: int32(response.status),
Data: response.response.RawData(),
})
}
return pbm
}
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
requests := make([]string, 0, len(gsm.requests))
for _, request := range gsm.requests {
requests = append(requests, fmt.Sprintf("%d", request.id))
}
responses := make([]string, 0, len(gsm.responses))
for _, response := range gsm.responses {
responses = append(responses, fmt.Sprintf("%d", response.requestID))
}
return map[string]interface{}{
"requests": requests,
"responses": responses,
}
}
func (gsr *graphSyncRequest) ID() GraphSyncRequestID { return gsr.id }
func (gsr *graphSyncRequest) Root() ipld.Node { return gsr.root }
func (gsr *graphSyncRequest) Selector() gsselector.Selector { return gsr.selector }
func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
func (gsr *graphSyncRequest) IsCancel() bool { return gsr.isCancel }
func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }
func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
func (gsr *graphSyncResponse) Response() gsselector.SelectionResponse { return gsr.response }
package message
import (
"math/rand"
"reflect"
"testing"
"github.com/ipfs/go-graphsync/testselector"
)
func TestAppendingRequests(t *testing.T) {
selector := testselector.GenerateSelector()
rootNode := testselector.GenerateRootNode()
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
gsm := New()
gsm.AddRequest(id, selector, rootNode, priority)
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not add request to message")
}
request := requests[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Root(), rootNode) ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("Did not properly add request to message")
}
pbMessage := gsm.ToProto()
pbRequest := pbMessage.Reqlist[0]
if pbRequest.Id != int32(id) ||
pbRequest.Priority != int32(priority) ||
pbRequest.Cancel != false ||
!reflect.DeepEqual(pbRequest.Root, rootNode.RawData()) ||
!reflect.DeepEqual(pbRequest.Selector, selector.RawData()) {
t.Fatal("Did not properly serialize message to protobuf")
}
deserialized, err := newMessageFromProto(*pbMessage,
testselector.MockDecodeRootNodeFunc,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
deserializedRequests := deserialized.Requests()
if len(deserializedRequests) != 1 {
t.Fatal("Did not add request to deserialized message")
}
deserializedRequest := deserializedRequests[0]
if deserializedRequest.ID() != id ||
deserializedRequest.IsCancel() != false ||
deserializedRequest.Priority() != priority ||
!reflect.DeepEqual(deserializedRequest.Root(), rootNode) ||
!reflect.DeepEqual(deserializedRequest.Selector(), selector) {
t.Fatal("Did not properly deserialize protobuf messages so requests are equal")
}
}
func TestAppendingResponses(t *testing.T) {
selectionResponse := testselector.GenerateSelectionResponse()
requestID := GraphSyncRequestID(rand.Int31())
status := GraphSyncResponseStatusCode(RequestAcknowledged)
gsm := New()
gsm.AddResponse(requestID, status, selectionResponse)
responses := gsm.Responses()
if len(responses) != 1 {
t.Fatal("Did not add response to message")
}
response := responses[0]
if response.RequestID() != requestID ||
response.Status() != status ||
!reflect.DeepEqual(response.Response(), selectionResponse) {
t.Fatal("Did not properly add response to message")
}
pbMessage := gsm.ToProto()
pbResponse := pbMessage.Reslist[0]
if pbResponse.Id != int32(requestID) ||
pbResponse.Status != int32(status) ||
!reflect.DeepEqual(pbResponse.Data, selectionResponse.RawData()) {
t.Fatal("Did not properly serialize message to protobuf")
}
deserialized, err := newMessageFromProto(*pbMessage,
testselector.MockDecodeRootNodeFunc,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
deserializedResponses := deserialized.Responses()
if len(deserializedResponses) != 1 {
t.Fatal("Did not add response to message")
}
deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != requestID ||
deserializedResponse.Status() != status ||
!reflect.DeepEqual(deserializedResponse.Response(), selectionResponse) {
t.Fatal("Did not properly deserialize protobuf messages so responses are equal")
}
}
func TestRequestCancel(t *testing.T) {
selector := testselector.GenerateSelector()
rootNode := testselector.GenerateRootNode()
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
gsm := New()
gsm.AddRequest(id, selector, rootNode, priority)
gsm.Cancel(id)
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not properly cancel request")
}
request := requests[0]
if request.ID() != id ||
request.IsCancel() != true {
t.Fatal("Did not properly add cancel request to message")
}
}
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $<
clean:
rm -f *.pb.go
rm -f *.go
This diff is collapsed.
syntax = "proto3";
package graphsync.message.pb;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message Message {
message Request {
int32 id = 1; // unique id set on the requester side
bytes root = 2; // ipld root node for selector
bytes selector = 3; // ipld selector to retrieve
bytes extra = 4; // aux information. useful for other protocols
int32 priority = 5; // the priority (normalized). default to 1
bool cancel = 7; // whether this cancels a request
}
message Response {
int32 id = 1; // the request id
int32 status = 2; // a status code.
bytes data = 3; // core response data
bytes extra = 4; // additional data
}
// the actual data included in this message
repeated Request reqlist = 1 [(gogoproto.nullable) = false];
repeated Response reslist = 2 [(gogoproto.nullable) = false];
}
......@@ -118,6 +118,12 @@
"hash": "QmSJ9n2s9NUoA9D849W5jj5SJ94nMcZpj1jCgQJieiNqSt",
"name": "go-random",
"version": "1.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s",
"name": "go-ipld-format",
"version": "0.8.0"
}
],
"gxVersion": "0.0.1",
......
package selector
import (
ipld "github.com/ipfs/go-ipld-format"
)
type Selector interface {
ipld.Node
}
type SelectionResponse interface {
ipld.Node
}
type SelectionTraverser interface {
Next() (SelectionResponse, error)
Cancel()
}
type SelectorManager interface {
Select(Selector, root ipld.Node) SelectionTraverser
Validate(Selector, root ipld.Node, incomingResponses SelectionTraverser) SelectionTraverser
DecodeSelector([]byte) Selector
DecodeSelectorResponse([]byte) SelectionResponse
}
package testselector
import (
"bytes"
"errors"
cid "github.com/ipfs/go-cid"
gsselector "github.com/ipfs/go-graphsync/selector"
ipld "github.com/ipfs/go-ipld-format"
"github.com/jbenet/go-random"
mh "github.com/multiformats/go-multihash"
)
var seedSeq int64
const (
blockSize = 512
)
func randomBytes(n int64, seed int64) []byte {
data := new(bytes.Buffer)
random.WritePseudoRandomBytes(n, data, seed)
return data.Bytes()
}
type byteNode struct {
data []byte
builder cid.Builder
}
var v0CidPrefix = cid.Prefix{
Codec: cid.DagProtobuf,
MhLength: -1,
MhType: mh.SHA2_256,
Version: 0,
}
// ErrEmptyNode is just a simple error code for stubbing out IPLD Node interface
// methods
var ErrEmptyNode = errors.New("dummy node")
func (n *byteNode) Resolve([]string) (interface{}, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Tree(string, int) []string {
return nil
}
func (n *byteNode) ResolveLink([]string) (*ipld.Link, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Copy() ipld.Node {
return &byteNode{}
}
func (n *byteNode) Cid() cid.Cid {
c, err := n.builder.Sum(n.RawData())
if err != nil {
return cid.Cid{}
}
return c
}
func (n *byteNode) Links() []*ipld.Link {
return nil
}
func (n *byteNode) Loggable() map[string]interface{} {
return nil
}
func (n *byteNode) String() string {
return string(n.data)
}
func (n *byteNode) RawData() []byte {
return n.data
}
func (n *byteNode) Size() (uint64, error) {
return 0, nil
}
func (n *byteNode) Stat() (*ipld.NodeStat, error) {
return &ipld.NodeStat{}, nil
}
func newNode(data []byte) *byteNode {
return &byteNode{
data: data,
builder: v0CidPrefix,
}
}
// MockDecodeSelectorFunc decodes raw data to a type that satisfies
// the Selector interface
func MockDecodeSelectorFunc(data []byte) gsselector.Selector {
return newNode(data)
}
// MockDecodeSelectionResponseFunc decodes raw data to a type that
// satisfies the SelectionResponse interface
func MockDecodeSelectionResponseFunc(data []byte) gsselector.SelectionResponse {
return newNode(data)
}
// MockDecodeRootNodeFunc decodes raw data to a type that satisfies an
// IPLD node interface
func MockDecodeRootNodeFunc(data []byte) ipld.Node {
return newNode(data)
}
// GenerateSelector returns a new mock Selector
func GenerateSelector() gsselector.Selector {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateSelectionResponse returns a new mock SelectionResponse
func GenerateSelectionResponse() gsselector.SelectionResponse {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateRootNode generates a new mock ipld Node to serve as a root node
func GenerateRootNode() ipld.Node {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment