diff --git a/net/conn/conn.go b/net/conn/conn.go index a7157a7962e1cf356171bc1112c4a833f76f1ec5..5741cc1d5f193bf4933f907e0ca9e92cfff24e1f 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -101,6 +101,10 @@ func (c *singleConn) ID() string { return ID(c) } +func (c *singleConn) String() string { + return String(c, "singleConn") +} + // LocalMultiaddr is the Multiaddr on this side func (c *singleConn) LocalMultiaddr() ma.Multiaddr { return c.maconn.LocalMultiaddr() @@ -131,7 +135,7 @@ func (c *singleConn) Out() chan<- []byte { return c.msgio.outgoing.MsgChan } -// ID returns the +// ID returns the ID of a given Conn. func ID(c Conn) string { l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().ID) r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().ID) @@ -141,182 +145,8 @@ func ID(c Conn) string { return u.Key(ch).Pretty() } -// Dialer is an object that can open connections. We could have a "convenience" -// Dial function as before, but it would have many arguments, as dialing is -// no longer simple (need a peerstore, a local peer, a context, a network, etc) -type Dialer struct { - - // LocalPeer is the identity of the local Peer. - LocalPeer *peer.Peer - - // Peerstore is the set of peers we know about locally. The Dialer needs it - // because when an incoming connection is identified, we should reuse the - // same peer objects (otherwise things get inconsistent). - Peerstore peer.Peerstore -} - -// Dial connects to a particular peer, over a given network -// Example: d.Dial(ctx, "udp", peer) -func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (Conn, error) { - laddr := d.LocalPeer.NetAddress(network) - if laddr == nil { - return nil, fmt.Errorf("No local address for network %s", network) - } - - raddr := remote.NetAddress(network) - if raddr == nil { - return nil, fmt.Errorf("No remote address for network %s", network) - } - - // TODO: try to get reusing addr/ports to work. - // madialer := manet.Dialer{LocalAddr: laddr} - madialer := manet.Dialer{} - - log.Info("%s dialing %s %s", d.LocalPeer, remote, raddr) - maconn, err := madialer.Dial(raddr) - if err != nil { - return nil, err - } - - if err := d.Peerstore.Put(remote); err != nil { - log.Error("Error putting peer into peerstore: %s", remote) - } - - c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn) - if err != nil { - return nil, err - } - - return newSecureConn(ctx, c, d.Peerstore) -} - -// listener is an object that can accept connections. It implements Listener -type listener struct { - manet.Listener - - // chansize is the size of the internal channels for concurrency - chansize int - - // channel of incoming conections - conns chan Conn - - // Local multiaddr to listen on - maddr ma.Multiaddr - - // LocalPeer is the identity of the local Peer. - local *peer.Peer - - // Peerstore is the set of peers we know about locally - peers peer.Peerstore - - // embedded ContextCloser - ContextCloser -} - -// disambiguate -func (l *listener) Close() error { - return l.ContextCloser.Close() -} - -// close called by ContextCloser.Close -func (l *listener) close() error { - log.Info("listener closing: %s %s", l.local, l.maddr) - return l.Listener.Close() -} - -func (l *listener) listen() { - l.Children().Add(1) - defer l.Children().Done() - - // handle at most chansize concurrent handshakes - sem := make(chan struct{}, l.chansize) - - // handle is a goroutine work function that handles the handshake. - // it's here only so that accepting new connections can happen quickly. - handle := func(maconn manet.Conn) { - defer func() { <-sem }() // release - - c, err := newSingleConn(l.Context(), l.local, nil, maconn) - if err != nil { - log.Error("Error accepting connection: %v", err) - return - } - - sc, err := newSecureConn(l.Context(), c, l.peers) - if err != nil { - log.Error("Error securing connection: %v", err) - return - } - - l.conns <- sc - } - - for { - maconn, err := l.Listener.Accept() - if err != nil { - - // if closing, we should exit. - select { - case <-l.Closing(): - return // done. - default: - } - - log.Error("Failed to accept connection: %v", err) - continue - } - - sem <- struct{}{} // acquire - go handle(maconn) - } -} - -// Accept waits for and returns the next connection to the listener. -// Note that unfortunately this -func (l *listener) Accept() <-chan Conn { - return l.conns -} - -// Multiaddr is the identity of the local Peer. -func (l *listener) Multiaddr() ma.Multiaddr { - return l.maddr -} - -// LocalPeer is the identity of the local Peer. -func (l *listener) LocalPeer() *peer.Peer { - return l.local -} - -// Peerstore is the set of peers we know about locally. The Listener needs it -// because when an incoming connection is identified, we should reuse the -// same peer objects (otherwise things get inconsistent). -func (l *listener) Peerstore() peer.Peerstore { - return l.peers -} - -// Listen listens on the particular multiaddr, with given peer and peerstore. -func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) { - - ml, err := manet.Listen(addr) - if err != nil { - return nil, err - } - - // todo make this a variable - chansize := 10 - - l := &listener{ - Listener: ml, - maddr: addr, - peers: peers, - local: local, - conns: make(chan Conn, chansize), - chansize: chansize, - } - - l.ContextCloser = NewContextCloser(ctx, l.close) - - go l.listen() - - return l, nil +// String returns the user-friendly String representation of a conn +func String(c Conn, typ string) string { + return fmt.Sprintf("%s (%s) <-- %s --> (%s) %s", + c.LocalPeer(), c.LocalMultiaddr(), typ, c.RemoteMultiaddr(), c.RemotePeer()) } diff --git a/net/conn/dial.go b/net/conn/dial.go new file mode 100644 index 0000000000000000000000000000000000000000..7bf85b9138fbd5a091de2baa21fc1c52678c4cbd --- /dev/null +++ b/net/conn/dial.go @@ -0,0 +1,46 @@ +package conn + +import ( + "fmt" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + + manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net" + + peer "github.com/jbenet/go-ipfs/peer" +) + +// Dial connects to a particular peer, over a given network +// Example: d.Dial(ctx, "udp", peer) +func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (Conn, error) { + laddr := d.LocalPeer.NetAddress(network) + if laddr == nil { + return nil, fmt.Errorf("No local address for network %s", network) + } + + raddr := remote.NetAddress(network) + if raddr == nil { + return nil, fmt.Errorf("No remote address for network %s", network) + } + + // TODO: try to get reusing addr/ports to work. + // madialer := manet.Dialer{LocalAddr: laddr} + madialer := manet.Dialer{} + + log.Info("%s dialing %s %s", d.LocalPeer, remote, raddr) + maconn, err := madialer.Dial(raddr) + if err != nil { + return nil, err + } + + if err := d.Peerstore.Put(remote); err != nil { + log.Error("Error putting peer into peerstore: %s", remote) + } + + c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn) + if err != nil { + return nil, err + } + + return newSecureConn(ctx, c, d.Peerstore) +} diff --git a/net/conn/interface.go b/net/conn/interface.go index 4123772068daf5de5c2bb95f257fcdd6e3c0ebe0..d079490e79e89a5acf0c5eac7675cb05a077e654 100644 --- a/net/conn/interface.go +++ b/net/conn/interface.go @@ -40,6 +40,20 @@ type Conn interface { // Close() error -- already in ContextCloser } +// Dialer is an object that can open connections. We could have a "convenience" +// Dial function as before, but it would have many arguments, as dialing is +// no longer simple (need a peerstore, a local peer, a context, a network, etc) +type Dialer struct { + + // LocalPeer is the identity of the local Peer. + LocalPeer *peer.Peer + + // Peerstore is the set of peers we know about locally. The Dialer needs it + // because when an incoming connection is identified, we should reuse the + // same peer objects (otherwise things get inconsistent). + Peerstore peer.Peerstore +} + // Listener is an object that can accept connections. It matches net.Listener type Listener interface { diff --git a/net/conn/listen.go b/net/conn/listen.go new file mode 100644 index 0000000000000000000000000000000000000000..a3241472deb9d5c96e190d44736079107aa199ca --- /dev/null +++ b/net/conn/listen.go @@ -0,0 +1,140 @@ +package conn + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net" + + peer "github.com/jbenet/go-ipfs/peer" +) + +// listener is an object that can accept connections. It implements Listener +type listener struct { + manet.Listener + + // chansize is the size of the internal channels for concurrency + chansize int + + // channel of incoming conections + conns chan Conn + + // Local multiaddr to listen on + maddr ma.Multiaddr + + // LocalPeer is the identity of the local Peer. + local *peer.Peer + + // Peerstore is the set of peers we know about locally + peers peer.Peerstore + + // embedded ContextCloser + ContextCloser +} + +// disambiguate +func (l *listener) Close() error { + return l.ContextCloser.Close() +} + +// close called by ContextCloser.Close +func (l *listener) close() error { + log.Info("listener closing: %s %s", l.local, l.maddr) + return l.Listener.Close() +} + +func (l *listener) listen() { + l.Children().Add(1) + defer l.Children().Done() + + // handle at most chansize concurrent handshakes + sem := make(chan struct{}, l.chansize) + + // handle is a goroutine work function that handles the handshake. + // it's here only so that accepting new connections can happen quickly. + handle := func(maconn manet.Conn) { + defer func() { <-sem }() // release + + c, err := newSingleConn(l.Context(), l.local, nil, maconn) + if err != nil { + log.Error("Error accepting connection: %v", err) + return + } + + sc, err := newSecureConn(l.Context(), c, l.peers) + if err != nil { + log.Error("Error securing connection: %v", err) + return + } + + l.conns <- sc + } + + for { + maconn, err := l.Listener.Accept() + if err != nil { + + // if closing, we should exit. + select { + case <-l.Closing(): + return // done. + default: + } + + log.Error("Failed to accept connection: %v", err) + continue + } + + sem <- struct{}{} // acquire + go handle(maconn) + } +} + +// Accept waits for and returns the next connection to the listener. +// Note that unfortunately this +func (l *listener) Accept() <-chan Conn { + return l.conns +} + +// Multiaddr is the identity of the local Peer. +func (l *listener) Multiaddr() ma.Multiaddr { + return l.maddr +} + +// LocalPeer is the identity of the local Peer. +func (l *listener) LocalPeer() *peer.Peer { + return l.local +} + +// Peerstore is the set of peers we know about locally. The Listener needs it +// because when an incoming connection is identified, we should reuse the +// same peer objects (otherwise things get inconsistent). +func (l *listener) Peerstore() peer.Peerstore { + return l.peers +} + +// Listen listens on the particular multiaddr, with given peer and peerstore. +func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) { + + ml, err := manet.Listen(addr) + if err != nil { + return nil, err + } + + // todo make this a variable + chansize := 10 + + l := &listener{ + Listener: ml, + maddr: addr, + peers: peers, + local: local, + conns: make(chan Conn, chansize), + chansize: chansize, + } + + l.ContextCloser = NewContextCloser(ctx, l.close) + + go l.listen() + + return l, nil +} diff --git a/net/conn/secure_conn.go b/net/conn/secure_conn.go index 0314602f13a89f9a6bbfa6f3c6ee13141cd94568..ac51cb429312736eed1c2975912322b5413f17f0 100644 --- a/net/conn/secure_conn.go +++ b/net/conn/secure_conn.go @@ -98,6 +98,10 @@ func (c *secureConn) ID() string { return ID(c) } +func (c *secureConn) String() string { + return String(c, "secureConn") +} + // LocalMultiaddr is the Multiaddr on this side func (c *secureConn) LocalMultiaddr() ma.Multiaddr { return c.insecure.LocalMultiaddr()