Commit afed188d authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

separated out secure conn

parent ffba0314
package conn
import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
......@@ -62,13 +64,14 @@ func (c *contextCloser) Done() Wait {
func (c *contextCloser) Close() error {
select {
case <-c.Done():
panic("closed twice")
// panic("closed twice")
return errors.New("closed twice")
default:
}
c.cancel() // release anyone waiting on the context
err := c.closeFunc() // actually run the close logic
close(c.closed) // relase everyone waiting on Done
c.cancel() // release anyone waiting on the context
return err
}
......
package conn
import (
"errors"
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -9,7 +8,6 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
spipe "github.com/jbenet/go-ipfs/crypto/spipe"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -40,22 +38,20 @@ type singleConn struct {
local *peer.Peer
remote *peer.Peer
maconn manet.Conn
secure *spipe.SecurePipe
insecure *msgioPipe
msgio *msgioPipe
ContextCloser
}
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote *peer.Peer,
peers peer.Peerstore, maconn manet.Conn) (Conn, error) {
maconn manet.Conn) (Conn, error) {
conn := &singleConn{
local: local,
remote: remote,
maconn: maconn,
insecure: newMsgioPipe(10),
local: local,
remote: remote,
maconn: maconn,
msgio: newMsgioPipe(10),
}
conn.ContextCloser = NewContextCloser(ctx, conn.close)
......@@ -63,65 +59,19 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer,
log.Info("newSingleConn: %v to %v", local, remote)
// setup the various io goroutines
go conn.insecure.outgoing.WriteTo(maconn)
go conn.insecure.incoming.ReadFrom(maconn, MaxMessageSize)
// perform secure handshake before returning this connection.
if err := conn.secureHandshake(peers); err != nil {
conn.Close()
return nil, err
}
go conn.msgio.outgoing.WriteTo(maconn)
go conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
return conn, nil
}
// secureHandshake performs the spipe secure handshake.
func (c *singleConn) secureHandshake(peers peer.Peerstore) error {
if c.secure != nil {
return errors.New("Conn is already secured or being secured.")
}
// setup a Duplex pipe for spipe
insecure := spipe.Duplex{
In: c.insecure.incoming.MsgChan,
Out: c.insecure.outgoing.MsgChan,
}
// spipe performs the secure handshake, which takes multiple RTT
sp, err := spipe.NewSecurePipe(c.Context(), 10, c.local, peers, insecure)
if err != nil {
return err
}
// assign it into the conn object
c.secure = sp
if c.remote == nil {
c.remote = c.secure.RemotePeer()
} else if c.remote != c.secure.RemotePeer() {
// this panic is here because this would be an insidious programmer error
// that we need to ensure we catch.
log.Error("%v != %v", c.remote, c.secure.RemotePeer())
panic("peers not being constructed correctly.")
}
return nil
}
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) close() error {
log.Debug("%s closing Conn with %s", c.local, c.remote)
// close underlying connection
err := c.maconn.Close()
// closing channels
c.insecure.outgoing.Close()
if c.secure != nil { // may never have gotten here.
c.secure.Close()
}
c.msgio.outgoing.Close()
return err
}
......@@ -137,12 +87,12 @@ func (c *singleConn) RemotePeer() *peer.Peer {
// In returns a readable message channel
func (c *singleConn) In() <-chan []byte {
return c.secure.In
return c.msgio.incoming.MsgChan
}
// Out returns a writable message channel
func (c *singleConn) Out() chan<- []byte {
return c.secure.Out
return c.msgio.outgoing.MsgChan
}
// Dialer is an object that can open connections. We could have a "convenience"
......@@ -186,7 +136,12 @@ func (d *Dialer) Dial(ctx context.Context, network string, remote *peer.Peer) (C
log.Error("Error putting peer into peerstore: %s", remote)
}
return newSingleConn(ctx, d.LocalPeer, remote, d.Peerstore, maconn)
c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
if err != nil {
return nil, err
}
return newSecureConn(ctx, c, d.Peerstore)
}
// listener is an object that can accept connections. It implements Listener
......@@ -240,13 +195,21 @@ func (l *listener) listen() {
// handle is a goroutine work function that handles the handshake.
// it's here only so that accepting new connections can happen quickly.
handle := func(maconn manet.Conn) {
c, err := newSingleConn(l.Context(), l.local, nil, l.peers, maconn)
defer func() { <-sem }() // release
c, err := newSingleConn(l.Context(), l.local, nil, maconn)
if err != nil {
log.Error("Error accepting connection: %v", err)
} else {
l.conns <- c
return
}
<-sem // release
sc, err := newSecureConn(l.Context(), c, l.peers)
if err != nil {
log.Error("Error securing connection: %v", err)
return
}
l.conns <- sc
}
for {
......
......@@ -9,154 +9,9 @@ import (
"testing"
"time"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func setupPeer(addr string) (*peer.Peer, error) {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
return nil, err
}
id, err := peer.IDFromPubKey(pk)
if err != nil {
return nil, err
}
p := &peer.Peer{ID: id}
p.PrivKey = sk
p.PubKey = pk
p.AddAddress(tcp)
return p, nil
}
func echoListen(ctx context.Context, listener Listener) {
for {
select {
case <-ctx.Done():
return
case c := <-listener.Accept():
go echo(ctx, c)
}
}
}
func echo(ctx context.Context, c Conn) {
for {
select {
case <-ctx.Done():
return
case m := <-c.In():
c.Out() <- m
}
}
}
func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) {
p1, err := setupPeer(a1)
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer(a2)
if err != nil {
t.Fatal("error setting up peer", err)
}
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
l1, err := Listen(ctx, laddr, p1, peer.NewPeerstore())
if err != nil {
t.Fatal(err)
}
d2 := &Dialer{
Peerstore: peer.NewPeerstore(),
LocalPeer: p2,
}
c2, err := d2.Dial(ctx, "tcp", p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
c1 := <-l1.Accept()
return c1, c2
}
func TestDialer(t *testing.T) {
p1, err := setupPeer("/ip4/127.0.0.1/tcp/1234")
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer("/ip4/127.0.0.1/tcp/3456")
if err != nil {
t.Fatal("error setting up peer", err)
}
ctx, cancel := context.WithCancel(context.Background())
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
l, err := Listen(ctx, laddr, p1, peer.NewPeerstore())
if err != nil {
t.Fatal(err)
}
go echoListen(ctx, l)
d := &Dialer{
Peerstore: peer.NewPeerstore(),
LocalPeer: p2,
}
c, err := d.Dial(ctx, "tcp", p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
// fmt.Println("sending")
c.Out() <- []byte("beep")
c.Out() <- []byte("boop")
out := <-c.In()
// fmt.Println("recving", string(out))
data := string(out)
if data != "beep" {
t.Error("unexpected conn output", data)
}
out = <-c.In()
data = string(out)
if string(out) != "boop" {
t.Error("unexpected conn output", data)
}
// fmt.Println("closing")
c.Close()
l.Close()
cancel()
}
func TestClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
......
package conn
import (
"testing"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func setupPeer(addr string) (*peer.Peer, error) {
tcp, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
return nil, err
}
id, err := peer.IDFromPubKey(pk)
if err != nil {
return nil, err
}
p := &peer.Peer{ID: id}
p.PrivKey = sk
p.PubKey = pk
p.AddAddress(tcp)
return p, nil
}
func echoListen(ctx context.Context, listener Listener) {
for {
select {
case <-ctx.Done():
return
case c := <-listener.Accept():
go echo(ctx, c)
}
}
}
func echo(ctx context.Context, c Conn) {
for {
select {
case <-ctx.Done():
return
case m := <-c.In():
c.Out() <- m
}
}
}
func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) {
p1, err := setupPeer(a1)
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer(a2)
if err != nil {
t.Fatal("error setting up peer", err)
}
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
l1, err := Listen(ctx, laddr, p1, peer.NewPeerstore())
if err != nil {
t.Fatal(err)
}
d2 := &Dialer{
Peerstore: peer.NewPeerstore(),
LocalPeer: p2,
}
c2, err := d2.Dial(ctx, "tcp", p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
c1 := <-l1.Accept()
return c1, c2
}
func TestDialer(t *testing.T) {
p1, err := setupPeer("/ip4/127.0.0.1/tcp/1234")
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer("/ip4/127.0.0.1/tcp/3456")
if err != nil {
t.Fatal("error setting up peer", err)
}
ctx, cancel := context.WithCancel(context.Background())
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
l, err := Listen(ctx, laddr, p1, peer.NewPeerstore())
if err != nil {
t.Fatal(err)
}
go echoListen(ctx, l)
d := &Dialer{
Peerstore: peer.NewPeerstore(),
LocalPeer: p2,
}
c, err := d.Dial(ctx, "tcp", p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
// fmt.Println("sending")
c.Out() <- []byte("beep")
c.Out() <- []byte("boop")
out := <-c.In()
// fmt.Println("recving", string(out))
data := string(out)
if data != "beep" {
t.Error("unexpected conn output", data)
}
out = <-c.In()
data = string(out)
if string(out) != "boop" {
t.Error("unexpected conn output", data)
}
// fmt.Println("closing")
c.Close()
l.Close()
cancel()
}
package conn
import (
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
spipe "github.com/jbenet/go-ipfs/crypto/spipe"
peer "github.com/jbenet/go-ipfs/peer"
)
// secureConn wraps another Conn object with an encrypted channel.
type secureConn struct {
// the wrapped conn
insecure Conn
// secure pipe, wrapping insecure
secure *spipe.SecurePipe
ContextCloser
}
// newConn constructs a new connection
func newSecureConn(ctx context.Context, insecure Conn, peers peer.Peerstore) (Conn, error) {
conn := &secureConn{
insecure: insecure,
}
conn.ContextCloser = NewContextCloser(ctx, conn.close)
log.Debug("newSecureConn: %v to %v", insecure.LocalPeer(), insecure.RemotePeer())
// perform secure handshake before returning this connection.
if err := conn.secureHandshake(peers); err != nil {
conn.Close()
return nil, err
}
log.Debug("newSecureConn: %v to %v handshake success!", insecure.LocalPeer(), insecure.RemotePeer())
return conn, nil
}
// secureHandshake performs the spipe secure handshake.
func (c *secureConn) secureHandshake(peers peer.Peerstore) error {
if c.secure != nil {
return errors.New("Conn is already secured or being secured.")
}
// ok to panic here if this type assertion fails. Interface hack.
// when we support wrapping other Conns, we'll need to change
// spipe to do something else.
insecureSC := c.insecure.(*singleConn)
// setup a Duplex pipe for spipe
insecureD := spipe.Duplex{
In: insecureSC.msgio.incoming.MsgChan,
Out: insecureSC.msgio.outgoing.MsgChan,
}
// spipe performs the secure handshake, which takes multiple RTT
sp, err := spipe.NewSecurePipe(c.Context(), 10, c.LocalPeer(), peers, insecureD)
if err != nil {
return err
}
// assign it into the conn object
c.secure = sp
// if we do not know RemotePeer, get it from secure chan (who identifies it)
if insecureSC.remote == nil {
insecureSC.remote = c.secure.RemotePeer()
} else if insecureSC.remote != c.secure.RemotePeer() {
// this panic is here because this would be an insidious programmer error
// that we need to ensure we catch.
// update: this actually might happen under normal operation-- should
// perhaps return an error. TBD.
log.Error("secureConn peer mismatch. %v != %v", insecureSC.remote, c.secure.RemotePeer())
panic("secureConn peer mismatch. consructed incorrectly?")
}
return nil
}
// close is called by ContextCloser
func (c *secureConn) close() error {
err := c.insecure.Close()
if c.secure != nil { // may never have gotten here.
err = c.secure.Close()
}
return err
}
// LocalPeer is the Peer on this side
func (c *secureConn) LocalPeer() *peer.Peer {
return c.insecure.LocalPeer()
}
// RemotePeer is the Peer on the remote side
func (c *secureConn) RemotePeer() *peer.Peer {
return c.insecure.RemotePeer()
}
// In returns a readable message channel
func (c *secureConn) In() <-chan []byte {
return c.secure.In
}
// Out returns a writable message channel
func (c *secureConn) Out() chan<- []byte {
return c.secure.Out
}
package conn
import (
"bytes"
"fmt"
"runtime"
"strconv"
"sync"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func setupSecureConn(t *testing.T, c Conn) Conn {
c, ok := c.(*secureConn)
if ok {
return c
}
// shouldn't happen, because dial + listen already return secure conns.
s, err := newSecureConn(c.Context(), c, peer.NewPeerstore())
if err != nil {
t.Fatal(err)
}
return s
}
func TestSecureClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
c1 = setupSecureConn(t, c1)
c2 = setupSecureConn(t, c2)
select {
case <-c1.Done():
t.Fatal("done before close")
case <-c2.Done():
t.Fatal("done before close")
default:
}
c1.Close()
select {
case <-c1.Done():
default:
t.Fatal("not done after cancel")
}
c2.Close()
select {
case <-c2.Done():
default:
t.Fatal("not done after cancel")
}
cancel() // close the listener :P
}
func TestSecureCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
c1 = setupSecureConn(t, c1)
c2 = setupSecureConn(t, c2)
select {
case <-c1.Done():
t.Fatal("done before close")
case <-c2.Done():
t.Fatal("done before close")
default:
}
cancel()
// wait to ensure other goroutines run and close things.
<-time.After(time.Microsecond * 10)
// test that cancel called Close.
select {
case <-c1.Done():
default:
t.Fatal("not done after cancel")
}
select {
case <-c2.Done():
default:
t.Fatal("not done after cancel")
}
}
func TestSecureCloseLeak(t *testing.T) {
var wg sync.WaitGroup
runPair := func(p1, p2, num int) {
a1 := strconv.Itoa(p1)
a2 := strconv.Itoa(p2)
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2)
c1 = setupSecureConn(t, c1)
c2 = setupSecureConn(t, c2)
for i := 0; i < num; i++ {
b1 := []byte("beep")
c1.Out() <- b1
b2 := <-c2.In()
if !bytes.Equal(b1, b2) {
panic("bytes not equal")
}
b2 = []byte("boop")
c2.Out() <- b2
b1 = <-c1.In()
if !bytes.Equal(b1, b2) {
panic("bytes not equal")
}
<-time.After(time.Microsecond * 5)
}
cancel() // close the listener
wg.Done()
}
var cons = 20
var msgs = 100
fmt.Printf("Running %d connections * %d msgs.\n", cons, msgs)
for i := 0; i < cons; i++ {
wg.Add(1)
go runPair(2000+i, 2001+i, msgs)
}
fmt.Printf("Waiting...\n")
wg.Wait()
// done!
<-time.After(time.Microsecond * 100)
if runtime.NumGoroutine() > 10 {
// panic("uncomment me to debug")
t.Fatal("leaking goroutines:", runtime.NumGoroutine())
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment