Unverified Commit cca24684 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #16 from ipfs/chore/update-message-format-15

Chore/update message format 15
parents 1b5d1f52 420ce519
@startuml "GraphSync"
package "go-ipld-format" {
interface Node
}
note top of "go-selector"
Is this the right name for this package?
end note
package "go-selector" {
package "go-ipld-prime" {
interface Node {
interface Selector {
}
interface SelectionResponse {
interface Selector {
}
interface SelectionTraverser {
Next() SelectionResponse, err
Cancel()
}
interface "GetBlockFunc func(cid) block.Block" as GetBlockFunc
interface SelectorQuerier {
Select(Selector, root Cid) SelectionTraverser
Validate(Selector, root Cid, incomingResponses SelectionTraverser) SelectionTraverser
}
object "Package Public Functions" as goSelectorPF {
NewSelectorQuerier(getBlockFunc GetBlockFunc) SelectorQuerier
}
Node <|-- Selector
Node <|-- SelectionResponse
}
package "go-graphsync" {
......@@ -43,14 +17,10 @@ package "go-graphsync" {
network : GraphySyncNetwork
requestManager : RequestManager
responseManager: ResponseManager
selectorQuerier: SelectorQuerier
Request(p peer.ID, selector Selector, root Cid) SelectionTraverser
Request(p peer.ID, selector Selector, root Cid) chan Block
ReceiveMessage(ctx context.Context, sender peer.ID, incoming GraphSyncMessage)
ReceiveError(error)
}
GraphSync *-- SelectorQuerier
package network {
......@@ -80,9 +50,7 @@ package "go-graphsync" {
GraphSyncNetwork <|-- libP2PGraphSyncNetwork
object "Package Public Functions" as goGraphSyncNetworkPF {
NewLibP2PNetwork(host libp2pHost.Host,
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) GraphSyncNetwork
NewLibP2PNetwork(host libp2pHost.Host) GraphSyncNetwork
}
goGraphSyncNetworkPF .. libP2PGraphSyncNetwork
}
......@@ -90,55 +58,31 @@ package "go-graphsync" {
package requestmanager {
class RequestManager {
network : GraphSyncNetwork
selectorQuerier: SelectorQuerier
SendRequest(p peer.ID, selector Selector, root Cid) SelectionTraverser
SendRequest(p peer.ID, selector Selector, root Cid) chan Block
ProcessResponses(responses []GraphSyncResponse)
}
SelectorQuerier --* RequestManager
RequestManager *-- GraphSyncNetwork
GraphSync *-- RequestManager
note as requestManager
The ResponseManager uses Validate from go-selector
to filter incoming Responses based on whether they are valid and produce
a traverser that only returns valid responses
end note
}
package responsemanager {
class ResponseManager {
network : GraphySyncNetwork
selectorQuerier: SelectorQuerier
ProcessRequests(p peer.ID, requests []GraphSyncRequests)
}
SelectorQuerier --* ResponseManager
ResponseManager *-- GraphSyncNetwork
GraphSync *-- ResponseManager
note as responseManagerNote
The ResponseManager uses Select from go-selector
to translate the raw bytes into a selector and then
traverse the graph to produce a list of blocks
Question: how to know if response is partial?
end note
}
package message {
interface "DecodeSelectionResponseFunc func([]byte) SelectionResponse" as DecodeSelectionResponseFunc
interface "DecodeSelectorFunc func([]byte) Selector" as DecodeSelectorFunc
object "Package Public Functions" as goGraphSyncMessagePF {
func FromPBReader(pbr ggio.Reader,
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) (GraphSyncMessage, error)
func FromNet(r io.Reader,
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) (GraphSyncMessage, error)
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error)
func FromNet(r io.Reader) (GraphSyncMessage, error)
}
goGraphSyncMessagePF .. libP2PGraphSyncNetwork
interface GraphSyncRequest {
Selector() Selector
Root() Cid
Selector() []bytes
Priority() Priority
ID() int
IsCancel() bool
......@@ -147,11 +91,13 @@ package "go-graphsync" {
interface GraphSyncResponse {
RequestID() int
Status() GraphSyncStatus
Extra() []bytes
}
interface GraphSyncMessage {
Requests() : []GraphSyncRequest
Responses() : []GraphSyncResponse
Blocks() : []Blocks
}
interface Exportable {
......@@ -166,7 +112,7 @@ package "go-graphsync" {
}
object "PackagePublicFunctions" as goGraphsyncPf {
New(ctx context.Context, network GraphSyncNetwork, selectorQuerier SelectorQuerier) GraphSync
New(ctx context.Context, network GraphSyncNetwork) GraphSync
}
}
......@@ -174,18 +120,13 @@ package "go-graphsync" {
package "go-filecoin" {
class "go-filecoin" {
graphSync : GraphSync
selectorQuerier: SelectorQuerier
host: libp2pHost.Host
decodeSelectionResponseFunc: DecodeSelectionResponseFunc
decodeSelectorFunc: DecodeSelectorFunc
}
"go-filecoin" *-- GraphSync
"go-filecoin" .. goGraphsyncPf
"go-filecoin" .. goGraphSyncNetworkPF
"go-filecoin" .. Selector
"go-filecoin" *-- SelectorQuerier
"go-filecoin" .. goSelectorPF
}
......
......@@ -4,10 +4,11 @@ import (
"fmt"
"io"
"github.com/ipfs/go-block-format"
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
pb "github.com/ipfs/go-graphsync/message/pb"
gsselector "github.com/ipfs/go-graphsync/selector"
inet "github.com/libp2p/go-libp2p-net"
)
......@@ -36,6 +37,9 @@ const (
// OtherProtocol means a different type of response than GraphSync is
// contained in extra.
OtherProtocol = GraphSyncResponseStatusCode(13)
// PartialResponse may include blocks and metadata about the in progress response
// in extra.
PartialResponse = GraphSyncResponseStatusCode(14)
// Success Response Codes (request terminated)
......@@ -65,8 +69,7 @@ const (
// GraphSyncRequest is an interface for accessing data on request contained in a
// GraphSyncMessage.
type GraphSyncRequest interface {
Selector() gsselector.Selector
Root() cid.Cid
Selector() []byte
Priority() GraphSyncPriority
ID() GraphSyncRequestID
IsCancel() bool
......@@ -77,7 +80,7 @@ type GraphSyncRequest interface {
type GraphSyncResponse interface {
RequestID() GraphSyncRequestID
Status() GraphSyncResponseStatusCode
Response() gsselector.SelectionResponse
Extra() []byte
}
// GraphSyncMessage is interface that can be serialized and deserialized to send
......@@ -87,9 +90,10 @@ type GraphSyncMessage interface {
Responses() []GraphSyncResponse
Blocks() []blocks.Block
AddRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root cid.Cid,
selector []byte,
priority GraphSyncPriority)
Cancel(id GraphSyncRequestID)
......@@ -97,7 +101,9 @@ type GraphSyncMessage interface {
AddResponse(
requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse)
extra []byte)
AddBlock(blocks.Block)
Exportable
......@@ -111,8 +117,7 @@ type Exportable interface {
}
type graphSyncRequest struct {
selector gsselector.Selector
root cid.Cid
selector []byte
priority GraphSyncPriority
id GraphSyncRequestID
isCancel bool
......@@ -121,22 +126,15 @@ type graphSyncRequest struct {
type graphSyncResponse struct {
requestID GraphSyncRequestID
status GraphSyncResponseStatusCode
response gsselector.SelectionResponse
extra []byte
}
type graphSyncMessage struct {
requests map[GraphSyncRequestID]*graphSyncRequest
responses map[GraphSyncRequestID]*graphSyncResponse
blocks map[cid.Cid]blocks.Block
}
// DecodeSelectorFunc is a function that can build a type that satisfies
// the Selector interface from a raw byte array.
type DecodeSelectorFunc func([]byte) gsselector.Selector
// DecodeSelectionResponseFunc is a function that can build a type that satisfies
// the SelectionResponse interface from a raw byte array.
type DecodeSelectionResponseFunc func([]byte) gsselector.SelectionResponse
// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
return newMsg()
......@@ -146,25 +144,37 @@ func newMsg() *graphSyncMessage {
return &graphSyncMessage{
requests: make(map[GraphSyncRequestID]*graphSyncRequest),
responses: make(map[GraphSyncRequestID]*graphSyncResponse),
blocks: make(map[cid.Cid]blocks.Block),
}
}
func newMessageFromProto(pbm pb.Message,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
func newMessageFromProto(pbm pb.Message) (GraphSyncMessage, error) {
gsm := newMsg()
for _, req := range pbm.Reqlist {
selector := decodeSelector(req.Selector)
root, err := cid.Cast([]byte(req.Root))
for _, req := range pbm.Requests {
gsm.addRequest(GraphSyncRequestID(req.Id), req.Selector, GraphSyncPriority(req.Priority), req.Cancel)
}
for _, res := range pbm.Responses {
gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), res.Extra)
}
for _, b := range pbm.GetData() {
pref, err := cid.PrefixFromBytes(b.GetPrefix())
if err != nil {
return nil, fmt.Errorf("incorrectly formatted cid in request queery: %s", err)
return nil, err
}
c, err := pref.Sum(b.GetData())
if err != nil {
return nil, err
}
blk, err := blocks.NewBlockWithCid(b.GetData(), c)
if err != nil {
return nil, err
}
gsm.addRequest(GraphSyncRequestID(req.Id), selector, root, GraphSyncPriority(req.Priority), req.Cancel)
}
for _, res := range pbm.Reslist {
selectionResponse := decodeSelectionResponse(res.Data)
gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), selectionResponse)
gsm.AddBlock(blk)
}
return gsm, nil
......@@ -186,28 +196,33 @@ func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
return responses
}
func (gsm *graphSyncMessage) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0, len(gsm.blocks))
for _, block := range gsm.blocks {
bs = append(bs, block)
}
return bs
}
func (gsm *graphSyncMessage) Cancel(id GraphSyncRequestID) {
delete(gsm.requests, id)
gsm.addRequest(id, nil, cid.Cid{}, 0, true)
gsm.addRequest(id, nil, 0, true)
}
func (gsm *graphSyncMessage) AddRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root cid.Cid,
selector []byte,
priority GraphSyncPriority,
) {
gsm.addRequest(id, selector, root, priority, false)
gsm.addRequest(id, selector, priority, false)
}
func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root cid.Cid,
selector []byte,
priority GraphSyncPriority,
isCancel bool) {
gsm.requests[id] = &graphSyncRequest{
id: id,
selector: selector,
root: root,
priority: priority,
isCancel: isCancel,
}
......@@ -215,53 +230,61 @@ func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
func (gsm *graphSyncMessage) AddResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse) {
extra []byte) {
gsm.responses[requestID] = &graphSyncResponse{
requestID: requestID,
status: status,
response: response,
extra: extra,
}
}
func (gsm *graphSyncMessage) AddBlock(b blocks.Block) {
gsm.blocks[b.Cid()] = b
}
// FromNet can read a network stream to deserialized a GraphSyncMessage
func FromNet(r io.Reader,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
func FromNet(r io.Reader) (GraphSyncMessage, error) {
pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
return FromPBReader(pbr, decodeSelector, decodeSelectionResponse)
return FromPBReader(pbr)
}
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
func FromPBReader(pbr ggio.Reader,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) {
pb := new(pb.Message)
if err := pbr.ReadMsg(pb); err != nil {
return nil, err
}
return newMessageFromProto(*pb, decodeSelector, decodeSelectionResponse)
return newMessageFromProto(*pb)
}
func (gsm *graphSyncMessage) ToProto() *pb.Message {
pbm := new(pb.Message)
pbm.Reqlist = make([]pb.Message_Request, 0, len(gsm.requests))
pbm.Requests = make([]pb.Message_Request, 0, len(gsm.requests))
for _, request := range gsm.requests {
pbm.Reqlist = append(pbm.Reqlist, pb.Message_Request{
pbm.Requests = append(pbm.Requests, pb.Message_Request{
Id: int32(request.id),
Root: request.root.Bytes(),
Selector: request.selector.RawData(),
Selector: request.selector,
Priority: int32(request.priority),
Cancel: request.isCancel,
})
}
pbm.Reslist = make([]pb.Message_Response, 0, len(gsm.responses))
pbm.Responses = make([]pb.Message_Response, 0, len(gsm.responses))
for _, response := range gsm.responses {
pbm.Reslist = append(pbm.Reslist, pb.Message_Response{
pbm.Responses = append(pbm.Responses, pb.Message_Response{
Id: int32(response.requestID),
Status: int32(response.status),
Data: response.response.RawData(),
Extra: response.extra,
})
}
blocks := gsm.Blocks()
pbm.Data = make([]pb.Message_Block, 0, len(blocks))
for _, b := range blocks {
pbm.Data = append(pbm.Data, pb.Message_Block{
Data: b.RawData(),
Prefix: b.Cid().Prefix().Bytes(),
})
}
return pbm
......@@ -288,12 +311,11 @@ func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
}
}
func (gsr *graphSyncRequest) ID() GraphSyncRequestID { return gsr.id }
func (gsr *graphSyncRequest) Root() cid.Cid { return gsr.root }
func (gsr *graphSyncRequest) Selector() gsselector.Selector { return gsr.selector }
func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
func (gsr *graphSyncRequest) IsCancel() bool { return gsr.isCancel }
func (gsr *graphSyncRequest) ID() GraphSyncRequestID { return gsr.id }
func (gsr *graphSyncRequest) Selector() []byte { return gsr.selector }
func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
func (gsr *graphSyncRequest) IsCancel() bool { return gsr.isCancel }
func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }
func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
func (gsr *graphSyncResponse) Response() gsselector.SelectionResponse { return gsr.response }
func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }
func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
func (gsr *graphSyncResponse) Extra() []byte { return gsr.extra }
......@@ -6,17 +6,19 @@ import (
"reflect"
"testing"
"github.com/ipfs/go-graphsync/testselector"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/testutil"
)
func TestAppendingRequests(t *testing.T) {
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
selector := testutil.RandomBytes(100)
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
gsm := New()
gsm.AddRequest(id, selector, root, priority)
gsm.AddRequest(id, selector, priority)
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not add request to message")
......@@ -25,25 +27,20 @@ func TestAppendingRequests(t *testing.T) {
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Root(), root) ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("Did not properly add request to message")
}
pbMessage := gsm.ToProto()
pbRequest := pbMessage.Reqlist[0]
pbRequest := pbMessage.Requests[0]
if pbRequest.Id != int32(id) ||
pbRequest.Priority != int32(priority) ||
pbRequest.Cancel != false ||
!reflect.DeepEqual(pbRequest.Root, root.Bytes()) ||
!reflect.DeepEqual(pbRequest.Selector, selector.RawData()) {
!reflect.DeepEqual(pbRequest.Selector, selector) {
t.Fatal("Did not properly serialize message to protobuf")
}
deserialized, err := newMessageFromProto(*pbMessage,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
deserialized, err := newMessageFromProto(*pbMessage)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
......@@ -55,19 +52,18 @@ func TestAppendingRequests(t *testing.T) {
if deserializedRequest.ID() != id ||
deserializedRequest.IsCancel() != false ||
deserializedRequest.Priority() != priority ||
!reflect.DeepEqual(deserializedRequest.Root(), root) ||
!reflect.DeepEqual(deserializedRequest.Selector(), selector) {
t.Fatal("Did not properly deserialize protobuf messages so requests are equal")
}
}
func TestAppendingResponses(t *testing.T) {
selectionResponse := testselector.GenerateSelectionResponse()
extra := testutil.RandomBytes(100)
requestID := GraphSyncRequestID(rand.Int31())
status := RequestAcknowledged
gsm := New()
gsm.AddResponse(requestID, status, selectionResponse)
gsm.AddResponse(requestID, status, extra)
responses := gsm.Responses()
if len(responses) != 1 {
t.Fatal("Did not add response to message")
......@@ -75,22 +71,19 @@ func TestAppendingResponses(t *testing.T) {
response := responses[0]
if response.RequestID() != requestID ||
response.Status() != status ||
!reflect.DeepEqual(response.Response(), selectionResponse) {
!reflect.DeepEqual(response.Extra(), extra) {
t.Fatal("Did not properly add response to message")
}
pbMessage := gsm.ToProto()
pbResponse := pbMessage.Reslist[0]
pbResponse := pbMessage.Responses[0]
if pbResponse.Id != int32(requestID) ||
pbResponse.Status != int32(status) ||
!reflect.DeepEqual(pbResponse.Data, selectionResponse.RawData()) {
!reflect.DeepEqual(pbResponse.Extra, extra) {
t.Fatal("Did not properly serialize message to protobuf")
}
deserialized, err := newMessageFromProto(*pbMessage,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
deserialized, err := newMessageFromProto(*pbMessage)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
......@@ -101,19 +94,48 @@ func TestAppendingResponses(t *testing.T) {
deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != requestID ||
deserializedResponse.Status() != status ||
!reflect.DeepEqual(deserializedResponse.Response(), selectionResponse) {
!reflect.DeepEqual(deserializedResponse.Extra(), extra) {
t.Fatal("Did not properly deserialize protobuf messages so responses are equal")
}
}
func TestAppendBlock(t *testing.T) {
strs := make([]string, 2)
strs = append(strs, "Celeritas")
strs = append(strs, "Incendia")
m := New()
for _, str := range strs {
block := blocks.NewBlock([]byte(str))
m.AddBlock(block)
}
// assert strings are in proto message
for _, block := range m.ToProto().GetData() {
s := bytes.NewBuffer(block.GetData()).String()
if !contains(strs, s) {
t.Fail()
}
}
}
func contains(strs []string, x string) bool {
for _, s := range strs {
if s == x {
return true
}
}
return false
}
func TestRequestCancel(t *testing.T) {
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
selector := testutil.RandomBytes(100)
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
gsm := New()
gsm.AddRequest(id, selector, root, priority)
gsm.AddRequest(id, selector, priority)
gsm.Cancel(id)
......@@ -129,26 +151,27 @@ func TestRequestCancel(t *testing.T) {
}
func TestToNetFromNetEquivalency(t *testing.T) {
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
selectionResponse := testselector.GenerateSelectionResponse()
selector := testutil.RandomBytes(100)
extra := testutil.RandomBytes(100)
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
status := RequestAcknowledged
gsm := New()
gsm.AddRequest(id, selector, root, priority)
gsm.AddResponse(id, status, selectionResponse)
gsm.AddRequest(id, selector, priority)
gsm.AddResponse(id, status, extra)
gsm.AddBlock(blocks.NewBlock([]byte("W")))
gsm.AddBlock(blocks.NewBlock([]byte("E")))
gsm.AddBlock(blocks.NewBlock([]byte("F")))
gsm.AddBlock(blocks.NewBlock([]byte("M")))
buf := new(bytes.Buffer)
err := gsm.ToNet(buf)
if err != nil {
t.Fatal("Unable to serialize GraphSyncMessage")
}
deserialized, err := FromNet(buf,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
deserialized, err := FromNet(buf)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
......@@ -166,7 +189,6 @@ func TestToNetFromNetEquivalency(t *testing.T) {
if deserializedRequest.ID() != request.ID() ||
deserializedRequest.IsCancel() != request.IsCancel() ||
deserializedRequest.Priority() != request.Priority() ||
!reflect.DeepEqual(deserializedRequest.Root(), request.Root()) ||
!reflect.DeepEqual(deserializedRequest.Selector(), request.Selector()) {
t.Fatal("Did not keep requests when writing to stream and back")
}
......@@ -183,7 +205,18 @@ func TestToNetFromNetEquivalency(t *testing.T) {
deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != response.RequestID() ||
deserializedResponse.Status() != response.Status() ||
!reflect.DeepEqual(deserializedResponse.Response(), response.Response()) {
!reflect.DeepEqual(deserializedResponse.Extra(), response.Extra()) {
t.Fatal("Did not keep responses when writing to stream and back")
}
keys := make(map[cid.Cid]bool)
for _, b := range deserialized.Blocks() {
keys[b.Cid()] = true
}
for _, b := range gsm.Blocks() {
if _, ok := keys[b.Cid()]; !ok {
t.Fail()
}
}
}
This diff is collapsed.
......@@ -8,21 +8,27 @@ message Message {
message Request {
int32 id = 1; // unique id set on the requester side
bytes root = 2; // ipld root node for selector
bytes selector = 3; // ipld selector to retrieve
bytes extra = 4; // aux information. useful for other protocols
int32 priority = 5; // the priority (normalized). default to 1
bool cancel = 6; // whether this cancels a request
bytes selector = 2; // ipld selector to retrieve
bytes extra = 3; // aux information. useful for other protocols
int32 priority = 4; // the priority (normalized). default to 1
bool cancel = 5; // whether this cancels a request
}
message Response {
int32 id = 1; // the request id
int32 status = 2; // a status code.
bytes data = 3; // core response data
bytes extra = 4; // additional data
bytes extra = 3; // additional data
}
message Block {
bytes prefix = 1; // CID prefix (cid version, multicodec and multihash prefix (type + length)
bytes data = 2;
}
// the actual data included in this message
repeated Request reqlist = 1 [(gogoproto.nullable) = false];
repeated Response reslist = 2 [(gogoproto.nullable) = false];
bool completeRequestList = 1; // This request list includes *all* requests, replacing outstanding requests.
repeated Request requests = 2 [(gogoproto.nullable) = false]; // The list of requests.
repeated Response responses = 3 [(gogoproto.nullable) = false]; // The list of responses.
repeated Block data = 4 [(gogoproto.nullable) = false]; // Blocks related to the responses
}
......@@ -22,13 +22,9 @@ var log = logging.Logger("graphsync_network")
var sendMessageTimeout = time.Minute * 10
// NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host.
func NewFromLibp2pHost(host host.Host,
decodeSelectorFunc gsmsg.DecodeSelectorFunc,
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc) GraphSyncNetwork {
func NewFromLibp2pHost(host host.Host) GraphSyncNetwork {
graphSyncNetwork := libp2pGraphSyncNetwork{
host: host,
decodeSelectorFunc: decodeSelectorFunc,
decodeSelectionResponseFunc: decodeSelectionResponseFunc,
host: host,
}
host.SetStreamHandler(ProtocolGraphsync, graphSyncNetwork.handleNewStream)
......@@ -38,9 +34,7 @@ func NewFromLibp2pHost(host host.Host,
// libp2pGraphSyncNetwork transforms the libp2p host interface, which sends and receives
// NetMessage objects, into the graphsync network interface.
type libp2pGraphSyncNetwork struct {
host host.Host
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc
decodeSelectorFunc gsmsg.DecodeSelectorFunc
host host.Host
// inbound messages from the network are forwarded to the receiver
receiver Receiver
}
......@@ -146,9 +140,7 @@ func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s inet.Stream) {
reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
for {
received, err := gsmsg.FromPBReader(reader,
gsnet.decodeSelectorFunc,
gsnet.decodeSelectionResponseFunc)
received, err := gsmsg.FromPBReader(reader)
if err != nil {
if err != io.EOF {
s.Reset()
......
......@@ -8,7 +8,7 @@ import (
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testselector"
"github.com/ipfs/go-graphsync/testutil"
"github.com/libp2p/go-libp2p-peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)
......@@ -54,28 +54,23 @@ func TestMessageSendAndReceive(t *testing.T) {
if err != nil {
t.Fatal("error linking hosts")
}
gsnet1 := NewFromLibp2pHost(host1,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc)
gsnet2 := NewFromLibp2pHost(host2,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc)
gsnet1 := NewFromLibp2pHost(host1)
gsnet2 := NewFromLibp2pHost(host2)
r := &receiver{
messageReceived: make(chan struct{}),
}
gsnet1.SetDelegate(r)
gsnet2.SetDelegate(r)
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
selectionResponse := testselector.GenerateSelectionResponse()
selector := testutil.RandomBytes(100)
extra := testutil.RandomBytes(100)
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
status := gsmsg.RequestAcknowledged
sent := gsmsg.New()
sent.AddRequest(id, selector, root, priority)
sent.AddResponse(id, status, selectionResponse)
sent.AddRequest(id, selector, priority)
sent.AddResponse(id, status, extra)
err = gsnet1.ConnectTo(ctx, host2.ID())
if err != nil {
......@@ -110,11 +105,9 @@ func TestMessageSendAndReceive(t *testing.T) {
if receivedRequest.ID() != sentRequest.ID() ||
receivedRequest.IsCancel() != sentRequest.IsCancel() ||
receivedRequest.Priority() != sentRequest.Priority() ||
!reflect.DeepEqual(receivedRequest.Root(), sentRequest.Root()) ||
!reflect.DeepEqual(receivedRequest.Selector(), sentRequest.Selector()) {
t.Fatal("Sent message requests did not match received message requests")
}
sentResponses := sent.Responses()
if len(sentResponses) != 1 {
t.Fatal("Did not add response to sent message")
......@@ -127,7 +120,7 @@ func TestMessageSendAndReceive(t *testing.T) {
receivedResponse := receivedResponses[0]
if receivedResponse.RequestID() != sentResponse.RequestID() ||
receivedResponse.Status() != sentResponse.Status() ||
!reflect.DeepEqual(receivedResponse.Response(), sentResponse.Response()) {
!reflect.DeepEqual(receivedResponse.Extra(), sentResponse.Extra()) {
t.Fatal("Sent message responses did not match received message responses")
}
}
package selector
import (
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
// Selector is an interface for an IPLD Selector.
type Selector interface {
ipld.Node
}
// SelectionResponse is an interface that represents part of the results
// of a selector query.
type SelectionResponse interface {
ipld.Node
}
// SelectionTraverser is an interface for navigating a response to a selector
// query.
type SelectionTraverser interface {
Next() (SelectionResponse, error)
Cancel()
}
// SelectorQuerier can be used to make and validate selector queries.
type SelectorQuerier interface {
Select(Selector, root cid.Cid) SelectionTraverser
Validate(Selector, root cid.Cid, incomingResponses SelectionTraverser) SelectionTraverser
}
package testselector
import (
"bytes"
"errors"
cid "github.com/ipfs/go-cid"
gsselector "github.com/ipfs/go-graphsync/selector"
ipld "github.com/ipfs/go-ipld-format"
"github.com/jbenet/go-random"
mh "github.com/multiformats/go-multihash"
)
var seedSeq int64
const (
blockSize = 512
)
func randomBytes(n int64, seed int64) []byte {
data := new(bytes.Buffer)
random.WritePseudoRandomBytes(n, data, seed)
return data.Bytes()
}
type byteNode struct {
data []byte
builder cid.Builder
}
var v0CidPrefix = cid.Prefix{
Codec: cid.DagProtobuf,
MhLength: -1,
MhType: mh.SHA2_256,
Version: 0,
}
// ErrEmptyNode is just a simple error code for stubbing out IPLD Node interface
// methods
var ErrEmptyNode = errors.New("dummy node")
func (n *byteNode) Resolve([]string) (interface{}, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Tree(string, int) []string {
return nil
}
func (n *byteNode) ResolveLink([]string) (*ipld.Link, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Copy() ipld.Node {
return &byteNode{}
}
func (n *byteNode) Cid() cid.Cid {
c, err := n.builder.Sum(n.RawData())
if err != nil {
return cid.Cid{}
}
return c
}
func (n *byteNode) Links() []*ipld.Link {
return nil
}
func (n *byteNode) Loggable() map[string]interface{} {
return nil
}
func (n *byteNode) String() string {
return string(n.data)
}
func (n *byteNode) RawData() []byte {
return n.data
}
func (n *byteNode) Size() (uint64, error) {
return 0, nil
}
func (n *byteNode) Stat() (*ipld.NodeStat, error) {
return &ipld.NodeStat{}, nil
}
func newNode(data []byte) *byteNode {
return &byteNode{
data: data,
builder: v0CidPrefix,
}
}
// MockDecodeSelectorFunc decodes raw data to a type that satisfies
// the Selector interface
func MockDecodeSelectorFunc(data []byte) gsselector.Selector {
return newNode(data)
}
// MockDecodeSelectionResponseFunc decodes raw data to a type that
// satisfies the SelectionResponse interface
func MockDecodeSelectionResponseFunc(data []byte) gsselector.SelectionResponse {
return newNode(data)
}
// GenerateSelector returns a new mock Selector
func GenerateSelector() gsselector.Selector {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateSelectionResponse returns a new mock SelectionResponse
func GenerateSelectionResponse() gsselector.SelectionResponse {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateRootCid generates a new mock CID to serve as a root
func GenerateRootCid() cid.Cid {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node.Cid()
}
package testutil
import (
"bytes"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
random "github.com/jbenet/go-random"
peer "github.com/libp2p/go-libp2p-peer"
)
var blockGenerator = blocksutil.NewBlockGenerator()
var prioritySeq int
var seedSeq int64
// RandomBytes returns a byte array of the given size with random values.
func RandomBytes(n int64) []byte {
data := new(bytes.Buffer)
random.WritePseudoRandomBytes(n, data, seedSeq)
seedSeq++
return data.Bytes()
}
// GenerateBlocksOfSize generates a series of blocks of the given byte size
func GenerateBlocksOfSize(n int, size int64) []blocks.Block {
generatedBlocks := make([]blocks.Block, 0, n)
for i := 0; i < n; i++ {
b := blocks.NewBlock(RandomBytes(size))
generatedBlocks = append(generatedBlocks, b)
}
return generatedBlocks
}
// GenerateCids produces n content identifiers.
func GenerateCids(n int) []cid.Cid {
cids := make([]cid.Cid, 0, n)
for i := 0; i < n; i++ {
c := blockGenerator.Next().Cid()
cids = append(cids, c)
}
return cids
}
var peerSeq int
// GeneratePeers creates n peer ids.
func GeneratePeers(n int) []peer.ID {
peerIds := make([]peer.ID, 0, n)
for i := 0; i < n; i++ {
peerSeq++
p := peer.ID(peerSeq)
peerIds = append(peerIds, p)
}
return peerIds
}
// ContainsPeer returns true if a peer is found n a list of peers.
func ContainsPeer(peers []peer.ID, p peer.ID) bool {
for _, n := range peers {
if p == n {
return true
}
}
return false
}
// IndexOf returns the index of a given cid in an array of blocks
func IndexOf(blks []blocks.Block, c cid.Cid) int {
for i, n := range blks {
if n.Cid() == c {
return i
}
}
return -1
}
// ContainsBlock returns true if a block is found n a list of blocks
func ContainsBlock(blks []blocks.Block, block blocks.Block) bool {
return IndexOf(blks, block.Cid()) != -1
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment