Unverified Commit 0c577bb4 authored by Aarsh Shah's avatar Aarsh Shah Committed by GitHub

call the connection gater when accepting connections and after crypto handshake (#55)

parent 1a7cd598
package stream_test
import (
"sync"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
type testGater struct {
sync.Mutex
blockAccept, blockSecured bool
}
var _ connmgr.ConnectionGater = (*testGater)(nil)
func (t *testGater) BlockAccept(block bool) {
t.Lock()
defer t.Unlock()
t.blockAccept = block
}
func (t *testGater) BlockSecured(block bool) {
t.Lock()
defer t.Unlock()
t.blockSecured = block
}
func (t *testGater) InterceptPeerDial(p peer.ID) (allow bool) {
panic("not implemented")
}
func (t *testGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) (allow bool) {
panic("not implemented")
}
func (t *testGater) InterceptAccept(multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()
return !t.blockAccept
}
func (t *testGater) InterceptSecured(direction network.Direction, id peer.ID, multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()
return !t.blockSecured
}
func (t *testGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
panic("not implemented")
}
......@@ -74,22 +74,22 @@ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS
github.com/libp2p/go-libp2p-core v0.2.0/go.mod h1:X0eyB0Gy93v0DZtSYbEM7RnMChm9Uv3j7yRXjO77xSI=
github.com/libp2p/go-libp2p-core v0.3.1/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII=
github.com/libp2p/go-libp2p-core v0.5.0/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
github.com/libp2p/go-libp2p-core v0.5.3 h1:b9W3w7AZR2n/YJhG8d0qPFGhGhCWKIvPuJgp4hhc4MM=
github.com/libp2p/go-libp2p-core v0.5.3/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.5 h1:/yiFUZDoBWqvpWeHHJ1iA8SOs5obT1/+UdNfckwD57M=
github.com/libp2p/go-libp2p-core v0.5.5/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM=
github.com/libp2p/go-libp2p-mplex v0.2.3 h1:2zijwaJvpdesST2MXpI5w9wWFRgYtMcpRX7rrw0jmOo=
github.com/libp2p/go-libp2p-mplex v0.2.3/go.mod h1:CK3p2+9qH9x+7ER/gWWDYJ3QW5ZxWDkm+dVvjfuG3ek=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-testing v0.1.1 h1:U03z3HnGI7Ni8Xx6ONVZvUFOAzWYmolWf5W5jAOPNmU=
github.com/libp2p/go-libp2p-testing v0.1.1/go.mod h1:xaZWMJrPUM5GlDBxCeGUi7kI4eqnjVyavGroI2nxEM0=
github.com/libp2p/go-maddr-filter v0.0.5 h1:CW3AgbMO6vUvT4kf87y4N+0P8KUl2aqLYhrGyDUbLSg=
github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M=
github.com/libp2p/go-mplex v0.1.2 h1:qOg1s+WdGLlpkrczDqmhYzyk3vCfsQ8+RxRTQjOZWwI=
github.com/libp2p/go-mplex v0.1.2/go.mod h1:Xgz2RDCi3co0LeZfgjm4OgUF15+sVR8SRcu3SFXI1lk=
github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg=
github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/libp2p/go-openssl v0.0.5 h1:pQkejVhF0xp08D4CQUcw8t+BFJeXowja6RVcb5p++EA=
github.com/libp2p/go-openssl v0.0.5/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
......@@ -108,7 +108,6 @@ github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.1/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.0.4/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr v0.2.1 h1:SgG/cw5vqyB5QQe5FPe2TqggU9WtrA9X4nZw7LlVqOI=
......@@ -172,7 +171,6 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
......
......@@ -85,6 +85,17 @@ func (l *listener) handleIncoming() {
return
}
// gate the connection if applicable
if l.upgrader.ConnGater != nil && !l.upgrader.ConnGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by gater; err: %s", err)
}
continue
}
// The go routine below calls Release when the context is
// canceled so there's no need to wait on it here.
l.threshold.Wait()
......
package stream_test
import (
"context"
"errors"
"io"
"net"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec/insecure"
"github.com/libp2p/go-libp2p-core/transport"
mplex "github.com/libp2p/go-libp2p-mplex"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
......@@ -23,94 +18,10 @@ import (
"github.com/stretchr/testify/require"
)
// negotiatingMuxer sets up a new mplex connection
// It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{}
func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
var err error
// run a fake muxer negotiation
if isServer {
_, err = c.Write([]byte("setup"))
} else {
_, err = c.Read(make([]byte, 5))
}
if err != nil {
return nil, err
}
return mplex.DefaultTransport.NewConn(c, isServer)
}
// blockingMuxer blocks the muxer negotiation until the contain chan is closed
type blockingMuxer struct {
unblock chan struct{}
}
var _ mux.Multiplexer = &blockingMuxer{}
func newBlockingMuxer() *blockingMuxer {
return &blockingMuxer{unblock: make(chan struct{})}
}
func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer)
}
func (m *blockingMuxer) Unblock() {
close(m.unblock)
}
// errorMuxer is a muxer that errors while setting up
type errorMuxer struct{}
var _ mux.Multiplexer = &errorMuxer{}
func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error")
}
var (
defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Muxer: &negotiatingMuxer{},
}
)
func init() {
transport.AcceptTimeout = 1 * time.Hour
}
func testConn(t *testing.T, clientConn, serverConn transport.CapableConn) {
t.Helper()
require := require.New(t)
cstr, err := clientConn.OpenStream()
require.NoError(err)
_, err = cstr.Write([]byte("foobar"))
require.NoError(err)
sstr, err := serverConn.AcceptStream()
require.NoError(err)
b := make([]byte, 6)
_, err = sstr.Read(b)
require.NoError(err)
require.Equal([]byte("foobar"), b)
}
func dial(t *testing.T, upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
t.Helper()
macon, err := manet.Dial(raddr)
if err != nil {
return nil, err
}
return upgrader.UpgradeOutbound(context.Background(), nil, macon, p)
}
func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener {
t.Helper()
require := require.New(t)
......@@ -385,3 +296,49 @@ func TestAcceptQueueBacklogged(t *testing.T) {
require.Eventually(func() bool { return len(errCh) == st.AcceptQueueLength+1 }, 2*time.Second, 100*time.Millisecond)
}
func TestListenerConnectionGater(t *testing.T) {
require := require.New(t)
testGater := &testGater{}
upgrader := *defaultUpgrader
upgrader.ConnGater = testGater
ln := createListener(t, &upgrader)
defer ln.Close()
// no gating.
conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)
// rejecting on accept will trigger first.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)
// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)
// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
}
......@@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"net"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
ipnet "github.com/libp2p/go-libp2p-core/pnet"
......@@ -13,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p-core/transport"
"github.com/libp2p/go-libp2p-pnet"
filter "github.com/libp2p/go-maddr-filter"
manet "github.com/multiformats/go-multiaddr-net"
)
......@@ -27,10 +28,10 @@ var AcceptQueueLength = 16
// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type Upgrader struct {
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
Filters *filter.Filters
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
ConnGater connmgr.ConnectionGater
}
// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
......@@ -55,22 +56,16 @@ func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, m
if p == "" {
return nil, ErrNilPeer
}
return u.upgrade(ctx, t, maconn, p)
return u.upgrade(ctx, t, maconn, p, network.DirOutbound)
}
// UpgradeInbound upgrades the given inbound multiaddr-net connection into a
// full libp2p-transport connection.
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, "")
return u.upgrade(ctx, t, maconn, "", network.DirInbound)
}
func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.CapableConn, error) {
if u.Filters != nil && u.Filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
return nil, fmt.Errorf("blocked connection from %s", maconn.RemoteMultiaddr())
}
func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID, dir network.Direction) (transport.CapableConn, error) {
var conn net.Conn = maconn
if u.PSK != nil {
pconn, err := pnet.NewProtectedConn(u.PSK, conn)
......@@ -91,18 +86,29 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, fmt.Errorf("failed to negotiate security protocol: %s", err)
}
// call the connection gater, if one is registered.
if u.ConnGater != nil && !u.ConnGater.InterceptSecured(dir, sconn.RemotePeer(), maconn) {
if err := maconn.Close(); err != nil {
log.Errorf("failed to close connection with peer %s and addr %s; err: %s",
p.Pretty(), maconn.RemoteMultiaddr(), err)
}
return nil, fmt.Errorf("gater rejected connection with peer %s and addr %s with direction %d",
sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir)
}
smconn, err := u.setupMuxer(ctx, sconn, p)
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
}
return &transportConn{
tc := &transportConn{
MuxedConn: smconn,
ConnMultiaddrs: maconn,
ConnSecurity: sconn,
transport: t,
}, nil
}
return tc, nil
}
func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, error) {
......
package stream_test
import (
"context"
"errors"
"net"
"testing"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec/insecure"
"github.com/libp2p/go-libp2p-core/transport"
mplex "github.com/libp2p/go-libp2p-mplex"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/stretchr/testify/require"
st "github.com/libp2p/go-libp2p-transport-upgrader"
)
var (
defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Muxer: &negotiatingMuxer{},
}
)
// negotiatingMuxer sets up a new mplex connection
// It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{}
func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
var err error
// run a fake muxer negotiation
if isServer {
_, err = c.Write([]byte("setup"))
} else {
_, err = c.Read(make([]byte, 5))
}
if err != nil {
return nil, err
}
return mplex.DefaultTransport.NewConn(c, isServer)
}
// blockingMuxer blocks the muxer negotiation until the contain chan is closed
type blockingMuxer struct {
unblock chan struct{}
}
var _ mux.Multiplexer = &blockingMuxer{}
func newBlockingMuxer() *blockingMuxer {
return &blockingMuxer{unblock: make(chan struct{})}
}
func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer)
}
func (m *blockingMuxer) Unblock() {
close(m.unblock)
}
// errorMuxer is a muxer that errors while setting up
type errorMuxer struct{}
var _ mux.Multiplexer = &errorMuxer{}
func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error")
}
func testConn(t *testing.T, clientConn, serverConn transport.CapableConn) {
t.Helper()
require := require.New(t)
cstr, err := clientConn.OpenStream()
require.NoError(err)
_, err = cstr.Write([]byte("foobar"))
require.NoError(err)
sstr, err := serverConn.AcceptStream()
require.NoError(err)
b := make([]byte, 6)
_, err = sstr.Read(b)
require.NoError(err)
require.Equal([]byte("foobar"), b)
}
func dial(t *testing.T, upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
t.Helper()
macon, err := manet.Dial(raddr)
if err != nil {
return nil, err
}
return upgrader.UpgradeOutbound(context.Background(), nil, macon, p)
}
func TestOutboundConnectionGating(t *testing.T) {
require := require.New(t)
ln := createListener(t, defaultUpgrader)
defer ln.Close()
testGater := &testGater{}
upgrader := *defaultUpgrader
upgrader.ConnGater = testGater
conn, err := dial(t, &upgrader, ln.Multiaddr(), peer.ID(2))
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()
// blocking accepts doesn't affect the dialling side, only the listener.
testGater.BlockAccept(true)
conn, err = dial(t, &upgrader, ln.Multiaddr(), peer.ID(2))
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()
// now let's block all connections after being secured.
testGater.BlockSecured(true)
conn, err = dial(t, &upgrader, ln.Multiaddr(), peer.ID(2))
require.Error(err)
require.Contains(err.Error(), "gater rejected connection")
require.Nil(conn)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment