Commit e095ac86 authored by Steven Allen's avatar Steven Allen

refactor for transport interface changes

Also, refactor out reuseport logic into a separate package.
parent 28520324
......@@ -15,9 +15,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmRK2LxanhK2gZq6k6R7vk5ZoYZk8ULSSTB7FzDsMUX6CB",
"hash": "QmcGXGdw9BWDysPJQHxJinjGHha3eEg4vzFETre4woNwcX",
"name": "go-multiaddr-net",
"version": "1.5.7"
"version": "1.6.0"
},
{
"author": "whyrusleeping",
......@@ -27,9 +27,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmPUHzTLPZFYqv8WqcBTuMFYTgeom4uHHEaxzk7bd5GYZB",
"hash": "QmYnjSGtvn7LhrxCvwrU9uDWxKyg28uBYeXvgzTDDDzVy4",
"name": "go-libp2p-transport",
"version": "2.2.14"
"version": "3.0.0"
},
{
"hash": "QmTG23dvpBCBjqQwyDxV8CQT6jmS4PSftNr1VqHhE3MLy7",
......@@ -41,6 +41,30 @@
"hash": "QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb",
"name": "go-multiaddr",
"version": "1.2.6"
},
{
"author": "Stebalien",
"hash": "QmSieFUauuYnroStqmRAEgu9BMXDNY5LbtNgzXcFitBKXQ",
"name": "go-conn-security",
"version": "0.1.1"
},
{
"author": "stebalien",
"hash": "Qmc9kSXRd74qFRrkquZ9CTHF23xjDnPEuQpJyDWvVeg4qa",
"name": "go-reuseport-transport",
"version": "0.1.4"
},
{
"author": "steb",
"hash": "Qmf3ejfGWR8Bd3wKFBvwYGFMJ9TeKJwYJUc2WchXjMxzg7",
"name": "go-libp2p-transport-upgrader",
"version": "0.1.0"
},
{
"author": "whyrusleeping",
"hash": "QmZeGmoJ3bEwEe6Huz6GKcHENWZCx7DReuAS5va4zP24PB",
"name": "go-smux-multiplex",
"version": "3.0.8"
}
],
"gxVersion": "0.4.0",
......
package tcp
import (
"net"
"os"
"strings"
"syscall"
reuseport "github.com/libp2p/go-reuseport"
)
......@@ -35,31 +33,3 @@ func init() {
func ReuseportIsAvailable() bool {
return envReuseportVal && reuseport.Available()
}
// ReuseErrShouldRetry diagnoses whether to retry after a reuse error.
// if we failed to bind, we should retry. if bind worked and this is a
// real dial error (remote end didnt answer) then we should not retry.
func ReuseErrShouldRetry(err error) bool {
if err == nil {
return false // hey, it worked! no need to retry.
}
// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return false
}
errno, ok := err.(syscall.Errno)
if !ok { // not an errno? who knows what this is. retry.
return true
}
switch errno {
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
return true // failure to bind. retry.
case syscall.ECONNREFUSED:
return false // real dial error
default:
return true // optimistically default to retry.
}
}
package tcp
import (
"net"
"syscall"
"testing"
)
type netTimeoutErr struct {
timeout bool
}
func (e netTimeoutErr) Error() string {
return ""
}
func (e netTimeoutErr) Timeout() bool {
return e.timeout
}
func (e netTimeoutErr) Temporary() bool {
panic("not checked")
}
func TestReuseError(t *testing.T) {
var nte1 net.Error = &netTimeoutErr{true}
var nte2 net.Error = &netTimeoutErr{false}
cases := map[error]bool{
nil: false,
syscall.EADDRINUSE: true,
syscall.EADDRNOTAVAIL: true,
syscall.ECONNREFUSED: false,
nte1: false,
nte2: true, // this ones a little weird... we should check neterror.Temporary() too
// test 'default' to true
syscall.EBUSY: true,
}
for k, v := range cases {
if ReuseErrShouldRetry(k) != v {
t.Fatalf("expected %t for %#v", v, k)
}
}
}
......@@ -2,13 +2,12 @@ package tcp
import (
"context"
"fmt"
"net"
"sync"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
reuseport "github.com/libp2p/go-reuseport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
rtpt "github.com/libp2p/go-reuseport-transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
mafmt "github.com/whyrusleeping/mafmt"
......@@ -16,275 +15,80 @@ import (
var log = logging.Logger("tcp-tpt")
// TcpTransport is the TCP transport.
type TcpTransport struct {
dlock sync.Mutex
dialers map[string]tpt.Dialer
// Connection upgrader for upgrading insecure stream connections to
// secure multiplex connections.
Upgrader *tptu.Upgrader
llock sync.Mutex
listeners map[string]tpt.Listener
// Explicitly disable reuseport.
DisableReuseport bool
reuse rtpt.Transport
}
var _ tpt.Transport = &TcpTransport{}
// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire tcp stack (though it might not necessarily be)
func NewTCPTransport() *TcpTransport {
return &TcpTransport{
dialers: make(map[string]tpt.Dialer),
listeners: make(map[string]tpt.Listener),
}
}
func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...tpt.DialOpt) (tpt.Dialer, error) {
if laddr == nil {
zaddr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
if err != nil {
return nil, err
}
laddr = zaddr
}
t.dlock.Lock()
defer t.dlock.Unlock()
s := laddr.String()
d, found := t.dialers[s]
if found {
return d, nil
}
var doReuse bool
for _, o := range opts {
switch o := o.(type) {
case tpt.ReuseportOpt:
doReuse = bool(o)
default:
return nil, fmt.Errorf("unrecognized option: %#v", o)
}
}
tcpd, err := t.newTcpDialer(laddr, doReuse)
if err != nil {
return nil, err
}
t.dialers[s] = tcpd
return tcpd, nil
}
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
if !t.Matches(laddr) {
return nil, fmt.Errorf("tcp transport cannot listen on %q", laddr)
}
t.llock.Lock()
defer t.llock.Unlock()
s := laddr.String()
l, found := t.listeners[s]
if found {
return l, nil
}
list, err := manetListen(laddr)
if err != nil {
return nil, err
}
tlist := &tcpListener{
list: list,
transport: t,
}
t.listeners[s] = tlist
return tlist, nil
}
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
network, naddr, err := manet.DialArgs(addr)
if err != nil {
return nil, err
}
if ReuseportIsAvailable() {
nl, err := reuseport.Listen(network, naddr)
if err == nil {
// hey, it worked!
return manet.WrapNetListener(nl)
}
// reuseport is available, but we failed to listen. log debug, and retry normally.
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
}
// either reuseport not available, or it failed. try normally.
return manet.Listen(addr)
}
func (t *TcpTransport) Matches(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
func NewTCPTransport(upgrader *tptu.Upgrader) *TcpTransport {
return &TcpTransport{Upgrader: upgrader}
}
type tcpDialer struct {
laddr ma.Multiaddr
doReuse bool
rd reuseport.Dialer
madialer manet.Dialer
pattern mafmt.Pattern
transport tpt.Transport
// CanDial returns true if this transport believes it can dial the given
// multiaddr.
func (t *TcpTransport) CanDial(addr ma.Multiaddr) bool {
return mafmt.TCP.Matches(addr)
}
var _ tpt.Dialer = &tcpDialer{}
func maddrToTcp(addr ma.Multiaddr) (*net.TCPAddr, error) {
la, err := manet.ToNetAddr(addr)
if err != nil {
return nil, err // something wrong with addr.
func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
if t.UseReuseport() {
return t.reuse.DialContext(ctx, raddr)
}
latcp, ok := la.(*net.TCPAddr)
if !ok {
return nil, fmt.Errorf("not a tcp multiaddr: %s", addr)
}
return latcp, nil
var d manet.Dialer
return d.DialContext(ctx, raddr)
}
func (t *TcpTransport) newTcpDialer(laddr ma.Multiaddr, doReuse bool) (*tcpDialer, error) {
// get the local net.Addr manually
la, err := maddrToTcp(laddr)
// Dial dials the peer at the remote address.
func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
conn, err := t.maDial(ctx, raddr)
if err != nil {
return nil, err
}
var pattern mafmt.Pattern
if TCP4.Matches(laddr) {
pattern = TCP4
} else if TCP6.Matches(laddr) {
pattern = TCP6
} else {
return nil, fmt.Errorf("local addr did not match TCP4 or TCP6: %s", laddr)
}
// Ignore the port when constructing the default (non-reuseport) dialer.
labase := *la
labase.Port = 0
dialer := &tcpDialer{
laddr: laddr,
pattern: pattern,
madialer: manet.Dialer{
Dialer: net.Dialer{
LocalAddr: &labase,
},
},
transport: t,
}
if doReuse && ReuseportIsAvailable() {
dialer.doReuse = true
dialer.rd = reuseport.Dialer{
D: net.Dialer{
LocalAddr: la,
},
}
}
return dialer, nil
}
func (d *tcpDialer) Dial(raddr ma.Multiaddr) (tpt.Conn, error) {
return d.DialContext(context.Background(), raddr)
return t.Upgrader.UpgradeOutbound(ctx, t, conn, p)
}
func (d *tcpDialer) DialContext(ctx context.Context, raddr ma.Multiaddr) (tpt.Conn, error) {
var c manet.Conn
var err error
if d.doReuse {
c, err = d.reuseDial(ctx, raddr)
} else {
c, err = d.madialer.DialContext(ctx, raddr)
}
if err != nil {
return nil, err
}
return &tcpConn{
Conn: c,
t: d.transport,
}, nil
// UseReuseport returns true if reuseport is enabled and available.
func (t *TcpTransport) UseReuseport() bool {
return !t.DisableReuseport && ReuseportIsAvailable()
}
func (d *tcpDialer) reuseDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
network, netraddr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
rpev := log.EventBegin(ctx, "tptDialReusePort", logging.LoggableMap{
"raddr": raddr,
})
con, err := d.rd.DialContext(ctx, network, netraddr)
if err == nil {
rpev.Done()
return manet.WrapNetConn(con)
func (t *TcpTransport) maListen(laddr ma.Multiaddr) (manet.Listener, error) {
if t.UseReuseport() {
return t.reuse.Listen(laddr)
}
rpev.SetError(err)
rpev.Done()
if !ReuseErrShouldRetry(err) {
return nil, err
}
return d.madialer.DialContext(ctx, raddr)
}
var TCP4 = mafmt.And(mafmt.Base(ma.P_IP4), mafmt.Base(ma.P_TCP))
var TCP6 = mafmt.And(mafmt.Base(ma.P_IP6), mafmt.Base(ma.P_TCP))
func (d *tcpDialer) Matches(a ma.Multiaddr) bool {
return d.pattern.Matches(a)
}
type tcpListener struct {
list manet.Listener
transport tpt.Transport
return manet.Listen(laddr)
}
var _ tpt.Listener = &tcpListener{}
func (d *tcpListener) Accept() (tpt.Conn, error) {
c, err := d.list.Accept()
// Listen listens on the given multiaddr.
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
list, err := t.maListen(laddr)
if err != nil {
return nil, err
}
return &tcpConn{
Conn: c,
t: d.transport,
}, nil
}
func (d *tcpListener) Addr() net.Addr {
return d.list.Addr()
return t.Upgrader.UpgradeListener(t, list), nil
}
func (t *tcpListener) Multiaddr() ma.Multiaddr {
return t.list.Multiaddr()
// Protocols returns the list of terminal protocols this transport can dial.
func (t *TcpTransport) Protocols() []int {
return []int{ma.P_TCP}
}
func (t *tcpListener) NetListener() net.Listener {
return t.list.NetListener()
// Proxy always returns false for the TCP transport.
func (t *TcpTransport) Proxy() bool {
return false
}
func (d *tcpListener) Close() error {
return d.list.Close()
}
type tcpConn struct {
manet.Conn
t tpt.Transport
}
var _ tpt.Conn = &tcpConn{}
func (c *tcpConn) Transport() tpt.Transport {
return c.t
func (t *TcpTransport) String() string {
return "TCP"
}
......@@ -3,59 +3,50 @@ package tcp
import (
"testing"
tpt "github.com/libp2p/go-libp2p-transport"
insecure "github.com/libp2p/go-conn-security/insecure"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
utils "github.com/libp2p/go-libp2p-transport/test"
ma "github.com/multiformats/go-multiaddr"
mplex "github.com/whyrusleeping/go-smux-multiplex"
)
func TestTcpTransport(t *testing.T) {
ta := NewTCPTransport()
tb := NewTCPTransport()
zero := "/ip4/127.0.0.1/tcp/0"
utils.SubtestTransport(t, ta, tb, zero)
for i := 0; i < 2; i++ {
ta := NewTCPTransport(&tptu.Upgrader{
Secure: insecure.New("peerA"),
Muxer: new(mplex.Transport),
})
tb := NewTCPTransport(&tptu.Upgrader{
Secure: insecure.New("peerB"),
Muxer: new(mplex.Transport),
})
zero := "/ip4/127.0.0.1/tcp/0"
utils.SubtestTransport(t, ta, tb, zero, "peerA")
envReuseportVal = false
}
envReuseportVal = true
}
func TestTcpTransportCantListenUtp(t *testing.T) {
utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/utp")
if err != nil {
t.Fatal(err)
}
for i := 0; i < 2; i++ {
utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/utp")
if err != nil {
t.Fatal(err)
}
tpt := NewTCPTransport()
_, err = tpt.Listen(utpa)
if err == nil {
t.Fatal("shouldnt be able to listen on utp addr with tcp transport")
}
}
tpt := NewTCPTransport(&tptu.Upgrader{
Secure: insecure.New("peerB"),
Muxer: new(mplex.Transport),
})
func TestCorrectIPVersionMatching(t *testing.T) {
ta := NewTCPTransport()
addr4, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
if err != nil {
t.Fatal(err)
}
addr6, err := ma.NewMultiaddr("/ip6/::1/tcp/0")
if err != nil {
t.Fatal(err)
}
d4, err := ta.Dialer(addr4, tpt.ReuseportOpt(true))
if err != nil {
t.Fatal(err)
}
d6, err := ta.Dialer(addr6, tpt.ReuseportOpt(true))
if err != nil {
t.Fatal(err)
}
if d4.Matches(addr6) {
t.Fatal("tcp4 dialer should not match ipv6 address")
}
_, err = tpt.Listen(utpa)
if err == nil {
t.Fatal("shouldnt be able to listen on utp addr with tcp transport")
}
if d6.Matches(addr4) {
t.Fatal("tcp4 dialer should not match ipv6 address")
envReuseportVal = false
}
envReuseportVal = true
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment