Unverified Commit 13985127 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #12 from ipfs/feat/wire-message-serialization

Implementation of GraphSyncMessage
parents fc91e90b f1bb8a81
docs/GraphSync.png

229 KB | W: | H:

docs/GraphSync.png

257 KB | W: | H:

docs/GraphSync.png
docs/GraphSync.png
docs/GraphSync.png
docs/GraphSync.png
  • 2-up
  • Swipe
  • Onion skin
This diff is collapsed.
@startuml "GraphSync" @startuml "GraphSync"
package "go-ipld-format" {
interface Node
}
note top of "go-selector" note top of "go-selector"
Is this the right name for this package? Is this the right name for this package?
end note end note
...@@ -20,23 +25,27 @@ package "go-selector" { ...@@ -20,23 +25,27 @@ package "go-selector" {
interface "GetBlockFunc func(cid) block.Block" as GetBlockFunc interface "GetBlockFunc func(cid) block.Block" as GetBlockFunc
interface SelectorQuerier { interface SelectorQuerier {
Select(Selector, root Node) SelectionTraverser Select(Selector, root Cid) SelectionTraverser
Validate(Selector, root Node, incomingResponses SelectionTraverser) SelectionTraverser Validate(Selector, root Cid, incomingResponses SelectionTraverser) SelectionTraverser
} }
object "Package Public Functions" as goSelectorPF { object "Package Public Functions" as goSelectorPF {
NewSelectorQuerier(getBlockFunc GetBlockFunc) SelectorQuerier NewSelectorQuerier(getBlockFunc GetBlockFunc) SelectorQuerier
} }
Node <|-- Selector
Node <|-- SelectionResponse
} }
package "go-graphsync" { package "go-graphsync" {
class GraphSync { class GraphSync {
network : GraphySyncNetwork network : GraphySyncNetwork
requestManager : RequestManager requestManager : RequestManager
responseManager: ResponseManager responseManager: ResponseManager
selectorQuerier: SelectorQuerier selectorQuerier: SelectorQuerier
Request(p peer.ID, selector Selector, root Node) SelectionTraverser Request(p peer.ID, selector Selector, root Cid) SelectionTraverser
ReceiveMessage(ctx context.Context, sender peer.ID, incoming GraphSyncMessage) ReceiveMessage(ctx context.Context, sender peer.ID, incoming GraphSyncMessage)
ReceiveError(error) ReceiveError(error)
} }
...@@ -44,6 +53,7 @@ package "go-graphsync" { ...@@ -44,6 +53,7 @@ package "go-graphsync" {
GraphSync *-- SelectorQuerier GraphSync *-- SelectorQuerier
package network { package network {
interface Receiver { interface Receiver {
ReceiveMessage(ctx context.Context, sender peer.ID, incoming GraphSyncMessage) ReceiveMessage(ctx context.Context, sender peer.ID, incoming GraphSyncMessage)
ReceiveError(error) ReceiveError(error)
...@@ -57,12 +67,14 @@ package "go-graphsync" { ...@@ -57,12 +67,14 @@ package "go-graphsync" {
Receiver <|-- GraphSync : receiver for Receiver <|-- GraphSync : receiver for
class libP2PGraphSyncNetwork { class libP2PGraphSyncNetwork {
} }
GraphSyncNetwork <|-- libP2PGraphSyncNetwork GraphSyncNetwork <|-- libP2PGraphSyncNetwork
object "Package Public Functions" as goGraphSyncNetworkPF { object "Package Public Functions" as goGraphSyncNetworkPF {
NewLibP2PNetwork(host libp2pHost.Host) GraphSyncNetwork NewLibP2PNetwork(host libp2pHost.Host,
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) GraphSyncNetwork
} }
goGraphSyncNetworkPF .. libP2PGraphSyncNetwork goGraphSyncNetworkPF .. libP2PGraphSyncNetwork
} }
...@@ -72,7 +84,7 @@ package "go-graphsync" { ...@@ -72,7 +84,7 @@ package "go-graphsync" {
network : GraphSyncNetwork network : GraphSyncNetwork
selectorQuerier: SelectorQuerier selectorQuerier: SelectorQuerier
SendRequest(p peer.ID, selector Selector, node rootNode) SelectionTraverser SendRequest(p peer.ID, selector Selector, root Cid) SelectionTraverser
ProcessResponses(responses []GraphSyncResponse) ProcessResponses(responses []GraphSyncResponse)
} }
SelectorQuerier --* RequestManager SelectorQuerier --* RequestManager
...@@ -103,23 +115,30 @@ package "go-graphsync" { ...@@ -103,23 +115,30 @@ package "go-graphsync" {
end note end note
} }
package message { package message {
interface "DecodeSelectionResponseFunc func([]byte) SelectionResponse" as DecodeSelectionResponseFunc
interface "DecodeSelectorFunc func([]byte) Selector" as DecodeSelectorFunc
object "Package Public Functions" as goGraphSyncMessagePF { object "Package Public Functions" as goGraphSyncMessagePF {
func FromPBReader(pbr ggio.Reader) (GraphSyncMessage, error) func FromPBReader(pbr ggio.Reader,
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) (GraphSyncMessage, error)
func FromNet(r io.Reader,
decodeSelectionResponseFunc: DecodeSelectionResponseFunc,
decodeSelectorFunc: DecodeSelectorFunc) (GraphSyncMessage, error)
} }
goGraphSyncMessagePF .. libP2PGraphSyncNetwork goGraphSyncMessagePF .. libP2PGraphSyncNetwork
class GraphSyncRequest { interface GraphSyncRequest {
Selector Selector Selector() Selector
Root Node Root() Cid
Priority Priority Priority() Priority
{field} Expires time.Duration ID() int
Id int IsCancel() bool
IsCancel bool
} }
class GraphSyncResponse { interface GraphSyncResponse {
RequestId int RequestID() int
Status GraphSyncStatus Status() GraphSyncStatus
} }
interface GraphSyncMessage { interface GraphSyncMessage {
...@@ -128,8 +147,8 @@ package "go-graphsync" { ...@@ -128,8 +147,8 @@ package "go-graphsync" {
} }
interface Exportable { interface Exportable {
ToProtoV0() ToProto()
ToProtoV1() ToNet(w io.Writer) error
} }
Exportable --|> GraphSyncMessage Exportable --|> GraphSyncMessage
...@@ -139,7 +158,7 @@ package "go-graphsync" { ...@@ -139,7 +158,7 @@ package "go-graphsync" {
} }
object "PackagePublicFunctions" as goGraphsyncPf { object "PackagePublicFunctions" as goGraphsyncPf {
New(ctx context.Context, network GraphSyncNetwork, getBlockFunc GetBlockFunc) GraphSync New(ctx context.Context, network GraphSyncNetwork, selectorQuerier SelectorQuerier) GraphSync
} }
} }
...@@ -149,6 +168,8 @@ package "go-filecoin" { ...@@ -149,6 +168,8 @@ package "go-filecoin" {
graphSync : GraphSync graphSync : GraphSync
selectorQuerier: SelectorQuerier selectorQuerier: SelectorQuerier
host: libp2pHost.Host host: libp2pHost.Host
decodeSelectionResponseFunc: DecodeSelectionResponseFunc
decodeSelectorFunc: DecodeSelectorFunc
} }
"go-filecoin" *-- GraphSync "go-filecoin" *-- GraphSync
......
package message
import (
"fmt"
"io"
ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
pb "github.com/ipfs/go-graphsync/message/pb"
gsselector "github.com/ipfs/go-graphsync/selector"
inet "github.com/libp2p/go-libp2p-net"
)
// GraphSyncRequestID is a unique identifier for a GraphSync request.
type GraphSyncRequestID int32
// GraphSyncPriority a priority for a GraphSync request.
type GraphSyncPriority int32
// GraphSyncResponseStatusCode is a status returned for a GraphSync Request.
type GraphSyncResponseStatusCode int32
const (
// GraphSync Response Status Codes
// Informational Response Codes (partial)
// RequestAcknowledged means the request was received and is being worked on.
RequestAcknowledged = GraphSyncResponseStatusCode(10)
// AdditionalPeers means additional peers were found that may be able
// to satisfy the request and contained in the extra block of the response.
AdditionalPeers = GraphSyncResponseStatusCode(11)
// NotEnoughGas means fulfilling this request requires payment.
NotEnoughGas = GraphSyncResponseStatusCode(12)
// OtherProtocol means a different type of response than GraphSync is
// contained in extra.
OtherProtocol = GraphSyncResponseStatusCode(13)
// Success Response Codes (request terminated)
// RequestCompletedFull means the entire fulfillment of the GraphSync request
// was sent back.
RequestCompletedFull = GraphSyncResponseStatusCode(20)
// RequestCompletedPartial means the response is completed, and part of the
// GraphSync request was sent back, but not the complete request.
RequestCompletedPartial = GraphSyncResponseStatusCode(21)
// Error Response Codes (request terminated)
// RequestRejected means the node did not accept the incoming request.
RequestRejected = GraphSyncResponseStatusCode(30)
// RequestFailedBusy means the node is too busy, try again later. Backoff may
// be contained in extra.
RequestFailedBusy = GraphSyncResponseStatusCode(31)
// RequestFailedUnknown means the request failed for an unspecified reason. May
// contain data about why in extra.
RequestFailedUnknown = GraphSyncResponseStatusCode(32)
// RequestFailedLegal means the request failed for legal reasons.
RequestFailedLegal = GraphSyncResponseStatusCode(33)
// RequestFailedContentNotFound means the respondent does not have the content.
RequestFailedContentNotFound = GraphSyncResponseStatusCode(34)
)
// GraphSyncRequest is an interface for accessing data on request contained in a
// GraphSyncMessage.
type GraphSyncRequest interface {
Selector() gsselector.Selector
Root() cid.Cid
Priority() GraphSyncPriority
ID() GraphSyncRequestID
IsCancel() bool
}
// GraphSyncResponse is an interface for accessing data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse interface {
RequestID() GraphSyncRequestID
Status() GraphSyncResponseStatusCode
Response() gsselector.SelectionResponse
}
// GraphSyncMessage is interface that can be serialized and deserialized to send
// over the GraphSync network
type GraphSyncMessage interface {
Requests() []GraphSyncRequest
Responses() []GraphSyncResponse
AddRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root cid.Cid,
priority GraphSyncPriority)
Cancel(id GraphSyncRequestID)
AddResponse(
requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse)
Exportable
Loggable() map[string]interface{}
}
// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
ToProto() *pb.Message
ToNet(w io.Writer) error
}
type graphSyncRequest struct {
selector gsselector.Selector
root cid.Cid
priority GraphSyncPriority
id GraphSyncRequestID
isCancel bool
}
type graphSyncResponse struct {
requestID GraphSyncRequestID
status GraphSyncResponseStatusCode
response gsselector.SelectionResponse
}
type graphSyncMessage struct {
requests map[GraphSyncRequestID]*graphSyncRequest
responses map[GraphSyncRequestID]*graphSyncResponse
}
// DecodeSelectorFunc is a function that can build a type that satisfies
// the Selector interface from a raw byte array.
type DecodeSelectorFunc func([]byte) gsselector.Selector
// DecodeSelectionResponseFunc is a function that can build a type that satisfies
// the SelectionResponse interface from a raw byte array.
type DecodeSelectionResponseFunc func([]byte) gsselector.SelectionResponse
// New initializes a new blank GraphSyncMessage
func New() GraphSyncMessage {
return newMsg()
}
func newMsg() *graphSyncMessage {
return &graphSyncMessage{
requests: make(map[GraphSyncRequestID]*graphSyncRequest),
responses: make(map[GraphSyncRequestID]*graphSyncResponse),
}
}
func newMessageFromProto(pbm pb.Message,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
gsm := newMsg()
for _, req := range pbm.Reqlist {
selector := decodeSelector(req.Selector)
root, err := cid.Cast([]byte(req.Root))
if err != nil {
return nil, fmt.Errorf("incorrectly formatted cid in request queery: %s", err)
}
gsm.addRequest(GraphSyncRequestID(req.Id), selector, root, GraphSyncPriority(req.Priority), req.Cancel)
}
for _, res := range pbm.Reslist {
selectionResponse := decodeSelectionResponse(res.Data)
gsm.AddResponse(GraphSyncRequestID(res.Id), GraphSyncResponseStatusCode(res.Status), selectionResponse)
}
return gsm, nil
}
func (gsm *graphSyncMessage) Requests() []GraphSyncRequest {
requests := make([]GraphSyncRequest, 0, len(gsm.requests))
for _, request := range gsm.requests {
requests = append(requests, request)
}
return requests
}
func (gsm *graphSyncMessage) Responses() []GraphSyncResponse {
responses := make([]GraphSyncResponse, 0, len(gsm.responses))
for _, response := range gsm.responses {
responses = append(responses, response)
}
return responses
}
func (gsm *graphSyncMessage) Cancel(id GraphSyncRequestID) {
delete(gsm.requests, id)
gsm.addRequest(id, nil, cid.Cid{}, 0, true)
}
func (gsm *graphSyncMessage) AddRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root cid.Cid,
priority GraphSyncPriority,
) {
gsm.addRequest(id, selector, root, priority, false)
}
func (gsm *graphSyncMessage) addRequest(id GraphSyncRequestID,
selector gsselector.Selector,
root cid.Cid,
priority GraphSyncPriority,
isCancel bool) {
gsm.requests[id] = &graphSyncRequest{
id: id,
selector: selector,
root: root,
priority: priority,
isCancel: isCancel,
}
}
func (gsm *graphSyncMessage) AddResponse(requestID GraphSyncRequestID,
status GraphSyncResponseStatusCode,
response gsselector.SelectionResponse) {
gsm.responses[requestID] = &graphSyncResponse{
requestID: requestID,
status: status,
response: response,
}
}
// FromNet can read a network stream to deserialized a GraphSyncMessage
func FromNet(r io.Reader,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax)
return FromPBReader(pbr, decodeSelector, decodeSelectionResponse)
}
// FromPBReader can deserialize a protobuf message into a GraphySyncMessage.
func FromPBReader(pbr ggio.Reader,
decodeSelector DecodeSelectorFunc,
decodeSelectionResponse DecodeSelectionResponseFunc) (GraphSyncMessage, error) {
pb := new(pb.Message)
if err := pbr.ReadMsg(pb); err != nil {
return nil, err
}
return newMessageFromProto(*pb, decodeSelector, decodeSelectionResponse)
}
func (gsm *graphSyncMessage) ToProto() *pb.Message {
pbm := new(pb.Message)
pbm.Reqlist = make([]pb.Message_Request, 0, len(gsm.requests))
for _, request := range gsm.requests {
pbm.Reqlist = append(pbm.Reqlist, pb.Message_Request{
Id: int32(request.id),
Root: request.root.Bytes(),
Selector: request.selector.RawData(),
Priority: int32(request.priority),
Cancel: request.isCancel,
})
}
pbm.Reslist = make([]pb.Message_Response, 0, len(gsm.responses))
for _, response := range gsm.responses {
pbm.Reslist = append(pbm.Reslist, pb.Message_Response{
Id: int32(response.requestID),
Status: int32(response.status),
Data: response.response.RawData(),
})
}
return pbm
}
func (gsm *graphSyncMessage) ToNet(w io.Writer) error {
pbw := ggio.NewDelimitedWriter(w)
return pbw.WriteMsg(gsm.ToProto())
}
func (gsm *graphSyncMessage) Loggable() map[string]interface{} {
requests := make([]string, 0, len(gsm.requests))
for _, request := range gsm.requests {
requests = append(requests, fmt.Sprintf("%d", request.id))
}
responses := make([]string, 0, len(gsm.responses))
for _, response := range gsm.responses {
responses = append(responses, fmt.Sprintf("%d", response.requestID))
}
return map[string]interface{}{
"requests": requests,
"responses": responses,
}
}
func (gsr *graphSyncRequest) ID() GraphSyncRequestID { return gsr.id }
func (gsr *graphSyncRequest) Root() cid.Cid { return gsr.root }
func (gsr *graphSyncRequest) Selector() gsselector.Selector { return gsr.selector }
func (gsr *graphSyncRequest) Priority() GraphSyncPriority { return gsr.priority }
func (gsr *graphSyncRequest) IsCancel() bool { return gsr.isCancel }
func (gsr *graphSyncResponse) RequestID() GraphSyncRequestID { return gsr.requestID }
func (gsr *graphSyncResponse) Status() GraphSyncResponseStatusCode { return gsr.status }
func (gsr *graphSyncResponse) Response() gsselector.SelectionResponse { return gsr.response }
package message
import (
"bytes"
"math/rand"
"reflect"
"testing"
"github.com/ipfs/go-graphsync/testselector"
)
func TestAppendingRequests(t *testing.T) {
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
gsm := New()
gsm.AddRequest(id, selector, root, priority)
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not add request to message")
}
request := requests[0]
if request.ID() != id ||
request.IsCancel() != false ||
request.Priority() != priority ||
!reflect.DeepEqual(request.Root(), root) ||
!reflect.DeepEqual(request.Selector(), selector) {
t.Fatal("Did not properly add request to message")
}
pbMessage := gsm.ToProto()
pbRequest := pbMessage.Reqlist[0]
if pbRequest.Id != int32(id) ||
pbRequest.Priority != int32(priority) ||
pbRequest.Cancel != false ||
!reflect.DeepEqual(pbRequest.Root, root.Bytes()) ||
!reflect.DeepEqual(pbRequest.Selector, selector.RawData()) {
t.Fatal("Did not properly serialize message to protobuf")
}
deserialized, err := newMessageFromProto(*pbMessage,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
deserializedRequests := deserialized.Requests()
if len(deserializedRequests) != 1 {
t.Fatal("Did not add request to deserialized message")
}
deserializedRequest := deserializedRequests[0]
if deserializedRequest.ID() != id ||
deserializedRequest.IsCancel() != false ||
deserializedRequest.Priority() != priority ||
!reflect.DeepEqual(deserializedRequest.Root(), root) ||
!reflect.DeepEqual(deserializedRequest.Selector(), selector) {
t.Fatal("Did not properly deserialize protobuf messages so requests are equal")
}
}
func TestAppendingResponses(t *testing.T) {
selectionResponse := testselector.GenerateSelectionResponse()
requestID := GraphSyncRequestID(rand.Int31())
status := RequestAcknowledged
gsm := New()
gsm.AddResponse(requestID, status, selectionResponse)
responses := gsm.Responses()
if len(responses) != 1 {
t.Fatal("Did not add response to message")
}
response := responses[0]
if response.RequestID() != requestID ||
response.Status() != status ||
!reflect.DeepEqual(response.Response(), selectionResponse) {
t.Fatal("Did not properly add response to message")
}
pbMessage := gsm.ToProto()
pbResponse := pbMessage.Reslist[0]
if pbResponse.Id != int32(requestID) ||
pbResponse.Status != int32(status) ||
!reflect.DeepEqual(pbResponse.Data, selectionResponse.RawData()) {
t.Fatal("Did not properly serialize message to protobuf")
}
deserialized, err := newMessageFromProto(*pbMessage,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
deserializedResponses := deserialized.Responses()
if len(deserializedResponses) != 1 {
t.Fatal("Did not add response to message")
}
deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != requestID ||
deserializedResponse.Status() != status ||
!reflect.DeepEqual(deserializedResponse.Response(), selectionResponse) {
t.Fatal("Did not properly deserialize protobuf messages so responses are equal")
}
}
func TestRequestCancel(t *testing.T) {
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
gsm := New()
gsm.AddRequest(id, selector, root, priority)
gsm.Cancel(id)
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not properly cancel request")
}
request := requests[0]
if request.ID() != id ||
request.IsCancel() != true {
t.Fatal("Did not properly add cancel request to message")
}
}
func TestToNetFromNetEquivalency(t *testing.T) {
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
selectionResponse := testselector.GenerateSelectionResponse()
id := GraphSyncRequestID(rand.Int31())
priority := GraphSyncPriority(rand.Int31())
status := RequestAcknowledged
gsm := New()
gsm.AddRequest(id, selector, root, priority)
gsm.AddResponse(id, status, selectionResponse)
buf := new(bytes.Buffer)
err := gsm.ToNet(buf)
if err != nil {
t.Fatal("Unable to serialize GraphSyncMessage")
}
deserialized, err := FromNet(buf,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc,
)
if err != nil {
t.Fatal("Error deserializing protobuf message")
}
requests := gsm.Requests()
if len(requests) != 1 {
t.Fatal("Did not add request to message")
}
request := requests[0]
deserializedRequests := deserialized.Requests()
if len(deserializedRequests) != 1 {
t.Fatal("Did not add request to deserialized message")
}
deserializedRequest := deserializedRequests[0]
if deserializedRequest.ID() != request.ID() ||
deserializedRequest.IsCancel() != request.IsCancel() ||
deserializedRequest.Priority() != request.Priority() ||
!reflect.DeepEqual(deserializedRequest.Root(), request.Root()) ||
!reflect.DeepEqual(deserializedRequest.Selector(), request.Selector()) {
t.Fatal("Did not keep requests when writing to stream and back")
}
responses := gsm.Responses()
if len(responses) != 1 {
t.Fatal("Did not add response to message")
}
response := responses[0]
deserializedResponses := deserialized.Responses()
if len(deserializedResponses) != 1 {
t.Fatal("Did not add response to message")
}
deserializedResponse := deserializedResponses[0]
if deserializedResponse.RequestID() != response.RequestID() ||
deserializedResponse.Status() != response.Status() ||
!reflect.DeepEqual(deserializedResponse.Response(), response.Response()) {
t.Fatal("Did not keep responses when writing to stream and back")
}
}
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $<
clean:
rm -f *.pb.go
rm -f *.go
This diff is collapsed.
syntax = "proto3";
package graphsync.message.pb;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message Message {
message Request {
int32 id = 1; // unique id set on the requester side
bytes root = 2; // ipld root node for selector
bytes selector = 3; // ipld selector to retrieve
bytes extra = 4; // aux information. useful for other protocols
int32 priority = 5; // the priority (normalized). default to 1
bool cancel = 6; // whether this cancels a request
}
message Response {
int32 id = 1; // the request id
int32 status = 2; // a status code.
bytes data = 3; // core response data
bytes extra = 4; // additional data
}
// the actual data included in this message
repeated Request reqlist = 1 [(gogoproto.nullable) = false];
repeated Response reslist = 2 [(gogoproto.nullable) = false];
}
...@@ -118,6 +118,12 @@ ...@@ -118,6 +118,12 @@
"hash": "QmSJ9n2s9NUoA9D849W5jj5SJ94nMcZpj1jCgQJieiNqSt", "hash": "QmSJ9n2s9NUoA9D849W5jj5SJ94nMcZpj1jCgQJieiNqSt",
"name": "go-random", "name": "go-random",
"version": "1.0.0" "version": "1.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmRL22E4paat7ky7vx9MLpR97JHHbFPrg3ytFQw6qp1y1s",
"name": "go-ipld-format",
"version": "0.8.0"
} }
], ],
"gxVersion": "0.0.1", "gxVersion": "0.0.1",
......
package selector
import (
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
// Selector is an interface for an IPLD Selector.
type Selector interface {
ipld.Node
}
// SelectionResponse is an interface that represents part of the results
// of a selector query.
type SelectionResponse interface {
ipld.Node
}
// SelectionTraverser is an interface for navigating a response to a selector
// query.
type SelectionTraverser interface {
Next() (SelectionResponse, error)
Cancel()
}
// SelectorQuerier can be used to make and validate selector queries.
type SelectorQuerier interface {
Select(Selector, root cid.Cid) SelectionTraverser
Validate(Selector, root cid.Cid, incomingResponses SelectionTraverser) SelectionTraverser
}
package testselector
import (
"bytes"
"errors"
cid "github.com/ipfs/go-cid"
gsselector "github.com/ipfs/go-graphsync/selector"
ipld "github.com/ipfs/go-ipld-format"
"github.com/jbenet/go-random"
mh "github.com/multiformats/go-multihash"
)
var seedSeq int64
const (
blockSize = 512
)
func randomBytes(n int64, seed int64) []byte {
data := new(bytes.Buffer)
random.WritePseudoRandomBytes(n, data, seed)
return data.Bytes()
}
type byteNode struct {
data []byte
builder cid.Builder
}
var v0CidPrefix = cid.Prefix{
Codec: cid.DagProtobuf,
MhLength: -1,
MhType: mh.SHA2_256,
Version: 0,
}
// ErrEmptyNode is just a simple error code for stubbing out IPLD Node interface
// methods
var ErrEmptyNode = errors.New("dummy node")
func (n *byteNode) Resolve([]string) (interface{}, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Tree(string, int) []string {
return nil
}
func (n *byteNode) ResolveLink([]string) (*ipld.Link, []string, error) {
return nil, nil, ErrEmptyNode
}
func (n *byteNode) Copy() ipld.Node {
return &byteNode{}
}
func (n *byteNode) Cid() cid.Cid {
c, err := n.builder.Sum(n.RawData())
if err != nil {
return cid.Cid{}
}
return c
}
func (n *byteNode) Links() []*ipld.Link {
return nil
}
func (n *byteNode) Loggable() map[string]interface{} {
return nil
}
func (n *byteNode) String() string {
return string(n.data)
}
func (n *byteNode) RawData() []byte {
return n.data
}
func (n *byteNode) Size() (uint64, error) {
return 0, nil
}
func (n *byteNode) Stat() (*ipld.NodeStat, error) {
return &ipld.NodeStat{}, nil
}
func newNode(data []byte) *byteNode {
return &byteNode{
data: data,
builder: v0CidPrefix,
}
}
// MockDecodeSelectorFunc decodes raw data to a type that satisfies
// the Selector interface
func MockDecodeSelectorFunc(data []byte) gsselector.Selector {
return newNode(data)
}
// MockDecodeSelectionResponseFunc decodes raw data to a type that
// satisfies the SelectionResponse interface
func MockDecodeSelectionResponseFunc(data []byte) gsselector.SelectionResponse {
return newNode(data)
}
// GenerateSelector returns a new mock Selector
func GenerateSelector() gsselector.Selector {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateSelectionResponse returns a new mock SelectionResponse
func GenerateSelectionResponse() gsselector.SelectionResponse {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node
}
// GenerateRootCid generates a new mock CID to serve as a root
func GenerateRootCid() cid.Cid {
node := newNode(randomBytes(blockSize, seedSeq))
seedSeq++
return node.Cid()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment