Unverified Commit 1b5d1f52 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #13 from ipfs/feat/graph-sync-network

feat(network): first version of GraphSyncNetwork
parents 13985127 3d45ae0a
This diff is collapsed.
......@@ -62,6 +62,14 @@ package "go-graphsync" {
interface GraphSyncNetwork {
SendMessage(ctx context.Context, receiver peer.Id, m GraphSyncMessage)
SetDelegate(receiver Receiver)
ConnectTo(ctx context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (MessageSender, error)
}
interface MessageSender {
SendMsg(context.Context, GraphSyncMessage) error
Close() error
Reset() error
}
Receiver <|-- GraphSync : receiver for
......
package network
import (
"context"
gsmsg "github.com/ipfs/go-graphsync/message"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
)
var (
// ProtocolGraphsync is the protocol identifier for graphsync messages
ProtocolGraphsync protocol.ID = "/ipfs/graphsync/1.0.0"
)
// GraphSyncNetwork provides network connectivity for GraphSync.
type GraphSyncNetwork interface {
// SendMessage sends a GraphSync message to a peer.
SendMessage(
context.Context,
peer.ID,
gsmsg.GraphSyncMessage) error
// SetDelegate registers the Reciver to handle messages received from the
// network.
SetDelegate(Receiver)
// ConnectTo establishes a connection to the given peer
ConnectTo(context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (MessageSender, error)
}
// MessageSender is an interface to send messages to a peer
type MessageSender interface {
SendMsg(context.Context, gsmsg.GraphSyncMessage) error
Close() error
Reset() error
}
// Receiver is an interface for receiving messages from the GraphSyncNetwork.
type Receiver interface {
ReceiveMessage(
ctx context.Context,
sender peer.ID,
incoming gsmsg.GraphSyncMessage)
ReceiveError(error)
}
package network
import (
"bufio"
"context"
"fmt"
"io"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
)
var log = logging.Logger("graphsync_network")
var sendMessageTimeout = time.Minute * 10
// NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host.
func NewFromLibp2pHost(host host.Host,
decodeSelectorFunc gsmsg.DecodeSelectorFunc,
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc) GraphSyncNetwork {
graphSyncNetwork := libp2pGraphSyncNetwork{
host: host,
decodeSelectorFunc: decodeSelectorFunc,
decodeSelectionResponseFunc: decodeSelectionResponseFunc,
}
host.SetStreamHandler(ProtocolGraphsync, graphSyncNetwork.handleNewStream)
return &graphSyncNetwork
}
// libp2pGraphSyncNetwork transforms the libp2p host interface, which sends and receives
// NetMessage objects, into the graphsync network interface.
type libp2pGraphSyncNetwork struct {
host host.Host
decodeSelectionResponseFunc gsmsg.DecodeSelectionResponseFunc
decodeSelectorFunc gsmsg.DecodeSelectorFunc
// inbound messages from the network are forwarded to the receiver
receiver Receiver
}
type streamMessageSender struct {
s inet.Stream
}
func (s *streamMessageSender) Close() error {
return inet.FullClose(s.s)
}
func (s *streamMessageSender) Reset() error {
return s.s.Reset()
}
func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
return msgToStream(ctx, s.s, msg)
}
func msgToStream(ctx context.Context, s inet.Stream, msg gsmsg.GraphSyncMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warningf("error setting deadline: %s", err)
}
w := bufio.NewWriter(s)
switch s.Protocol() {
case ProtocolGraphsync:
if err := msg.ToNet(w); err != nil {
log.Debugf("error: %s", err)
return err
}
default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}
if err := w.Flush(); err != nil {
log.Debugf("error: %s", err)
return err
}
if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warningf("error resetting deadline: %s", err)
}
return nil
}
func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
s, err := gsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}
return &streamMessageSender{s: s}, nil
}
func (gsnet *libp2pGraphSyncNetwork) newStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
return gsnet.host.NewStream(ctx, p, ProtocolGraphsync)
}
func (gsnet *libp2pGraphSyncNetwork) SendMessage(
ctx context.Context,
p peer.ID,
outgoing gsmsg.GraphSyncMessage) error {
s, err := gsnet.newStreamToPeer(ctx, p)
if err != nil {
return err
}
if err = msgToStream(ctx, s, outgoing); err != nil {
s.Reset()
return err
}
// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
go inet.AwaitEOF(s)
return s.Close()
}
func (gsnet *libp2pGraphSyncNetwork) SetDelegate(r Receiver) {
gsnet.receiver = r
}
func (gsnet *libp2pGraphSyncNetwork) ConnectTo(ctx context.Context, p peer.ID) error {
return gsnet.host.Connect(ctx, pstore.PeerInfo{ID: p})
}
// handleNewStream receives a new stream from the network.
func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s inet.Stream) {
defer s.Close()
if gsnet.receiver == nil {
s.Reset()
return
}
reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
for {
received, err := gsmsg.FromPBReader(reader,
gsnet.decodeSelectorFunc,
gsnet.decodeSelectionResponseFunc)
if err != nil {
if err != io.EOF {
s.Reset()
go gsnet.receiver.ReceiveError(err)
log.Debugf("graphsync net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
}
return
}
p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer())
gsnet.receiver.ReceiveMessage(ctx, p, received)
}
}
package network
import (
"context"
"math/rand"
"reflect"
"testing"
"time"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/testselector"
"github.com/libp2p/go-libp2p-peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)
// Receiver is an interface for receiving messages from the GraphSyncNetwork.
type receiver struct {
messageReceived chan struct{}
lastMessage gsmsg.GraphSyncMessage
lastSender peer.ID
}
func (r *receiver) ReceiveMessage(
ctx context.Context,
sender peer.ID,
incoming gsmsg.GraphSyncMessage) {
r.lastSender = sender
r.lastMessage = incoming
select {
case <-ctx.Done():
case r.messageReceived <- struct{}{}:
}
}
func (r *receiver) ReceiveError(err error) {
}
func TestMessageSendAndReceive(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
mn := mocknet.New(ctx)
host1, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
host2, err := mn.GenPeer()
if err != nil {
t.Fatal("error generating host")
}
err = mn.LinkAll()
if err != nil {
t.Fatal("error linking hosts")
}
gsnet1 := NewFromLibp2pHost(host1,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc)
gsnet2 := NewFromLibp2pHost(host2,
testselector.MockDecodeSelectorFunc,
testselector.MockDecodeSelectionResponseFunc)
r := &receiver{
messageReceived: make(chan struct{}),
}
gsnet1.SetDelegate(r)
gsnet2.SetDelegate(r)
selector := testselector.GenerateSelector()
root := testselector.GenerateRootCid()
selectionResponse := testselector.GenerateSelectionResponse()
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
status := gsmsg.RequestAcknowledged
sent := gsmsg.New()
sent.AddRequest(id, selector, root, priority)
sent.AddResponse(id, status, selectionResponse)
err = gsnet1.ConnectTo(ctx, host2.ID())
if err != nil {
t.Fatal("Unable to connect peers")
}
gsnet1.SendMessage(ctx, host2.ID(), sent)
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
case <-r.messageReceived:
}
sender := r.lastSender
if sender != host1.ID() {
t.Fatal("received message from wrong node")
}
received := r.lastMessage
sentRequests := sent.Requests()
if len(sentRequests) != 1 {
t.Fatal("Did not add request to sent message")
}
sentRequest := sentRequests[0]
receivedRequests := received.Requests()
if len(receivedRequests) != 1 {
t.Fatal("Did not add request to received message")
}
receivedRequest := receivedRequests[0]
if receivedRequest.ID() != sentRequest.ID() ||
receivedRequest.IsCancel() != sentRequest.IsCancel() ||
receivedRequest.Priority() != sentRequest.Priority() ||
!reflect.DeepEqual(receivedRequest.Root(), sentRequest.Root()) ||
!reflect.DeepEqual(receivedRequest.Selector(), sentRequest.Selector()) {
t.Fatal("Sent message requests did not match received message requests")
}
sentResponses := sent.Responses()
if len(sentResponses) != 1 {
t.Fatal("Did not add response to sent message")
}
sentResponse := sentResponses[0]
receivedResponses := received.Responses()
if len(receivedResponses) != 1 {
t.Fatal("Did not add response to received message")
}
receivedResponse := receivedResponses[0]
if receivedResponse.RequestID() != sentResponse.RequestID() ||
receivedResponse.Status() != sentResponse.Status() ||
!reflect.DeepEqual(receivedResponse.Response(), sentResponse.Response()) {
t.Fatal("Sent message responses did not match received message responses")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment