Commit 5681e273 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

reworked Conn

parent ccaa490c
package conn
import (
"errors"
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
spipe "github.com/jbenet/go-ipfs/crypto/spipe"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -19,40 +23,212 @@ const ChanBuffer = 10
// 1 MB
const MaxMessageSize = 1 << 20
// Conn represents a connection to another Peer (IPFS Node).
type Conn struct {
Local *peer.Peer
Remote *peer.Peer
Conn manet.Conn
// msgioPipe is a pipe using msgio channels.
type msgioPipe struct {
outgoing *msgio.Chan
incoming *msgio.Chan
}
Closed chan bool
Outgoing *msgio.Chan
Incoming *msgio.Chan
Secure *spipe.SecurePipe
func newMsgioPipe(size int) *msgioPipe {
return &msgioPipe{
outgoing: msgio.NewChan(10),
incoming: msgio.NewChan(10),
}
}
// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
local *peer.Peer
remote *peer.Peer
maconn manet.Conn
// context + cancel
ctx context.Context
cancel context.CancelFunc
secure *spipe.SecurePipe
insecure *msgioPipe
msgpipe *msg.Pipe
}
// Map maps Keys (Peer.IDs) to Connections.
type Map map[u.Key]*Conn
// NewConn constructs a new connection
func NewConn(local, remote *peer.Peer, mconn manet.Conn) (*Conn, error) {
conn := &Conn{
Local: local,
Remote: remote,
Conn: mconn,
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote *peer.Peer,
peers peer.Peerstore, maconn manet.Conn) (Conn, error) {
ctx, cancel := context.WithCancel(ctx)
conn := &singleConn{
local: local,
remote: remote,
maconn: maconn,
ctx: ctx,
cancel: cancel,
insecure: newMsgioPipe(10),
msgpipe: msg.NewPipe(10),
}
if err := conn.newChans(); err != nil {
log.Info("newSingleConn: %v to %v", local, remote)
// setup the various io goroutines
go conn.insecure.outgoing.WriteTo(maconn)
go conn.insecure.incoming.ReadFrom(maconn, MaxMessageSize)
go conn.waitToClose(ctx)
// perform secure handshake before returning this connection.
if err := conn.secureHandshake(peers); err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
// secureHandshake performs the spipe secure handshake.
func (c *singleConn) secureHandshake(peers peer.Peerstore) error {
if c.secure != nil {
return errors.New("Conn is already secured or being secured.")
}
var err error
c.secure, err = spipe.NewSecurePipe(c.ctx, 10, c.local, peers)
if err != nil {
return err
}
// setup a Duplex pipe for spipe
insecure := spipe.Duplex{
In: c.insecure.incoming.MsgChan,
Out: c.insecure.outgoing.MsgChan,
}
// Wrap actually performs the secure handshake, which takes multiple RTT
if err := c.secure.Wrap(c.ctx, insecure); err != nil {
return err
}
if c.remote == nil {
c.remote = c.secure.RemotePeer()
} else if c.remote != c.secure.RemotePeer() {
// this panic is here because this would be an insidious programmer error
// that we need to ensure we catch.
log.Error("%v != %v", c.remote, c.secure.RemotePeer())
panic("peers not being constructed correctly.")
}
// silly we have to do it this way.
go c.unwrapOutMsgs()
go c.wrapInMsgs()
return nil
}
// unwrapOutMsgs sends just the raw data of a message through secure
func (c *singleConn) unwrapOutMsgs() {
for {
select {
case <-c.ctx.Done():
return
case m, more := <-c.msgpipe.Outgoing:
if !more {
return
}
c.secure.Out <- m.Data()
}
}
}
// wrapInMsgs wraps a message
func (c *singleConn) wrapInMsgs() {
for {
select {
case <-c.ctx.Done():
return
case d, more := <-c.secure.In:
if !more {
return
}
c.msgpipe.Incoming <- msg.New(c.remote, d)
}
}
}
// waitToClose waits on the given context's Done before closing Conn.
func (c *singleConn) waitToClose(ctx context.Context) {
select {
case <-ctx.Done():
}
// close underlying connection
c.maconn.Close()
c.maconn = nil
// closing channels
c.insecure.outgoing.Close()
c.secure.Close()
}
// IsOpen returns whether this Conn is open or closed.
func (c *singleConn) isOpen() bool {
return c.maconn != nil
}
// Close closes the connection, and associated channels.
func (c *singleConn) Close() error {
log.Debug("%s closing Conn with %s", c.local, c.remote)
if !c.isOpen() {
return fmt.Errorf("Already closed") // already closed
}
// cancel context.
c.cancel()
c.cancel = nil
return nil
}
// LocalPeer is the Peer on this side
func (c *singleConn) LocalPeer() *peer.Peer {
return c.local
}
// RemotePeer is the Peer on the remote side
func (c *singleConn) RemotePeer() *peer.Peer {
return c.remote
}
// MsgIn returns a readable message channel
func (c *singleConn) MsgIn() <-chan msg.NetMessage {
return c.msgpipe.Incoming
}
// MsgOut returns a writable message channel
func (c *singleConn) MsgOut() chan<- msg.NetMessage {
return c.msgpipe.Outgoing
}
// Dialer is an object that can open connections. We could have a "convenience"
// Dial function as before, but it would have many arguments, as dialing is
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
type Dialer struct {
// LocalPeer is the identity of the local Peer.
LocalPeer *peer.Peer
// Peerstore is the set of peers we know about locally. The Dialer needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
Peerstore peer.Peerstore
}
// Dial connects to a particular peer, over a given network
// Example: Dial("udp", peer)
func Dial(network string, local, remote *peer.Peer) (*Conn, error) {
laddr := local.NetAddress(network)
// Example: d.Dial(ctx, "udp", peer)
func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (Conn, error) {
laddr := d.LocalPeer.NetAddress(network)
if laddr == nil {
return nil, fmt.Errorf("No local address for network %s", network)
}
......@@ -63,47 +239,147 @@ func Dial(network string, local, remote *peer.Peer) (*Conn, error) {
}
// TODO: try to get reusing addr/ports to work.
// dialer := manet.Dialer{LocalAddr: laddr}
dialer := manet.Dialer{}
// madialer := manet.Dialer{LocalAddr: laddr}
madialer := manet.Dialer{}
log.Info("%s %s dialing %s %s", local, laddr, remote, raddr)
nconn, err := dialer.Dial(raddr)
log.Info("%s dialing %s %s", d.LocalPeer, remote, raddr)
maconn, err := madialer.Dial(raddr)
if err != nil {
return nil, err
}
return NewConn(local, remote, nconn)
if err := d.Peerstore.Put(remote); err != nil {
log.Error("Error putting peer into peerstore: %s", remote)
}
return newSingleConn(ctx, d.LocalPeer, remote, d.Peerstore, maconn)
}
// listener is an object that can accept connections. It implements Listener
type listener struct {
manet.Listener
// chansize is the size of the internal channels for concurrency
chansize int
// channel of incoming conections
conns chan Conn
// Local multiaddr to listen on
maddr ma.Multiaddr
// LocalPeer is the identity of the local Peer.
local *peer.Peer
// Peerstore is the set of peers we know about locally
peers peer.Peerstore
// ctx + cancel func
ctx context.Context
cancel context.CancelFunc
}
// waitToClose is needed to hand
func (l *listener) waitToClose() {
select {
case <-l.ctx.Done():
}
l.cancel = nil
l.Listener.Close()
}
// Construct new channels for given Conn.
func (c *Conn) newChans() error {
if c.Outgoing != nil || c.Incoming != nil {
return fmt.Errorf("Conn already initialized")
func (l *listener) listen() {
// handle at most chansize concurrent handshakes
sem := make(chan struct{}, l.chansize)
// handle is a goroutine work function that handles the handshake.
// it's here only so that accepting new connections can happen quickly.
handle := func(maconn manet.Conn) {
c, err := newSingleConn(l.ctx, l.local, nil, l.peers, maconn)
if err != nil {
log.Error("Error accepting connection: %v", err)
} else {
l.conns <- c
}
<-sem // release
}
c.Outgoing = msgio.NewChan(10)
c.Incoming = msgio.NewChan(10)
c.Closed = make(chan bool, 1)
for {
maconn, err := l.Listener.Accept()
if err != nil {
go c.Outgoing.WriteTo(c.Conn)
go c.Incoming.ReadFrom(c.Conn, MaxMessageSize)
// if cancel is nil we're closed.
if l.cancel == nil {
return // done.
}
log.Error("Failed to accept connection: %v", err)
continue
}
sem <- struct{}{} // acquire
go handle(maconn)
}
}
// Accept waits for and returns the next connection to the listener.
// Note that unfortunately this
func (l *listener) Accept() <-chan Conn {
return l.conns
}
// Multiaddr is the identity of the local Peer.
func (l *listener) Multiaddr() ma.Multiaddr {
return l.maddr
}
// LocalPeer is the identity of the local Peer.
func (l *listener) LocalPeer() *peer.Peer {
return l.local
}
// Peerstore is the set of peers we know about locally. The Listener needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
func (l *listener) Peerstore() peer.Peerstore {
return l.peers
}
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors
func (l *listener) Close() error {
l.cancel()
return nil
}
// Close closes the connection, and associated channels.
func (c *Conn) Close() error {
log.Debug("%s closing Conn with %s", c.Local, c.Remote)
if c.Conn == nil {
return fmt.Errorf("Already closed") // already closed
// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) {
ctx, cancel := context.WithCancel(ctx)
ml, err := manet.Listen(addr)
if err != nil {
return nil, err
}
// closing net connection
err := c.Conn.Close()
c.Conn = nil
// closing channels
c.Incoming.Close()
c.Outgoing.Close()
c.Closed <- true
return err
// todo make this a variable
chansize := 10
l := &listener{
ctx: ctx,
cancel: cancel,
Listener: ml,
maddr: addr,
peers: peers,
local: local,
conns: make(chan Conn, chansize),
chansize: chansize,
}
go l.listen()
go l.waitToClose()
return l, nil
}
......@@ -3,98 +3,114 @@ package conn
import (
"testing"
ci "github.com/jbenet/go-ipfs/crypto"
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
)
func setupPeer(id string, addr string) (*peer.Peer, error) {
func setupPeer(addr string) (*peer.Peer, error) {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
mh, err := mh.FromHexString(id)
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
return nil, err
}
p := &peer.Peer{ID: peer.ID(mh)}
id, err := peer.IDFromPubKey(pk)
if err != nil {
return nil, err
}
p := &peer.Peer{ID: id}
p.PrivKey = sk
p.PubKey = pk
p.AddAddress(tcp)
return p, nil
}
func echoListen(listener manet.Listener) {
func echoListen(ctx context.Context, listener Listener) {
for {
c, err := listener.Accept()
if err == nil {
// fmt.Println("accepeted")
go echo(c)
select {
case <-ctx.Done():
return
case c := <-listener.Accept():
go echo(ctx, c)
}
}
}
func echo(c manet.Conn) {
func echo(ctx context.Context, c Conn) {
for {
data := make([]byte, 1024)
i, err := c.Read(data)
if err != nil {
// fmt.Printf("error %v\n", err)
select {
case <-ctx.Done():
return
case m := <-c.MsgIn():
c.MsgOut() <- m
}
_, err = c.Write(data[:i])
if err != nil {
// fmt.Printf("error %v\n", err)
return
}
// fmt.Println("echoing", data[:i])
}
}
func TestDial(t *testing.T) {
func TestDialer(t *testing.T) {
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
if err != nil {
t.Fatal("failure to parse multiaddr")
}
listener, err := manet.Listen(maddr)
p1, err := setupPeer("/ip4/127.0.0.1/tcp/1234")
if err != nil {
t.Fatal("error setting up listener", err)
t.Fatal("error setting up peer", err)
}
go echoListen(listener)
p1, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", "/ip4/127.0.0.1/tcp/1234")
p2, err := setupPeer("/ip4/127.0.0.1/tcp/3456")
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34", "/ip4/127.0.0.1/tcp/3456")
ctx, cancel := context.WithCancel(context.Background())
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
l, err := Listen(ctx, laddr, p1, peer.NewPeerstore())
if err != nil {
t.Fatal("error setting up peer", err)
t.Fatal(err)
}
c, err := Dial("tcp", p2, p1)
go echoListen(ctx, l)
d := &Dialer{
Peerstore: peer.NewPeerstore(),
LocalPeer: p2,
}
c, err := d.Dial(ctx, "tcp", p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
// fmt.Println("sending")
c.Outgoing.MsgChan <- []byte("beep")
c.Outgoing.MsgChan <- []byte("boop")
out := <-c.Incoming.MsgChan
c.MsgOut() <- msg.New(p2, []byte("beep"))
c.MsgOut() <- msg.New(p2, []byte("boop"))
out := <-c.MsgIn()
// fmt.Println("recving", string(out))
if string(out) != "beep" {
t.Error("unexpected conn output")
data := string(out.Data())
if data != "beep" {
t.Error("unexpected conn output", data)
}
out = <-c.Incoming.MsgChan
if string(out) != "boop" {
t.Error("unexpected conn output")
out = <-c.MsgIn()
data = string(out.Data())
if string(out.Data()) != "boop" {
t.Error("unexpected conn output", data)
}
// fmt.Println("closing")
c.Close()
listener.Close()
l.Close()
cancel()
}
package conn
import (
msg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// Conn is a generic message-based Peer-to-Peer connection.
type Conn interface {
// LocalPeer is the Peer on this side
LocalPeer() *peer.Peer
// RemotePeer is the Peer on the remote side
RemotePeer() *peer.Peer
// MsgIn returns a readable message channel
MsgIn() <-chan msg.NetMessage
// MsgOut returns a writable message channel
MsgOut() chan<- msg.NetMessage
// Close ends the connection
Close() error
}
// Listener is an object that can accept connections. It matches net.Listener
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() <-chan Conn
// Multiaddr is the identity of the local Peer.
Multiaddr() ma.Multiaddr
// LocalPeer is the identity of the local Peer.
LocalPeer() *peer.Peer
// Peerstore is the set of peers we know about locally. The Listener needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
Peerstore() peer.Peerstore
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment