Unverified Commit d03bc2ea authored by Raúl Kripalani's avatar Raúl Kripalani Committed by GitHub

simplify tests by removing ginkgo/gomega. (#60)

parent 43d1c530
This diff is collapsed.
...@@ -3,21 +3,24 @@ package stream_test ...@@ -3,21 +3,24 @@ package stream_test
import ( import (
"context" "context"
"errors" "errors"
"io"
"net" "net"
"sync" "sync"
"testing"
"time" "time"
"github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec/insecure" "github.com/libp2p/go-libp2p-core/sec/insecure"
tpt "github.com/libp2p/go-libp2p-core/transport" "github.com/libp2p/go-libp2p-core/transport"
mplex "github.com/libp2p/go-libp2p-mplex" mplex "github.com/libp2p/go-libp2p-mplex"
st "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
. "github.com/onsi/ginkgo" st "github.com/libp2p/go-libp2p-transport-upgrader"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/require"
) )
// negotiatingMuxer sets up a new mplex connection // negotiatingMuxer sets up a new mplex connection
...@@ -45,12 +48,18 @@ type blockingMuxer struct { ...@@ -45,12 +48,18 @@ type blockingMuxer struct {
var _ mux.Multiplexer = &blockingMuxer{} var _ mux.Multiplexer = &blockingMuxer{}
func newBlockingMuxer() *blockingMuxer { return &blockingMuxer{unblock: make(chan struct{})} } func newBlockingMuxer() *blockingMuxer {
return &blockingMuxer{unblock: make(chan struct{})}
}
func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) { func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock <-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer) return (&negotiatingMuxer{}).NewConn(c, isServer)
} }
func (m *blockingMuxer) Unblock() { close(m.unblock) }
func (m *blockingMuxer) Unblock() {
close(m.unblock)
}
// errorMuxer is a muxer that errors while setting up // errorMuxer is a muxer that errors while setting up
type errorMuxer struct{} type errorMuxer struct{}
...@@ -61,240 +70,318 @@ func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) { ...@@ -61,240 +70,318 @@ func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error") return nil, errors.New("mux error")
} }
var _ = Describe("Listener", func() { var (
var ( defaultUpgrader = &st.Upgrader{
defaultUpgrader = &st.Upgrader{ Secure: insecure.New(peer.ID(1)),
Secure: insecure.New(peer.ID(1)), Muxer: &negotiatingMuxer{},
Muxer: &negotiatingMuxer{}, }
} )
)
func init() {
transport.AcceptTimeout = 1 * time.Hour
}
testConn := func(clientConn, serverConn tpt.CapableConn) { func testConn(t *testing.T, clientConn, serverConn transport.CapableConn) {
cstr, err := clientConn.OpenStream() t.Helper()
ExpectWithOffset(0, err).ToNot(HaveOccurred()) require := require.New(t)
_, err = cstr.Write([]byte("foobar"))
ExpectWithOffset(0, err).ToNot(HaveOccurred()) cstr, err := clientConn.OpenStream()
sstr, err := serverConn.AcceptStream() require.NoError(err)
ExpectWithOffset(0, err).ToNot(HaveOccurred())
b := make([]byte, 6) _, err = cstr.Write([]byte("foobar"))
_, err = sstr.Read(b) require.NoError(err)
ExpectWithOffset(0, err).ToNot(HaveOccurred())
ExpectWithOffset(0, b).To(Equal([]byte("foobar"))) sstr, err := serverConn.AcceptStream()
require.NoError(err)
b := make([]byte, 6)
_, err = sstr.Read(b)
require.NoError(err)
require.Equal([]byte("foobar"), b)
}
func dial(t *testing.T, upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
t.Helper()
macon, err := manet.Dial(raddr)
if err != nil {
return nil, err
} }
createListener := func(upgrader *st.Upgrader) tpt.Listener { return upgrader.UpgradeOutbound(context.Background(), nil, macon, p)
addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0") }
ExpectWithOffset(0, err).ToNot(HaveOccurred())
ln, err := manet.Listen(addr) func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener {
ExpectWithOffset(0, err).ToNot(HaveOccurred()) t.Helper()
return upgrader.UpgradeListener(nil, ln) require := require.New(t)
addr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
require.NoError(err)
ln, err := manet.Listen(addr)
require.NoError(err)
return upgrader.UpgradeListener(nil, ln)
}
func TestAcceptSingleConn(t *testing.T) {
require := require.New(t)
ln := createListener(t, defaultUpgrader)
defer ln.Close()
cconn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(1))
require.NoError(err)
sconn, err := ln.Accept()
require.NoError(err)
testConn(t, cconn, sconn)
}
func TestAcceptMultipleConns(t *testing.T) {
require := require.New(t)
ln := createListener(t, defaultUpgrader)
defer ln.Close()
var toClose []io.Closer
defer func() {
for _, c := range toClose {
_ = c.Close()
}
}()
for i := 0; i < 10; i++ {
cconn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(1))
require.NoError(err)
toClose = append(toClose, cconn)
sconn, err := ln.Accept()
require.NoError(err)
toClose = append(toClose, sconn)
testConn(t, cconn, sconn)
} }
}
dial := func(upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (tpt.CapableConn, error) { func TestConnectionsClosedIfNotAccepted(t *testing.T) {
macon, err := manet.Dial(raddr) require := require.New(t)
const timeout = 200 * time.Millisecond
transport.AcceptTimeout = timeout
defer func() { transport.AcceptTimeout = 1 * time.Hour }()
ln := createListener(t, defaultUpgrader)
defer ln.Close()
conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(2))
require.NoError(err)
errCh := make(chan error)
go func() {
defer conn.Close()
str, err := conn.OpenStream()
if err != nil { if err != nil {
return nil, err errCh <- err
return
} }
return upgrader.UpgradeOutbound(context.Background(), nil, macon, p) // start a Read. It will block until the connection is closed
_, _ = str.Read([]byte{0})
errCh <- nil
}()
time.Sleep(timeout / 2)
select {
case err := <-errCh:
t.Fatalf("connection closed earlier than expected. expected nothing on channel, got: %v", err)
default:
} }
BeforeEach(func() { time.Sleep(timeout)
tpt.AcceptTimeout = time.Hour require.Nil(<-errCh)
}) }
It("accepts a single connection", func() { func TestFailedUpgradeOnListen(t *testing.T) {
ln := createListener(defaultUpgrader) require := require.New(t)
defer ln.Close()
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1)) upgrader := &st.Upgrader{
Expect(err).ToNot(HaveOccurred()) Secure: insecure.New(peer.ID(1)),
sconn, err := ln.Accept() Muxer: &errorMuxer{},
Expect(err).ToNot(HaveOccurred()) }
testConn(cconn, sconn)
}) ln := createListener(t, upgrader)
defer ln.Close()
It("accepts multiple connections", func() {
ln := createListener(defaultUpgrader) errCh := make(chan error)
defer ln.Close() go func() {
const num = 10 _, err := ln.Accept()
for i := 0; i < 10; i++ { errCh <- err
cconn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1)) }()
Expect(err).ToNot(HaveOccurred())
sconn, err := ln.Accept() _, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(2))
Expect(err).ToNot(HaveOccurred()) require.Error(err)
testConn(cconn, sconn)
} // close the listener.
}) ln.Close()
require.Error(<-errCh)
It("closes connections if they are not accepted", func() { }
const timeout = 200 * time.Millisecond
tpt.AcceptTimeout = timeout func TestListenerClose(t *testing.T) {
ln := createListener(defaultUpgrader) require := require.New(t)
defer ln.Close()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) ln := createListener(t, defaultUpgrader)
if !Expect(err).ToNot(HaveOccurred()) {
return errCh := make(chan error)
go func() {
_, err := ln.Accept()
errCh <- err
}()
select {
case err := <-errCh:
t.Fatalf("connection closed earlier than expected. expected nothing on channel, got: %v", err)
case <-time.After(200 * time.Millisecond):
// nothing in 200ms.
}
// unblocks Accept when it is closed.
err := ln.Close()
require.NoError(err)
err = <-errCh
require.Error(err)
require.Contains(err.Error(), "use of closed network connection")
// doesn't accept new connections when it is closed
_, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(1))
require.Error(err)
}
func TestListenerCloseClosesQueued(t *testing.T) {
require := require.New(t)
ln := createListener(t, defaultUpgrader)
var conns []transport.CapableConn
for i := 0; i < 10; i++ {
conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(i))
require.NoError(err)
conns = append(conns, conn)
}
// wait for all the dials to happen.
time.Sleep(500 * time.Millisecond)
// all the connections are opened.
for _, c := range conns {
require.False(c.IsClosed())
}
// expect that all the connections will be closed.
err := ln.Close()
require.NoError(err)
// all the connections are closed.
require.Eventually(func() bool {
for _, c := range conns {
if !c.IsClosed() {
return false
}
} }
done := make(chan struct{}) return true
go func() { }, 3*time.Second, 100*time.Millisecond)
defer GinkgoRecover()
defer conn.Close()
str, err := conn.OpenStream()
Expect(err).ToNot(HaveOccurred())
// start a Read. It will block until the connection is closed
str.Read([]byte{0})
close(done)
}()
Consistently(done, timeout/2).ShouldNot(BeClosed())
Eventually(done, timeout).Should(BeClosed())
})
It("doesn't accept connections that fail to setup", func() { for _, c := range conns {
upgrader := &st.Upgrader{ _ = c.Close()
}
}
func TestConcurrentAccept(t *testing.T) {
var (
require = require.New(t)
num = 3 * st.AcceptQueueLength
blockingMuxer = newBlockingMuxer()
upgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)), Secure: insecure.New(peer.ID(1)),
Muxer: &errorMuxer{}, Muxer: blockingMuxer,
} }
ln := createListener(upgrader) )
done := make(chan struct{})
go func() { ln := createListener(t, upgrader)
defer GinkgoRecover() defer ln.Close()
accepted := make(chan transport.CapableConn, num)
go func() {
for {
conn, err := ln.Accept() conn, err := ln.Accept()
if !Expect(err).To(HaveOccurred()) { if err != nil {
conn.Close() return
} }
close(done) _ = conn.Close()
}() accepted <- conn
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
if !Expect(err).To(HaveOccurred()) {
conn.Close()
} }
Consistently(done).ShouldNot(BeClosed()) }()
// make the goroutine return
ln.Close()
Eventually(done).Should(BeClosed())
})
Context("concurrency", func() {
It("sets up connections concurrently", func() {
num := 3 * st.AcceptQueueLength
bm := newBlockingMuxer()
upgrader := &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Muxer: bm,
}
ln := createListener(upgrader)
accepted := make(chan tpt.CapableConn, num)
go func() {
defer GinkgoRecover()
for {
conn, err := ln.Accept()
if err != nil {
return
}
conn.Close()
accepted <- conn
}
}()
var wg sync.WaitGroup
// start num dials, which all block while setting up the muxer
for i := 0; i < num; i++ {
wg.Add(1)
go func() {
defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
if Expect(err).ToNot(HaveOccurred()) {
stream, err := conn.AcceptStream() // wait for conn to be accepted.
if !Expect(err).To(HaveOccurred()) {
stream.Close()
}
conn.Close()
}
wg.Done()
}()
}
// the dials are still blocked, so we shouldn't have any connection available yet
Consistently(accepted).Should(BeEmpty())
bm.Unblock() // make all dials succeed
Eventually(accepted).Should(HaveLen(num))
wg.Wait()
})
It("stops setting up when the more than AcceptQueueLength connections are waiting to get accepted", func() {
ln := createListener(defaultUpgrader)
defer ln.Close()
// setup AcceptQueueLength connections, but don't accept any of them
dialed := make(chan tpt.CapableConn, 10*st.AcceptQueueLength) // used as a thread-safe counter
for i := 0; i < st.AcceptQueueLength; i++ {
go func() {
defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
Expect(err).ToNot(HaveOccurred())
dialed <- conn
}()
}
Eventually(dialed).Should(HaveLen(st.AcceptQueueLength))
// dial a new connection. This connection should not complete setup, since the queue is full
go func() {
defer GinkgoRecover()
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2))
Expect(err).ToNot(HaveOccurred())
dialed <- conn
}()
Consistently(dialed).Should(HaveLen(st.AcceptQueueLength))
// accept a single connection. Now the new connection should be set up, and fill the queue again
conn, err := ln.Accept()
if Expect(err).ToNot(HaveOccurred()) {
conn.Close()
}
Eventually(dialed).Should(HaveLen(st.AcceptQueueLength + 1))
// Cleanup // start num dials, which all block while setting up the muxer
for i := 0; i < st.AcceptQueueLength+1; i++ { errCh := make(chan error, num)
if c := <-dialed; c != nil { var wg sync.WaitGroup
c.Close() for i := 0; i < num; i++ {
} wg.Add(1)
} go func() {
}) defer wg.Done()
})
Context("closing", func() {
It("unblocks Accept when it is closed", func() {
ln := createListener(defaultUpgrader)
done := make(chan struct{})
go func() {
defer GinkgoRecover()
conn, err := ln.Accept()
if Expect(err).To(HaveOccurred()) {
Expect(err.Error()).To(ContainSubstring("use of closed network connection"))
} else {
conn.Close()
}
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
Expect(ln.Close()).To(Succeed())
Eventually(done).Should(BeClosed())
})
It("doesn't accept new connections when it is closed", func() {
ln := createListener(defaultUpgrader)
Expect(ln.Close()).To(Succeed())
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(1))
if !Expect(err).To(HaveOccurred()) {
conn.Close()
}
})
It("closes incoming connections that have not yet been accepted", func() { conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(2))
ln := createListener(defaultUpgrader) if err != nil {
conn, err := dial(defaultUpgrader, ln.Multiaddr(), peer.ID(2)) errCh <- err
if !Expect(err).ToNot(HaveOccurred()) {
ln.Close()
return return
} }
Expect(conn.IsClosed()).To(BeFalse()) defer conn.Close()
Expect(ln.Close()).To(Succeed())
Eventually(conn.IsClosed).Should(BeTrue()) _, err = conn.AcceptStream() // wait for conn to be accepted.
}) errCh <- err
}) }()
}) }
time.Sleep(200 * time.Millisecond)
// the dials are still blocked, so we shouldn't have any connection available yet
require.Empty(accepted)
blockingMuxer.Unblock() // make all dials succeed
require.Eventually(func() bool { return len(accepted) == num }, 3*time.Second, 100*time.Millisecond)
wg.Wait()
}
func TestAcceptQueueBacklogged(t *testing.T) {
require := require.New(t)
ln := createListener(t, defaultUpgrader)
defer ln.Close()
// setup AcceptQueueLength connections, but don't accept any of them
errCh := make(chan error, st.AcceptQueueLength+1)
doDial := func() {
conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(2))
errCh <- err
if conn != nil {
_ = conn.Close()
}
}
for i := 0; i < st.AcceptQueueLength; i++ {
go doDial()
}
require.Eventually(func() bool { return len(errCh) == st.AcceptQueueLength }, 2*time.Second, 100*time.Millisecond)
// dial a new connection. This connection should not complete setup, since the queue is full
go doDial()
time.Sleep(500 * time.Millisecond)
require.Len(errCh, st.AcceptQueueLength)
// accept a single connection. Now the new connection should be set up, and fill the queue again
conn, err := ln.Accept()
require.NoError(err)
_ = conn.Close()
require.Eventually(func() bool { return len(errCh) == st.AcceptQueueLength+1 }, 2*time.Second, 100*time.Millisecond)
}
...@@ -84,16 +84,19 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma ...@@ -84,16 +84,19 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
" of Private Networks is forced by the enviroment") " of Private Networks is forced by the enviroment")
return nil, ipnet.ErrNotInPrivateNetwork return nil, ipnet.ErrNotInPrivateNetwork
} }
sconn, err := u.setupSecurity(ctx, conn, p) sconn, err := u.setupSecurity(ctx, conn, p)
if err != nil { if err != nil {
conn.Close() conn.Close()
return nil, fmt.Errorf("failed to negotiate security protocol: %s", err) return nil, fmt.Errorf("failed to negotiate security protocol: %s", err)
} }
smconn, err := u.setupMuxer(ctx, sconn, p) smconn, err := u.setupMuxer(ctx, sconn, p)
if err != nil { if err != nil {
sconn.Close() sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err) return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
} }
return &transportConn{ return &transportConn{
MuxedConn: smconn, MuxedConn: smconn,
ConnMultiaddrs: maconn, ConnMultiaddrs: maconn,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment