Commit ffba0314 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

test closing/cancellation

- does end properly
- no goroutines leaked!
parent 8065b61c
......@@ -18,6 +18,7 @@ import (
"hash"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
......@@ -229,7 +230,15 @@ func (s *SecurePipe) handleSecureIn(hashType string, tIV, tCKey, tMKey []byte) {
theirMac, macSize := makeMac(hashType, tMKey)
for {
data, ok := <-s.insecure.In
var data []byte
ok := true
select {
case <-s.ctx.Done():
ok = false // return out
case data, ok = <-s.insecure.In:
}
if !ok {
close(s.Duplex.In)
return
......@@ -266,8 +275,17 @@ func (s *SecurePipe) handleSecureOut(hashType string, mIV, mCKey, mMKey []byte)
myMac, macSize := makeMac(hashType, mMKey)
for {
data, ok := <-s.Out
var data []byte
ok := true
select {
case <-s.ctx.Done():
ok = false // return out
case data, ok = <-s.Out:
}
if !ok {
close(s.insecure.Out)
return
}
......@@ -363,7 +381,8 @@ func getOrConstructPeer(peers peer.Peerstore, rpk ci.PubKey) (*peer.Peer, error)
// this shouldn't ever happen, given we hashed, etc, but it could mean
// expected code (or protocol) invariants violated.
if !npeer.PubKey.Equals(rpk) {
return nil, fmt.Errorf("WARNING: PubKey mismatch: %v", npeer)
log.Error("WARNING: PubKey mismatch: %v", npeer)
panic("secure channel pubkey mismatch")
}
return npeer, nil
}
package conn
import (
"bytes"
"fmt"
"runtime"
"strconv"
"sync"
"testing"
"time"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
......@@ -55,6 +61,43 @@ func echo(ctx context.Context, c Conn) {
}
}
func setupConn(t *testing.T, ctx context.Context, a1, a2 string) (a, b Conn) {
p1, err := setupPeer(a1)
if err != nil {
t.Fatal("error setting up peer", err)
}
p2, err := setupPeer(a2)
if err != nil {
t.Fatal("error setting up peer", err)
}
laddr := p1.NetAddress("tcp")
if laddr == nil {
t.Fatal("Listen address is nil.")
}
l1, err := Listen(ctx, laddr, p1, peer.NewPeerstore())
if err != nil {
t.Fatal(err)
}
d2 := &Dialer{
Peerstore: peer.NewPeerstore(),
LocalPeer: p2,
}
c2, err := d2.Dial(ctx, "tcp", p1)
if err != nil {
t.Fatal("error dialing peer", err)
}
c1 := <-l1.Accept()
return c1, c2
}
func TestDialer(t *testing.T) {
p1, err := setupPeer("/ip4/127.0.0.1/tcp/1234")
......@@ -113,3 +156,119 @@ func TestDialer(t *testing.T) {
l.Close()
cancel()
}
func TestClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
select {
case <-c1.Done():
t.Fatal("done before close")
case <-c2.Done():
t.Fatal("done before close")
default:
}
c1.Close()
select {
case <-c1.Done():
default:
t.Fatal("not done after cancel")
}
c2.Close()
select {
case <-c2.Done():
default:
t.Fatal("not done after cancel")
}
cancel() // close the listener :P
}
func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345")
select {
case <-c1.Done():
t.Fatal("done before close")
case <-c2.Done():
t.Fatal("done before close")
default:
}
cancel()
// wait to ensure other goroutines run and close things.
<-time.After(time.Microsecond * 10)
// test that cancel called Close.
select {
case <-c1.Done():
default:
t.Fatal("not done after cancel")
}
select {
case <-c2.Done():
default:
t.Fatal("not done after cancel")
}
}
func TestCloseLeak(t *testing.T) {
var wg sync.WaitGroup
runPair := func(p1, p2, num int) {
a1 := strconv.Itoa(p1)
a2 := strconv.Itoa(p2)
ctx, cancel := context.WithCancel(context.Background())
c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/"+a1, "/ip4/127.0.0.1/tcp/"+a2)
for i := 0; i < num; i++ {
b1 := []byte("beep")
c1.Out() <- b1
b2 := <-c2.In()
if !bytes.Equal(b1, b2) {
panic("bytes not equal")
}
b2 = []byte("boop")
c2.Out() <- b2
b1 = <-c1.In()
if !bytes.Equal(b1, b2) {
panic("bytes not equal")
}
<-time.After(time.Microsecond * 5)
}
cancel() // close the listener
wg.Done()
}
var cons = 20
var msgs = 100
fmt.Printf("Running %d connections * %d msgs.\n", cons, msgs)
for i := 0; i < cons; i++ {
wg.Add(1)
go runPair(2000+i, 2001+i, msgs)
}
fmt.Printf("Waiting...\n")
wg.Wait()
// done!
<-time.After(time.Microsecond * 100)
if runtime.NumGoroutine() > 10 {
// panic("uncomment me to debug")
t.Fatal("leaking goroutines:", runtime.NumGoroutine())
}
}
......@@ -12,6 +12,8 @@ type Map map[u.Key]Conn
// Conn is a generic message-based Peer-to-Peer connection.
type Conn interface {
// implement ContextCloser too!
ContextCloser
// LocalPeer is the Peer on this side
LocalPeer() *peer.Peer
......@@ -26,7 +28,7 @@ type Conn interface {
Out() chan<- []byte
// Close ends the connection
Close() error
// Close() error -- already in ContextCloser
}
// Listener is an object that can accept connections. It matches net.Listener
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment