Commit 420ce519 authored by hannahhoward's avatar hannahhoward

feat(message): update to final PB format

Update to finalized protobuf format.
Also remove deprecated selector interface which is out of date
parent 2cac5abb
@startuml "GraphSync" @startuml "GraphSync"
package "go-ipld-format" {
interface Node
}
note top of "go-selector"
Is this the right name for this package?
end note
package "go-selector" {
interface Selector {
}
interface SelectionResponse {
}
interface SelectionTraverser { package "go-ipld-prime" {
Next() SelectionResponse, err interface Node {
Cancel()
}
interface "GetBlockFunc func(cid) block.Block" as GetBlockFunc
interface SelectorQuerier {
Select(Selector, root Cid) SelectionTraverser
Validate(Selector, root Cid, incomingResponses SelectionTraverser) SelectionTraverser
} }
interface Selector {
object "Package Public Functions" as goSelectorPF {
NewSelectorQuerier(getBlockFunc GetBlockFunc) SelectorQuerier
} }
Node <|-- Selector
Node <|-- SelectionResponse
} }
package "go-graphsync" { package "go-graphsync" {
...@@ -43,15 +17,11 @@ package "go-graphsync" { ...@@ -43,15 +17,11 @@ package "go-graphsync" {
network : GraphySyncNetwork network : GraphySyncNetwork
requestManager : RequestManager requestManager : RequestManager
responseManager: ResponseManager responseManager: ResponseManager
selectorQuerier: SelectorQuerier Request(p peer.ID, selector Selector, root Cid) chan Block
Request(p peer.ID, selector Selector, root Cid) SelectionTraverser
ReceiveMessage(ctx context.Context, sender peer.ID, incoming GraphSyncMessage) ReceiveMessage(ctx context.Context, sender peer.ID, incoming GraphSyncMessage)
ReceiveError(error) ReceiveError(error)
} }
GraphSync *-- SelectorQuerier
package network { package network {
interface Receiver { interface Receiver {
...@@ -80,9 +50,7 @@ package "go-graphsync" { ...@@ -80,9 +50,7 @@ package "go-graphsync" {
GraphSyncNetwork <|-- libP2PGraphSyncNetwork GraphSyncNetwork <|-- libP2PGraphSyncNetwork
object "Package Public Functions" as goGraphSyncNetworkPF { object "Package Public Functions" as goGraphSyncNetworkPF {
NewLibP2PNetwork(host libp2pHost.Host, NewLibP2PNetwork(host libp2pHost.Host) GraphSyncNetwork
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) GraphSyncNetwork
} }
goGraphSyncNetworkPF .. libP2PGraphSyncNetwork goGraphSyncNetworkPF .. libP2PGraphSyncNetwork
} }
...@@ -90,55 +58,31 @@ package "go-graphsync" { ...@@ -90,55 +58,31 @@ package "go-graphsync" {
package requestmanager { package requestmanager {
class RequestManager { class RequestManager {
network : GraphSyncNetwork network : GraphSyncNetwork
selectorQuerier: SelectorQuerier
SendRequest(p peer.ID, selector Selector, root Cid) SelectionTraverser SendRequest(p peer.ID, selector Selector, root Cid) chan Block
ProcessResponses(responses []GraphSyncResponse) ProcessResponses(responses []GraphSyncResponse)
} }
SelectorQuerier --* RequestManager
RequestManager *-- GraphSyncNetwork RequestManager *-- GraphSyncNetwork
GraphSync *-- RequestManager GraphSync *-- RequestManager
note as requestManager
The ResponseManager uses Validate from go-selector
to filter incoming Responses based on whether they are valid and produce
a traverser that only returns valid responses
end note
} }
package responsemanager { package responsemanager {
class ResponseManager { class ResponseManager {
network : GraphySyncNetwork network : GraphySyncNetwork
selectorQuerier: SelectorQuerier
ProcessRequests(p peer.ID, requests []GraphSyncRequests) ProcessRequests(p peer.ID, requests []GraphSyncRequests)
} }
SelectorQuerier --* ResponseManager
ResponseManager *-- GraphSyncNetwork ResponseManager *-- GraphSyncNetwork
GraphSync *-- ResponseManager GraphSync *-- ResponseManager
note as responseManagerNote
The ResponseManager uses Select from go-selector
to translate the raw bytes into a selector and then
traverse the graph to produce a list of blocks
Question: how to know if response is partial?
end note
} }
package message { package message {
interface "DecodeSelectionResponseFunc func([]byte) SelectionResponse" as DecodeSelectionResponseFunc
interface "DecodeSelectorFunc func([]byte) Selector" as DecodeSelectorFunc
object "Package Public Functions" as goGraphSyncMessagePF { object "Package Public Functions" as goGraphSyncMessagePF {
func FromPBReader(pbr ggio.Reader, func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error)
decodeSelectionResponseFunc: DecodeSelectionResponseFunc, func FromNet(r io.Reader) (GraphSyncMessage, error)
decodeSelectorFunc: DecodeSelectorFunc) (GraphSyncMessage, error)
func FromNet(r io.Reader,
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) (GraphSyncMessage, error)
} }
goGraphSyncMessagePF .. libP2PGraphSyncNetwork goGraphSyncMessagePF .. libP2PGraphSyncNetwork
interface GraphSyncRequest { interface GraphSyncRequest {
Selector() Selector Selector() []bytes
Root() Cid
Priority() Priority Priority() Priority
ID() int ID() int
IsCancel() bool IsCancel() bool
...@@ -147,11 +91,13 @@ package "go-graphsync" { ...@@ -147,11 +91,13 @@ package "go-graphsync" {
interface GraphSyncResponse { interface GraphSyncResponse {
RequestID() int RequestID() int
Status() GraphSyncStatus Status() GraphSyncStatus
Extra() []bytes
} }
interface GraphSyncMessage { interface GraphSyncMessage {
Requests() : []GraphSyncRequest Requests() : []GraphSyncRequest
Responses() : []GraphSyncResponse Responses() : []GraphSyncResponse
Blocks() : []Blocks
} }
interface Exportable { interface Exportable {
...@@ -166,7 +112,7 @@ package "go-graphsync" { ...@@ -166,7 +112,7 @@ package "go-graphsync" {
} }
object "PackagePublicFunctions" as goGraphsyncPf { object "PackagePublicFunctions" as goGraphsyncPf {
New(ctx context.Context, network GraphSyncNetwork, selectorQuerier SelectorQuerier) GraphSync New(ctx context.Context, network GraphSyncNetwork) GraphSync
} }
} }
...@@ -174,18 +120,13 @@ package "go-graphsync" { ...@@ -174,18 +120,13 @@ package "go-graphsync" {
package "go-filecoin" { package "go-filecoin" {
class "go-filecoin" { class "go-filecoin" {
graphSync : GraphSync graphSync : GraphSync
selectorQuerier: SelectorQuerier
host: libp2pHost.Host host: libp2pHost.Host
decodeSelectionResponseFunc: DecodeSelectionResponseFunc
decodeSelectorFunc: DecodeSelectorFunc
} }
"go-filecoin" *-- GraphSync "go-filecoin" *-- GraphSync
"go-filecoin" .. goGraphsyncPf "go-filecoin" .. goGraphsyncPf
"go-filecoin" .. goGraphSyncNetworkPF "go-filecoin" .. goGraphSyncNetworkPF
"go-filecoin" .. Selector "go-filecoin" .. Selector
"go-filecoin" *-- SelectorQuerier
"go-filecoin" .. goSelectorPF
} }
......
...@@ -4,10 +4,11 @@ import ( ...@@ -4,10 +4,11 @@ import (
"fmt" "fmt"
"io" "io"
"github.com/ipfs/go-block-format"
ggio "github.com/gogo/protobuf/io" ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
pb "github.com/ipfs/go-graphsync/message/pb" pb "github.com/ipfs/go-graphsync/message/pb"
gsselector "github.com/ipfs/go-graphsync/selector"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
) )
...@@ -36,6 +37,9 @@ const ( ...@@ -36,6 +37,9 @@ const (
// OtherProtocol means a different type of response than GraphSync is // OtherProtocol means a different type of response than GraphSync is
// contained in extra. // contained in extra.
OtherProtocol = GraphSyncResponseStatusCode(13) OtherProtocol = GraphSyncResponseStatusCode(13)
// PartialResponse may include blocks and metadata about the in progress response
// in extra.
PartialResponse = GraphSyncResponseStatusCode(14)
// Success Response Codes (request terminated) // Success Response Codes (request terminated)
...@@ -65,8 +69,7 @@ const ( ...@@ -65,8 +69,7 @@ const (
// GraphSyncRequest is an interface for accessing data on request contained in a // GraphSyncRequest is an interface for accessing data on request contained in a
// GraphSyncMessage. // GraphSyncMessage.
type GraphSyncRequest interface { type GraphSyncRequest interface {
Selector() gsselector.Selector Selector() []byte
Root() cid.Cid
Priority() GraphSyncPriority Priority() GraphSyncPriority
ID() GraphSyncRequestID ID() GraphSyncRequestID
IsCancel() bool IsCancel() bool
...@@ -77,7 +80,7 @@ type GraphSyncRequest interface { ...@@ -77,7 +80,7 @@ type GraphSyncRequest interface {
type GraphSyncResponse interface { type GraphSyncResponse interface {
RequestID() GraphSyncRequestID RequestID() GraphSyncRequestID
Status() GraphSyncResponseStatusCode Status() GraphSyncResponseStatusCode
Response() gsselector.SelectionResponse Extra() []byte
} }
// GraphSyncMessage is interface that can be serialized and deserialized to send // GraphSyncMessage is interface that can be serialized and deserialized to send
...@@ -87,9 +90,10 @@ type GraphSyncMessage interface { ...@@ -87,9 +90,10 @@ type GraphSyncMessage interface {
Responses() []GraphSyncResponse Responses() []GraphSyncResponse
Blocks() []blocks.Block
AddRequest(id GraphSyncRequestID, AddRequest(id GraphSyncRequestID,
selector gsselector.Selector, selector []byte,
root cid.Cid,
priority GraphSyncPriority) priority GraphSyncPriority)
Cancel(id GraphSyncRequestID) Cancel(id GraphSyncRequestID)
...@@ -97,7 +101,9 @@ type GraphSyncMessage interface { ...@@ -97,7 +101,9 @@ type GraphSyncMessage interface {
AddResponse( AddResponse(
requestID GraphSyncRequestID, requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode, status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse) extra []byte)
AddBlock(blocks.Block)
Exportable Exportable
...@@ -111,8 +117,7 @@ type Exportable interface { ...@@ -111,8 +117,7 @@ type Exportable interface {
} }
type graphSyncRequest struct { type graphSyncRequest struct {
selector gsselector.Selector selector []byte
root cid.Cid
priority GraphSyncPriority priority GraphSyncPriority
id GraphSyncRequestID id GraphSyncRequestID
isCancel bool isCancel bool
...@@ -121,22 +126,15 @@ type graphSyncRequest struct { ...@@ -121,22 +126,15 @@ type graphSyncRequest struct {
type graphSyncResponse struct { type graphSyncResponse struct {
requestID GraphSyncRequestID requestID GraphSyncRequestID
status GraphSyncResponseStatusCode status GraphSyncResponseStatusCode
response gsselector.SelectionResponse extra []byte
} }
type graphSyncMessage struct { type graphSyncMessage struct {
requests map[GraphSyncRequestID]*graphSyncRequest requests map[GraphSyncRequestID]*graphSyncRequest
responses map[GraphSyncRequestID]*graphSyncResponse responses map[GraphSyncRequestID]*graphSyncResponse
blocks map[cid.Cid]blocks.Block
} }
// DecodeSelectorFunc is a function that can build a type that satisfies
// the Selector interface from a raw byte array.
type DecodeSelectorFunc func([]byte) gsselector.Selector
// DecodeSelectionResponseFunc is a function that can build a type that satisfies
// the SelectionResponse interface from a raw byte array.
type DecodeSelectionResponseFunc func([]byte) gsselector.SelectionResponse
// New initializes a new blank GraphSyncMessage // New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage { func New() GraphSyncMessage {
return newMsg() return newMsg()
...@@ -146,25 +144,37 @@ func newMsg() *graphSyncMessage { ...@@ -146,25 +144,37 @@ func newMsg() *graphSyncMessage {
return &graphSyncMessage{ return &graphSyncMessage{
requests: make(map[GraphSyncRequestID]*graphSyncRequest), requests: make(map[GraphSyncRequestID]*graphSyncRequest),
responses: make(map[GraphSyncRequestID]*graphSyncResponse), responses: make(map[GraphSyncRequestID]*graphSyncResponse),
blocks: make(map[cid.Cid]blocks.Block),
} }
} }
func newMessageFromProto(pbm pb.Message, func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
gsm := newMsg() gsm := newMsg()
for _, req := range pbm.Reqlist { for _, req := range pbm.Requests {
selector := decodeSelector(req.Selector) gsm.addRequest(GraphSyncRequestID(req.Id), req.Selector, GraphSyncPriority(req.Priority), req.Cancel)
root, err := cid.Cast([]byte(req.Root)) }
for _, res := range pbm.Responses {
gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra)
}
for _, b := range pbm.GetData() {
pref, err := cid.PrefixFromBytes(b.GetPrefix())
if err != nil { if err != nil {
return nil, fmt.Errorf("incorrectly formatted cid in request queery: %s", err) return nil, err
} }
gsm.addRequest(GraphSyncRequestID(req.Id), selector, root, GraphSyncPriority(req.Priority), req.Cancel)
c, err := pref.Sum(b.GetData())
if err != nil {
return nil, err
} }
for _, res := range pbm.Reslist { blk, err := blocks.NewBlockWithCid(b.GetData(), c)
selectionResponse := decodeSelectionResponse(res.Data) if err != nil {
gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), selectionResponse) return nil, err
}
gsm.AddBlock(blk)
} }
return gsm, nil return gsm, nil
...@@ -186,28 +196,33 @@ func (gsm *graphSyncMessage) Responses() []GraphSyncResponse { ...@@ -186,28 +196,33 @@ func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
return responses return responses
} }
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0, len(gsm.blocks))
for _, block := range gsm.blocks {
bs = append(bs, block)
}
return bs
}
func (gsm *graphSyncMessage) Cancel(id GraphSyncRequestID) { func (gsm *graphSyncMessage) Cancel(id GraphSyncRequestID) {
delete(gsm.requests, id) delete(gsm.requests, id)
gsm.addRequest(id, nil, cid.Cid{}, 0, true) gsm.addRequest(id, nil, 0, true)
} }
func (gsm *graphSyncMessage) AddRequest(id GraphSyncRequestID, func (gsm *graphSyncMessage) AddRequest(id GraphSyncRequestID,
selector gsselector.Selector, selector []byte,
root cid.Cid,
priority GraphSyncPriority, priority GraphSyncPriority,
) { ) {
gsm.addRequest(id, selector, root, priority, false) gsm.addRequest(id, selector, priority, false)
} }
func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID, func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
selector gsselector.Selector, selector []byte,
root cid.Cid,
priority GraphSyncPriority, priority GraphSyncPriority,
isCancel bool) { isCancel bool) {
gsm.requests[id] = &graphSyncRequest{ gsm.requests[id] = &graphSyncRequest{
id: id, id: id,
selector: selector, selector: selector,
root: root,
priority: priority, priority: priority,
isCancel: isCancel, isCancel: isCancel,
} }
...@@ -215,53 +230,61 @@ func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID, ...@@ -215,53 +230,61 @@ func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
func (gsm *graphSyncMessage) AddResponse(requestID GraphSyncRequestID, func (gsm *graphSyncMessage) AddResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode, status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse) { extra []byte) {
gsm.responses[requestID] = &graphSyncResponse{ gsm.responses[requestID] = &graphSyncResponse{
requestID: requestID, requestID: requestID,
status: status, status: status,
response: response, extra: extra,
} }
} }
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
gsm.blocks[b.Cid()] = b
}
// FromNet can read a network stream to deserialized a GraphSyncMessage // FromNet can read a network stream to deserialized a GraphSyncMessage
func FromNet(r io.Reader, func FromNet(r io.Reader) (GraphSyncMessage, error) {
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax) pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
return FromPBReader(pbr, decodeSelector, decodeSelectionResponse) return FromPBReader(pbr)
} }
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage. // FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
func FromPBReader(pbr ggio.Reader, func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
pb := new(pb.Message) pb := new(pb.Message)
if err := pbr.ReadMsg(pb); err != nil { if err := pbr.ReadMsg(pb); err != nil {
return nil, err return nil, err
} }
return newMessageFromProto(*pb, decodeSelector, decodeSelectionResponse) return newMessageFromProto(*pb)
} }
func (gsm *graphSyncMessage) ToProto() *pb.Message { func (gsm *graphSyncMessage) ToProto() *pb.Message {
pbm := new(pb.Message) pbm := new(pb.Message)
pbm.Reqlist = make([]pb.Message_Request, 0, len(gsm.requests)) pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
for _, request := range gsm.requests { for _, request := range gsm.requests {
pbm.Reqlist = append(pbm.Reqlist, pb.Message_Request{ pbm.Requests = append(pbm.Requests, pb.Message_Request{
Id: int32(request.id), Id: int32(request.id),
Root: request.root.Bytes(), Selector: request.selector,
Selector: request.selector.RawData(),
Priority: int32(request.priority), Priority: int32(request.priority),
Cancel: request.isCancel, Cancel: request.isCancel,
}) })
} }
pbm.Reslist = make([]pb.Message_Response, 0, len(gsm.responses)) pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
for _, response := range gsm.responses { for _, response := range gsm.responses {
pbm.Reslist = append(pbm.Reslist, pb.Message_Response{ pbm.Responses = append(pbm.Responses, pb.Message_Response{
Id: int32(response.requestID), Id: int32(response.requestID),
Status: int32(response.status), Status: int32(response.status),
Data: response.response.RawData(), Extra: response.extra,
})
}
blocks := gsm.Blocks()
pbm.Data = make([]pb.Message_Block, 0, len(blocks))
for _, b := range blocks {
pbm.Data = append(pbm.Data, pb.Message_Block{
Data: b.RawData(),
Prefix: b.Cid().Prefix().Bytes(),
}) })
} }
return pbm return pbm
...@@ -289,11 +312,10 @@ func (gsm *graphSyncMessage) Loggable() map[string]interface{} { ...@@ -289,11 +312,10 @@ func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
} }
func (gsr *graphSyncRequest) ID() GraphSyncRequestID { return gsr.id } func (gsr *graphSyncRequest) ID() GraphSyncRequestID { return gsr.id }
func (gsr *graphSyncRequest) Root() cid.Cid { return gsr.root } func (gsr *graphSyncRequest) Selector() []byte { return gsr.selector }
func (gsr *graphSyncRequest) Selector() gsselector.Selector { return gsr.selector }
func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority } func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
func (gsr *graphSyncRequest) IsCancel() bool { return gsr.isCancel } func (gsr *graphSyncRequest) IsCancel() bool { return gsr.isCancel }
func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID } func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }
func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status } func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
func (gsr *graphSyncResponse) Response() gsselector.SelectionResponse { return gsr.response } func (gsr *graphSyncResponse) Extra() []byte { return gsr.extra }
...@@ -6,17 +6,19 @@ import ( ...@@ -6,17 +6,19 @@ import (
"reflect" "reflect"
"testing" "testing"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/testutil" "github.com/ipfs/go-graphsync/testutil"
) )
func TestAppendingRequests(t *testing.T) { func TestAppendingRequests(t *testing.T) {
selector := testutil.GenerateSelector() selector := testutil.RandomBytes(100)
root := testutil.GenerateRootCid()
id := GraphSyncRequestID(rand.Int31()) id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31()) priority := GraphSyncPriority(rand.Int31())
gsm := New() gsm := New()
gsm.AddRequest(id, selector, root, priority) gsm.AddRequest(id, selector, priority)
requests := gsm.Requests() requests := gsm.Requests()
if len(requests) != 1 { if len(requests) != 1 {
t.Fatal("Did not add request to message") t.Fatal("Did not add request to message")
...@@ -25,25 +27,20 @@ func TestAppendingRequests(t *testing.T) { ...@@ -25,25 +27,20 @@ func TestAppendingRequests(t *testing.T) {
if request.ID() != id || if request.ID() != id ||
request.IsCancel() != false || request.IsCancel() != false ||
request.Priority() != priority || request.Priority() != priority ||
!reflect.DeepEqual(request.Root(), root) ||
!reflect.DeepEqual(request.Selector(), selector) { !reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("Did not properly add request to message") t.Fatal("Did not properly add request to message")
} }
pbMessage := gsm.ToProto() pbMessage := gsm.ToProto()
pbRequest := pbMessage.Reqlist[0] pbRequest := pbMessage.Requests[0]
if pbRequest.Id != int32(id) || if pbRequest.Id != int32(id) ||
pbRequest.Priority != int32(priority) || pbRequest.Priority != int32(priority) ||
pbRequest.Cancel != false || pbRequest.Cancel != false ||
!reflect.DeepEqual(pbRequest.Root, root.Bytes()) || !reflect.DeepEqual(pbRequest.Selector, selector) {
!reflect.DeepEqual(pbRequest.Selector, selector.RawData()) {
t.Fatal("Did not properly serialize message to protobuf") t.Fatal("Did not properly serialize message to protobuf")
} }
deserialized, err := newMessageFromProto(*pbMessage, deserialized, err := newMessageFromProto(*pbMessage)
testutil.MockDecodeSelectorFunc,
testutil.MockDecodeSelectionResponseFunc,
)
if err != nil { if err != nil {
t.Fatal("Error deserializing protobuf message") t.Fatal("Error deserializing protobuf message")
} }
...@@ -55,19 +52,18 @@ func TestAppendingRequests(t *testing.T) { ...@@ -55,19 +52,18 @@ func TestAppendingRequests(t *testing.T) {
if deserializedRequest.ID() != id || if deserializedRequest.ID() != id ||
deserializedRequest.IsCancel() != false || deserializedRequest.IsCancel() != false ||
deserializedRequest.Priority() != priority || deserializedRequest.Priority() != priority ||
!reflect.DeepEqual(deserializedRequest.Root(), root) ||
!reflect.DeepEqual(deserializedRequest.Selector(), selector) { !reflect.DeepEqual(deserializedRequest.Selector(), selector) {
t.Fatal("Did not properly deserialize protobuf messages so requests are equal") t.Fatal("Did not properly deserialize protobuf messages so requests are equal")
} }
} }
func TestAppendingResponses(t *testing.T) { func TestAppendingResponses(t *testing.T) {
selectionResponse := testutil.GenerateSelectionResponse() extra := testutil.RandomBytes(100)
requestID := GraphSyncRequestID(rand.Int31()) requestID := GraphSyncRequestID(rand.Int31())
status := RequestAcknowledged status := RequestAcknowledged
gsm := New() gsm := New()
gsm.AddResponse(requestID, status, selectionResponse) gsm.AddResponse(requestID, status, extra)
responses := gsm.Responses() responses := gsm.Responses()
if len(responses) != 1 { if len(responses) != 1 {
t.Fatal("Did not add response to message") t.Fatal("Did not add response to message")
...@@ -75,22 +71,19 @@ func TestAppendingResponses(t *testing.T) { ...@@ -75,22 +71,19 @@ func TestAppendingResponses(t *testing.T) {
response := responses[0] response := responses[0]
if response.RequestID() != requestID || if response.RequestID() != requestID ||
response.Status() != status || response.Status() != status ||
!reflect.DeepEqual(response.Response(), selectionResponse) { !reflect.DeepEqual(response.Extra(), extra) {
t.Fatal("Did not properly add response to message") t.Fatal("Did not properly add response to message")
} }
pbMessage := gsm.ToProto() pbMessage := gsm.ToProto()
pbResponse := pbMessage.Reslist[0] pbResponse := pbMessage.Responses[0]
if pbResponse.Id != int32(requestID) || if pbResponse.Id != int32(requestID) ||
pbResponse.Status != int32(status) || pbResponse.Status != int32(status) ||
!reflect.DeepEqual(pbResponse.Data, selectionResponse.RawData()) { !reflect.DeepEqual(pbResponse.Extra, extra) {
t.Fatal("Did not properly serialize message to protobuf") t.Fatal("Did not properly serialize message to protobuf")
} }
deserialized, err := newMessageFromProto(*pbMessage, deserialized, err := newMessageFromProto(*pbMessage)
testutil.MockDecodeSelectorFunc,
testutil.MockDecodeSelectionResponseFunc,
)
if err != nil { if err != nil {
t.Fatal("Error deserializing protobuf message") t.Fatal("Error deserializing protobuf message")
} }
...@@ -101,19 +94,48 @@ func TestAppendingResponses(t *testing.T) { ...@@ -101,19 +94,48 @@ func TestAppendingResponses(t *testing.T) {
deserializedResponse := deserializedResponses[0] deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != requestID || if deserializedResponse.RequestID() != requestID ||
deserializedResponse.Status() != status || deserializedResponse.Status() != status ||
!reflect.DeepEqual(deserializedResponse.Response(), selectionResponse) { !reflect.DeepEqual(deserializedResponse.Extra(), extra) {
t.Fatal("Did not properly deserialize protobuf messages so responses are equal") t.Fatal("Did not properly deserialize protobuf messages so responses are equal")
} }
} }
func TestAppendBlock(t *testing.T) {
strs := make([]string, 2)
strs = append(strs, "Celeritas")
strs = append(strs, "Incendia")
m := New()
for _, str := range strs {
block := blocks.NewBlock([]byte(str))
m.AddBlock(block)
}
// assert strings are in proto message
for _, block := range m.ToProto().GetData() {
s := bytes.NewBuffer(block.GetData()).String()
if !contains(strs, s) {
t.Fail()
}
}
}
func contains(strs []string, x string) bool {
for _, s := range strs {
if s == x {
return true
}
}
return false
}
func TestRequestCancel(t *testing.T) { func TestRequestCancel(t *testing.T) {
selector := testutil.GenerateSelector() selector := testutil.RandomBytes(100)
root := testutil.GenerateRootCid()
id := GraphSyncRequestID(rand.Int31()) id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31()) priority := GraphSyncPriority(rand.Int31())
gsm := New() gsm := New()
gsm.AddRequest(id, selector, root, priority) gsm.AddRequest(id, selector, priority)
gsm.Cancel(id) gsm.Cancel(id)
...@@ -129,26 +151,27 @@ func TestRequestCancel(t *testing.T) { ...@@ -129,26 +151,27 @@ func TestRequestCancel(t *testing.T) {
} }
func TestToNetFromNetEquivalency(t *testing.T) { func TestToNetFromNetEquivalency(t *testing.T) {
selector := testutil.GenerateSelector() selector := testutil.RandomBytes(100)
root := testutil.GenerateRootCid() extra := testutil.RandomBytes(100)
selectionResponse := testutil.GenerateSelectionResponse()
id := GraphSyncRequestID(rand.Int31()) id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31()) priority := GraphSyncPriority(rand.Int31())
status := RequestAcknowledged status := RequestAcknowledged
gsm := New() gsm := New()
gsm.AddRequest(id, selector, root, priority) gsm.AddRequest(id, selector, priority)
gsm.AddResponse(id, status, selectionResponse) gsm.AddResponse(id, status, extra)
gsm.AddBlock(blocks.NewBlock([]byte("W")))
gsm.AddBlock(blocks.NewBlock([]byte("E")))
gsm.AddBlock(blocks.NewBlock([]byte("F")))
gsm.AddBlock(blocks.NewBlock([]byte("M")))
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
err := gsm.ToNet(buf) err := gsm.ToNet(buf)
if err != nil { if err != nil {
t.Fatal("Unable to serialize GraphSyncMessage") t.Fatal("Unable to serialize GraphSyncMessage")
} }
deserialized, err := FromNet(buf, deserialized, err := FromNet(buf)
testutil.MockDecodeSelectorFunc,
testutil.MockDecodeSelectionResponseFunc,
)
if err != nil { if err != nil {
t.Fatal("Error deserializing protobuf message") t.Fatal("Error deserializing protobuf message")
} }
...@@ -166,7 +189,6 @@ func TestToNetFromNetEquivalency(t *testing.T) { ...@@ -166,7 +189,6 @@ func TestToNetFromNetEquivalency(t *testing.T) {
if deserializedRequest.ID() != request.ID() || if deserializedRequest.ID() != request.ID() ||
deserializedRequest.IsCancel() != request.IsCancel() || deserializedRequest.IsCancel() != request.IsCancel() ||
deserializedRequest.Priority() != request.Priority() || deserializedRequest.Priority() != request.Priority() ||
!reflect.DeepEqual(deserializedRequest.Root(), request.Root()) ||
!reflect.DeepEqual(deserializedRequest.Selector(), request.Selector()) { !reflect.DeepEqual(deserializedRequest.Selector(), request.Selector()) {
t.Fatal("Did not keep requests when writing to stream and back") t.Fatal("Did not keep requests when writing to stream and back")
} }
...@@ -183,7 +205,18 @@ func TestToNetFromNetEquivalency(t *testing.T) { ...@@ -183,7 +205,18 @@ func TestToNetFromNetEquivalency(t *testing.T) {
deserializedResponse := deserializedResponses[0] deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != response.RequestID() || if deserializedResponse.RequestID() != response.RequestID() ||
deserializedResponse.Status() != response.Status() || deserializedResponse.Status() != response.Status() ||
!reflect.DeepEqual(deserializedResponse.Response(), response.Response()) { !reflect.DeepEqual(deserializedResponse.Extra(), response.Extra()) {
t.Fatal("Did not keep responses when writing to stream and back") t.Fatal("Did not keep responses when writing to stream and back")
} }
keys := make(map[cid.Cid]bool)
for _, b := range deserialized.Blocks() {
keys[b.Cid()] = true
}
for _, b := range gsm.Blocks() {
if _, ok := keys[b.Cid()]; !ok {
t.Fail()
}
}
} }
This diff is collapsed.
...@@ -8,21 +8,27 @@ message Message { ...@@ -8,21 +8,27 @@ message Message {
message Request { message Request {
int32 id = 1; // unique id set on the requester side int32 id = 1; // unique id set on the requester side
bytes root = 2; // ipld root node for selector bytes selector = 2; // ipld selector to retrieve
bytes selector = 3; // ipld selector to retrieve bytes extra = 3; // aux information. useful for other protocols
bytes extra = 4; // aux information. useful for other protocols int32 priority = 4; // the priority (normalized). default to 1
int32 priority = 5; // the priority (normalized). default to 1 bool cancel = 5; // whether this cancels a request
bool cancel = 6; // whether this cancels a request
} }
message Response { message Response {
int32 id = 1; // the request id int32 id = 1; // the request id
int32 status = 2; // a status code. int32 status = 2; // a status code.
bytes data = 3; // core response data bytes extra = 3; // additional data
bytes extra = 4; // additional data }
message Block {
bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length)
bytes data = 2;
} }
// the actual data included in this message // the actual data included in this message
repeated Request reqlist = 1 [(gogoproto.nullable) = false]; bool completeRequestList = 1; // This request list includes *all* requests, replacing outstanding requests.
repeated Response reslist = 2 [(gogoproto.nullable) = false]; repeated Request requests = 2 [(gogoproto.nullable) = false]; // The list of requests.
repeated Response responses = 3 [(gogoproto.nullable) = false]; // The list of responses.
repeated Block data = 4 [(gogoproto.nullable) = false]; // Blocks related to the responses
} }
...@@ -22,13 +22,9 @@ var log = logging.Logger("graphsync_network") ...@@ -22,13 +22,9 @@ var log = logging.Logger("graphsync_network")
var sendMessageTimeout = time.Minute * 10 var sendMessageTimeout = time.Minute * 10
// NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host. // NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host.
func NewFromLibp2pHost(host host.Host, func NewFromLibp2pHost(host host.Host) GraphSyncNetwork {
decodeSelectorFunc gsmsg.DecodeSelectorFunc,
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc) GraphSyncNetwork {
graphSyncNetwork := libp2pGraphSyncNetwork{ graphSyncNetwork := libp2pGraphSyncNetwork{
host: host, host: host,
decodeSelectorFunc: decodeSelectorFunc,
decodeSelectionResponseFunc: decodeSelectionResponseFunc,
} }
host.SetStreamHandler(ProtocolGraphsync, graphSyncNetwork.handleNewStream) host.SetStreamHandler(ProtocolGraphsync, graphSyncNetwork.handleNewStream)
...@@ -39,8 +35,6 @@ func NewFromLibp2pHost(host host.Host, ...@@ -39,8 +35,6 @@ func NewFromLibp2pHost(host host.Host,
// NetMessage objects, into the graphsync network interface. // NetMessage objects, into the graphsync network interface.
type libp2pGraphSyncNetwork struct { type libp2pGraphSyncNetwork struct {
host host.Host host host.Host
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc
decodeSelectorFunc gsmsg.DecodeSelectorFunc
// inbound messages from the network are forwarded to the receiver // inbound messages from the network are forwarded to the receiver
receiver Receiver receiver Receiver
} }
...@@ -146,9 +140,7 @@ func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s inet.Stream) { ...@@ -146,9 +140,7 @@ func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s inet.Stream) {
reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax) reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
for { for {
received, err := gsmsg.FromPBReader(reader, received, err := gsmsg.FromPBReader(reader)
gsnet.decodeSelectorFunc,
gsnet.decodeSelectionResponseFunc)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
s.Reset() s.Reset()
......
...@@ -54,28 +54,23 @@ func TestMessageSendAndReceive(t *testing.T) { ...@@ -54,28 +54,23 @@ func TestMessageSendAndReceive(t *testing.T) {
if err != nil { if err != nil {
t.Fatal("error linking hosts") t.Fatal("error linking hosts")
} }
gsnet1 := NewFromLibp2pHost(host1, gsnet1 := NewFromLibp2pHost(host1)
testutil.MockDecodeSelectorFunc, gsnet2 := NewFromLibp2pHost(host2)
testutil.MockDecodeSelectionResponseFunc)
gsnet2 := NewFromLibp2pHost(host2,
testutil.MockDecodeSelectorFunc,
testutil.MockDecodeSelectionResponseFunc)
r := &receiver{ r := &receiver{
messageReceived: make(chan struct{}), messageReceived: make(chan struct{}),
} }
gsnet1.SetDelegate(r) gsnet1.SetDelegate(r)
gsnet2.SetDelegate(r) gsnet2.SetDelegate(r)
selector := testutil.GenerateSelector() selector := testutil.RandomBytes(100)
root := testutil.GenerateRootCid() extra := testutil.RandomBytes(100)
selectionResponse := testutil.GenerateSelectionResponse()
id := gsmsg.GraphSyncRequestID(rand.Int31()) id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31()) priority := gsmsg.GraphSyncPriority(rand.Int31())
status := gsmsg.RequestAcknowledged status := gsmsg.RequestAcknowledged
sent := gsmsg.New() sent := gsmsg.New()
sent.AddRequest(id, selector, root, priority) sent.AddRequest(id, selector, priority)
sent.AddResponse(id, status, selectionResponse) sent.AddResponse(id, status, extra)
err = gsnet1.ConnectTo(ctx, host2.ID()) err = gsnet1.ConnectTo(ctx, host2.ID())
if err != nil { if err != nil {
...@@ -110,11 +105,9 @@ func TestMessageSendAndReceive(t *testing.T) { ...@@ -110,11 +105,9 @@ func TestMessageSendAndReceive(t *testing.T) {
if receivedRequest.ID() != sentRequest.ID() || if receivedRequest.ID() != sentRequest.ID() ||
receivedRequest.IsCancel() != sentRequest.IsCancel() || receivedRequest.IsCancel() != sentRequest.IsCancel() ||
receivedRequest.Priority() != sentRequest.Priority() || receivedRequest.Priority() != sentRequest.Priority() ||
!reflect.DeepEqual(receivedRequest.Root(), sentRequest.Root()) ||
!reflect.DeepEqual(receivedRequest.Selector(), sentRequest.Selector()) { !reflect.DeepEqual(receivedRequest.Selector(), sentRequest.Selector()) {
t.Fatal("Sent message requests did not match received message requests") t.Fatal("Sent message requests did not match received message requests")
} }
sentResponses := sent.Responses() sentResponses := sent.Responses()
if len(sentResponses) != 1 { if len(sentResponses) != 1 {
t.Fatal("Did not add response to sent message") t.Fatal("Did not add response to sent message")
...@@ -127,7 +120,7 @@ func TestMessageSendAndReceive(t *testing.T) { ...@@ -127,7 +120,7 @@ func TestMessageSendAndReceive(t *testing.T) {
receivedResponse := receivedResponses[0] receivedResponse := receivedResponses[0]
if receivedResponse.RequestID() != sentResponse.RequestID() || if receivedResponse.RequestID() != sentResponse.RequestID() ||
receivedResponse.Status() != sentResponse.Status() || receivedResponse.Status() != sentResponse.Status() ||
!reflect.DeepEqual(receivedResponse.Response(), sentResponse.Response()) { !reflect.DeepEqual(receivedResponse.Extra(), sentResponse.Extra()) {
t.Fatal("Sent message responses did not match received message responses") t.Fatal("Sent message responses did not match received message responses")
} }
} }
package selector
import (
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
// Selector is an interface for an IPLD Selector.
type Selector interface {
ipld.Node
}
// SelectionResponse is an interface that represents part of the results
// of a selector query.
type SelectionResponse interface {
ipld.Node
}
// SelectionTraverser is an interface for navigating a response to a selector
// query.
type SelectionTraverser interface {
Next() (SelectionResponse, error)
Cancel()
}
// SelectorQuerier can be used to make and validate selector queries.
type SelectorQuerier interface {
Select(Selector, root cid.Cid) SelectionTraverser
Validate(Selector, root cid.Cid, incomingResponses SelectionTraverser) SelectionTraverser
}
...@@ -2,25 +2,23 @@ package testutil ...@@ -2,25 +2,23 @@ package testutil
import ( import (
"bytes" "bytes"
"errors"
"github.com/ipfs/go-block-format" "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
gsselector "github.com/ipfs/go-graphsync/selector"
blocksutil "github.com/ipfs/go-ipfs-blocksutil" blocksutil "github.com/ipfs/go-ipfs-blocksutil"
ipld "github.com/ipfs/go-ipld-format"
random "github.com/jbenet/go-random" random "github.com/jbenet/go-random"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
mh "github.com/multiformats/go-multihash"
) )
var blockGenerator = blocksutil.NewBlockGenerator() var blockGenerator = blocksutil.NewBlockGenerator()
var prioritySeq int var prioritySeq int
var seedSeq int64 var seedSeq int64
func randomBytes(n int64, seed int64) []byte { // RandomBytes returns a byte array of the given size with random values.
func RandomBytes(n int64) []byte {
data := new(bytes.Buffer) data := new(bytes.Buffer)
random.WritePseudoRandomBytes(n, data, seed) random.WritePseudoRandomBytes(n, data, seedSeq)
seedSeq++
return data.Bytes() return data.Bytes()
} }
...@@ -28,8 +26,7 @@ func randomBytes(n int64, seed int64) []byte { ...@@ -28,8 +26,7 @@ func randomBytes(n int64, seed int64) []byte {
func GenerateBlocksOfSize(n int, size int64) []blocks.Block { func GenerateBlocksOfSize(n int, size int64) []blocks.Block {
generatedBlocks := make([]blocks.Block, 0, n) generatedBlocks := make([]blocks.Block, 0, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
seedSeq++ b := blocks.NewBlock(RandomBytes(size))
b := blocks.NewBlock(randomBytes(size, seedSeq))
generatedBlocks = append(generatedBlocks, b) generatedBlocks = append(generatedBlocks, b)
} }
...@@ -83,111 +80,3 @@ func IndexOf(blks []blocks.Block, c cid.Cid) int { ...@@ -83,111 +80,3 @@ func IndexOf(blks []blocks.Block, c cid.Cid) int {
func ContainsBlock(blks []blocks.Block, block blocks.Block) bool { func ContainsBlock(blks []blocks.Block, block blocks.Block) bool {
return IndexOf(blks, block.Cid()) != -1 return IndexOf(blks, block.Cid()) != -1
} }
const (
blockSize = 512
)
type byteNode struct {
data []byte
builder cid.Builder
}
var v0CidPrefix = cid.Prefix{
Codec: cid.DagProtobuf,
MhLength: -1,
MhType: mh.SHA2_256,
Version: 0,
}
// ErrEmptyNode is just a simple error code for stubbing out IPLD Node interface
// methods
var ErrEmptyNode = errors.New("dummy node")
func (n *byteNode) Resolve([]string) (interface{}, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Tree(string, int) []string {
return nil
}
func (n *byteNode) ResolveLink([]string) (*ipld.Link, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Copy() ipld.Node {
return &byteNode{}
}
func (n *byteNode) Cid() cid.Cid {
c, err := n.builder.Sum(n.RawData())
if err != nil {
return cid.Cid{}
}
return c
}
func (n *byteNode) Links() []*ipld.Link {
return nil
}
func (n *byteNode) Loggable() map[string]interface{} {
return nil
}
func (n *byteNode) String() string {
return string(n.data)
}
func (n *byteNode) RawData() []byte {
return n.data
}
func (n *byteNode) Size() (uint64, error) {
return 0, nil
}
func (n *byteNode) Stat() (*ipld.NodeStat, error) {
return &ipld.NodeStat{}, nil
}
func newNode(data []byte) *byteNode {
return &byteNode{
data: data,
builder: v0CidPrefix,
}
}
// MockDecodeSelectorFunc decodes raw data to a type that satisfies
// the Selector interface
func MockDecodeSelectorFunc(data []byte) gsselector.Selector {
return newNode(data)
}
// MockDecodeSelectionResponseFunc decodes raw data to a type that
// satisfies the SelectionResponse interface
func MockDecodeSelectionResponseFunc(data []byte) gsselector.SelectionResponse {
return newNode(data)
}
// GenerateSelector returns a new mock Selector
func GenerateSelector() gsselector.Selector {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateSelectionResponse returns a new mock SelectionResponse
func GenerateSelectionResponse() gsselector.SelectionResponse {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateRootCid generates a new mock CID to serve as a root
func GenerateRootCid() cid.Cid {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node.Cid()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment