Commit 68b85c99 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

broke out dial + listen

parent c2a228f6
......@@ -101,6 +101,10 @@ func (c *singleConn) ID() string {
return ID(c)
}
func (c *singleConn) String() string {
return String(c, "singleConn")
}
// LocalMultiaddr is the Multiaddr on this side
func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
return c.maconn.LocalMultiaddr()
......@@ -131,7 +135,7 @@ func (c *singleConn) Out() chan<- []byte {
return c.msgio.outgoing.MsgChan
}
// ID returns the
// ID returns the ID of a given Conn.
func ID(c Conn) string {
l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID)
r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID)
......@@ -141,182 +145,8 @@ func ID(c Conn) string {
return u.Key(ch).Pretty()
}
// Dialer is an object that can open connections. We could have a "convenience"
// Dial function as before, but it would have many arguments, as dialing is
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
type Dialer struct {
// LocalPeer is the identity of the local Peer.
LocalPeer *peer.Peer
// Peerstore is the set of peers we know about locally. The Dialer needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
Peerstore peer.Peerstore
}
// Dial connects to a particular peer, over a given network
// Example: d.Dial(ctx, "udp", peer)
func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (Conn, error) {
laddr := d.LocalPeer.NetAddress(network)
if laddr == nil {
return nil, fmt.Errorf("No local address for network %s", network)
}
raddr := remote.NetAddress(network)
if raddr == nil {
return nil, fmt.Errorf("No remote address for network %s", network)
}
// TODO: try to get reusing addr/ports to work.
// madialer := manet.Dialer{LocalAddr: laddr}
madialer := manet.Dialer{}
log.Info("%s dialing %s %s", d.LocalPeer, remote, raddr)
maconn, err := madialer.Dial(raddr)
if err != nil {
return nil, err
}
if err := d.Peerstore.Put(remote); err != nil {
log.Error("Error putting peer into peerstore: %s", remote)
}
c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
if err != nil {
return nil, err
}
return newSecureConn(ctx, c, d.Peerstore)
}
// listener is an object that can accept connections. It implements Listener
type listener struct {
manet.Listener
// chansize is the size of the internal channels for concurrency
chansize int
// channel of incoming conections
conns chan Conn
// Local multiaddr to listen on
maddr ma.Multiaddr
// LocalPeer is the identity of the local Peer.
local *peer.Peer
// Peerstore is the set of peers we know about locally
peers peer.Peerstore
// embedded ContextCloser
ContextCloser
}
// disambiguate
func (l *listener) Close() error {
return l.ContextCloser.Close()
}
// close called by ContextCloser.Close
func (l *listener) close() error {
log.Info("listener closing: %s %s", l.local, l.maddr)
return l.Listener.Close()
}
func (l *listener) listen() {
l.Children().Add(1)
defer l.Children().Done()
// handle at most chansize concurrent handshakes
sem := make(chan struct{}, l.chansize)
// handle is a goroutine work function that handles the handshake.
// it's here only so that accepting new connections can happen quickly.
handle := func(maconn manet.Conn) {
defer func() { <-sem }() // release
c, err := newSingleConn(l.Context(), l.local, nil, maconn)
if err != nil {
log.Error("Error accepting connection: %v", err)
return
}
sc, err := newSecureConn(l.Context(), c, l.peers)
if err != nil {
log.Error("Error securing connection: %v", err)
return
}
l.conns <- sc
}
for {
maconn, err := l.Listener.Accept()
if err != nil {
// if closing, we should exit.
select {
case <-l.Closing():
return // done.
default:
}
log.Error("Failed to accept connection: %v", err)
continue
}
sem <- struct{}{} // acquire
go handle(maconn)
}
}
// Accept waits for and returns the next connection to the listener.
// Note that unfortunately this
func (l *listener) Accept() <-chan Conn {
return l.conns
}
// Multiaddr is the identity of the local Peer.
func (l *listener) Multiaddr() ma.Multiaddr {
return l.maddr
}
// LocalPeer is the identity of the local Peer.
func (l *listener) LocalPeer() *peer.Peer {
return l.local
}
// Peerstore is the set of peers we know about locally. The Listener needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
func (l *listener) Peerstore() peer.Peerstore {
return l.peers
}
// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) {
ml, err := manet.Listen(addr)
if err != nil {
return nil, err
}
// todo make this a variable
chansize := 10
l := &listener{
Listener: ml,
maddr: addr,
peers: peers,
local: local,
conns: make(chan Conn, chansize),
chansize: chansize,
}
l.ContextCloser = NewContextCloser(ctx, l.close)
go l.listen()
return l, nil
// String returns the user-friendly String representation of a conn
func String(c Conn, typ string) string {
return fmt.Sprintf("%s (%s) <-- %s --> (%s) %s",
c.LocalPeer(), c.LocalMultiaddr(), typ, c.RemoteMultiaddr(), c.RemotePeer())
}
package conn
import (
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
peer "github.com/jbenet/go-ipfs/peer"
)
// Dial connects to a particular peer, over a given network
// Example: d.Dial(ctx, "udp", peer)
func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (Conn, error) {
laddr := d.LocalPeer.NetAddress(network)
if laddr == nil {
return nil, fmt.Errorf("No local address for network %s", network)
}
raddr := remote.NetAddress(network)
if raddr == nil {
return nil, fmt.Errorf("No remote address for network %s", network)
}
// TODO: try to get reusing addr/ports to work.
// madialer := manet.Dialer{LocalAddr: laddr}
madialer := manet.Dialer{}
log.Info("%s dialing %s %s", d.LocalPeer, remote, raddr)
maconn, err := madialer.Dial(raddr)
if err != nil {
return nil, err
}
if err := d.Peerstore.Put(remote); err != nil {
log.Error("Error putting peer into peerstore: %s", remote)
}
c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
if err != nil {
return nil, err
}
return newSecureConn(ctx, c, d.Peerstore)
}
......@@ -40,6 +40,20 @@ type Conn interface {
// Close() error -- already in ContextCloser
}
// Dialer is an object that can open connections. We could have a "convenience"
// Dial function as before, but it would have many arguments, as dialing is
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
type Dialer struct {
// LocalPeer is the identity of the local Peer.
LocalPeer *peer.Peer
// Peerstore is the set of peers we know about locally. The Dialer needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
Peerstore peer.Peerstore
}
// Listener is an object that can accept connections. It matches net.Listener
type Listener interface {
......
package conn
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
peer "github.com/jbenet/go-ipfs/peer"
)
// listener is an object that can accept connections. It implements Listener
type listener struct {
manet.Listener
// chansize is the size of the internal channels for concurrency
chansize int
// channel of incoming conections
conns chan Conn
// Local multiaddr to listen on
maddr ma.Multiaddr
// LocalPeer is the identity of the local Peer.
local *peer.Peer
// Peerstore is the set of peers we know about locally
peers peer.Peerstore
// embedded ContextCloser
ContextCloser
}
// disambiguate
func (l *listener) Close() error {
return l.ContextCloser.Close()
}
// close called by ContextCloser.Close
func (l *listener) close() error {
log.Info("listener closing: %s %s", l.local, l.maddr)
return l.Listener.Close()
}
func (l *listener) listen() {
l.Children().Add(1)
defer l.Children().Done()
// handle at most chansize concurrent handshakes
sem := make(chan struct{}, l.chansize)
// handle is a goroutine work function that handles the handshake.
// it's here only so that accepting new connections can happen quickly.
handle := func(maconn manet.Conn) {
defer func() { <-sem }() // release
c, err := newSingleConn(l.Context(), l.local, nil, maconn)
if err != nil {
log.Error("Error accepting connection: %v", err)
return
}
sc, err := newSecureConn(l.Context(), c, l.peers)
if err != nil {
log.Error("Error securing connection: %v", err)
return
}
l.conns <- sc
}
for {
maconn, err := l.Listener.Accept()
if err != nil {
// if closing, we should exit.
select {
case <-l.Closing():
return // done.
default:
}
log.Error("Failed to accept connection: %v", err)
continue
}
sem <- struct{}{} // acquire
go handle(maconn)
}
}
// Accept waits for and returns the next connection to the listener.
// Note that unfortunately this
func (l *listener) Accept() <-chan Conn {
return l.conns
}
// Multiaddr is the identity of the local Peer.
func (l *listener) Multiaddr() ma.Multiaddr {
return l.maddr
}
// LocalPeer is the identity of the local Peer.
func (l *listener) LocalPeer() *peer.Peer {
return l.local
}
// Peerstore is the set of peers we know about locally. The Listener needs it
// because when an incoming connection is identified, we should reuse the
// same peer objects (otherwise things get inconsistent).
func (l *listener) Peerstore() peer.Peerstore {
return l.peers
}
// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) {
ml, err := manet.Listen(addr)
if err != nil {
return nil, err
}
// todo make this a variable
chansize := 10
l := &listener{
Listener: ml,
maddr: addr,
peers: peers,
local: local,
conns: make(chan Conn, chansize),
chansize: chansize,
}
l.ContextCloser = NewContextCloser(ctx, l.close)
go l.listen()
return l, nil
}
......@@ -98,6 +98,10 @@ func (c *secureConn) ID() string {
return ID(c)
}
func (c *secureConn) String() string {
return String(c, "secureConn")
}
// LocalMultiaddr is the Multiaddr on this side
func (c *secureConn) LocalMultiaddr() ma.Multiaddr {
return c.insecure.LocalMultiaddr()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment