Commit fe903f16 authored by Steven Allen's avatar Steven Allen

initial commit

parents
package tcpreuse
import (
"context"
"net"
reuseport "github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
type dialer interface {
Dial(network, addr string) (net.Conn, error)
DialContext(ctx context.Context, network, addr string) (net.Conn, error)
}
func (t *Transport) Dial(raddr ma.Multiaddr) (manet.Conn, error) {
return t.DialContext(context.Background(), raddr)
}
func (t *Transport) DialContext(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
network, addr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
var d dialer
switch network {
case "tcp4":
d = t.v4.getDialer(network)
case "tcp6":
d = t.v6.getDialer(network)
default:
return nil, ErrWrongProto
}
conn, err := d.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
maconn, err := manet.WrapNetConn(conn)
if err != nil {
conn.Close()
return nil, err
}
return maconn, nil
}
func (n *network) getDialer(network string) dialer {
n.mu.RLock()
d := n.dialer
n.mu.RUnlock()
if d == nil {
n.mu.Lock()
defer n.mu.Unlock()
if n.dialer == nil {
n.dialer = n.makeDialer(network)
}
d = n.dialer
}
return d
}
func (n *network) makeDialer(network string) dialer {
if !reuseport.Available() {
log.Debug("reuseport not available")
return &net.Dialer{}
}
var unspec net.IP
switch network {
case "tcp4":
unspec = net.IPv4zero
case "tcp6":
unspec = net.IPv6unspecified
default:
panic("invalid network: must be either tcp4 or tcp6")
}
// How many ports are we listening on.
var port = 0
for l := range n.listeners {
if port == 0 {
port = l.Addr().(*net.TCPAddr).Port
} else {
// > 1
return newMultiDialer(unspec, n.listeners)
}
}
// None.
if port == 0 {
return &net.Dialer{}
}
// One. Always dial from the single port we're listening on.
laddr := &net.TCPAddr{
IP: unspec,
Port: port,
}
return (*singleDialer)(laddr)
}
package tcpreuse
import (
"net"
reuseport "github.com/libp2p/go-reuseport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
type listener struct {
manet.Listener
network *network
}
func (l *listener) Close() error {
l.network.mu.Lock()
delete(l.network.listeners, l)
l.network.mu.Unlock()
return l.Listener.Close()
}
func (t *Transport) Listen(laddr ma.Multiaddr) (manet.Listener, error) {
nw, naddr, err := manet.DialArgs(laddr)
if err != nil {
return nil, err
}
var n *network
switch nw {
case "tcp4":
n = &t.v4
case "tcp6":
n = &t.v6
default:
return nil, ErrWrongProto
}
if !reuseport.Available() {
return manet.Listen(laddr)
}
nl, err := reuseport.Listen(nw, naddr)
if err != nil {
return manet.Listen(laddr)
}
if _, ok := nl.Addr().(*net.TCPAddr); !ok {
nl.Close()
return nil, ErrWrongProto
}
malist, err := manet.WrapNetListener(nl)
if err != nil {
nl.Close()
return nil, err
}
list := &listener{
Listener: malist,
network: n,
}
n.mu.Lock()
defer n.mu.Unlock()
if n.listeners == nil {
n.listeners = make(map[*listener]struct{})
}
n.listeners[list] = struct{}{}
n.dialer = nil
return list, nil
}
package tcpreuse
import (
"context"
"fmt"
"math/rand"
"net"
)
type multiDialer struct {
loopback []*net.TCPAddr
unspecified []*net.TCPAddr
global *net.TCPAddr
}
func (d *multiDialer) Dial(network, addr string) (net.Conn, error) {
return d.DialContext(context.Background(), network, addr)
}
func randAddr(addrs []*net.TCPAddr) *net.TCPAddr {
if len(addrs) > 0 {
return addrs[rand.Intn(len(addrs))]
}
return nil
}
func (d *multiDialer) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
tcpAddr, err := net.ResolveTCPAddr(network, addr)
if err != nil {
return nil, err
}
ip := tcpAddr.IP
source := d.global
switch {
case ip.IsLoopback():
switch {
case len(d.loopback) > 0:
source = randAddr(d.loopback)
case len(d.unspecified) > 0:
source = randAddr(d.unspecified)
}
case ip.IsGlobalUnicast():
switch {
case len(d.unspecified) > 0:
source = randAddr(d.unspecified)
}
default:
return nil, fmt.Errorf("undialable IP: %s", tcpAddr.IP)
}
return reuseDial(ctx, source, network, addr)
}
func newMultiDialer(unspec net.IP, listeners map[*listener]struct{}) dialer {
m := new(multiDialer)
for l := range listeners {
laddr := l.Addr().(*net.TCPAddr)
switch {
case laddr.IP.IsLoopback():
m.loopback = append(m.loopback, laddr)
case laddr.IP.IsGlobalUnicast():
// Different global ports? Crap.
//
// The *proper* way to deal with this is to, e.g., use
// netlink to figure out which source address we would
// normally use to dial a destination address and then
// pick one of the ports we're listening on on that
// source address. However, this is a pain in the ass.
//
// Instead, we're just going to always dial from the
// unspecified address with the first global port we
// find.
//
// TODO: Port priority? Addr priority?
if m.global != nil {
m.global = &net.TCPAddr{
IP: unspec,
Port: laddr.Port,
}
} else {
log.Warning("listening on external interfaces on multiple ports, will dial from %d, not %s", m.global, laddr)
}
case laddr.IP.IsUnspecified():
m.unspecified = append(m.unspecified, laddr)
}
}
return m
}
package tcpreuse
import (
"context"
"net"
"syscall"
reuseport "github.com/libp2p/go-reuseport"
)
// ReuseErrShouldRetry diagnoses whether to retry after a reuse error.
// if we failed to bind, we should retry. if bind worked and this is a
// real dial error (remote end didnt answer) then we should not retry.
func ReuseErrShouldRetry(err error) bool {
if err == nil {
return false // hey, it worked! no need to retry.
}
// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return false
}
errno, ok := err.(syscall.Errno)
if !ok { // not an errno? who knows what this is. retry.
return true
}
switch errno {
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
return true // failure to bind. retry.
case syscall.ECONNREFUSED:
return false // real dial error
default:
return true // optimistically default to retry.
}
}
// Dials using reusport and then redials normally if that fails.
func reuseDial(ctx context.Context, laddr *net.TCPAddr, network, raddr string) (net.Conn, error) {
if laddr == nil {
var d net.Dialer
return d.DialContext(ctx, network, raddr)
}
d := reuseport.Dialer{
D: net.Dialer{
LocalAddr: laddr,
},
}
con, err := d.DialContext(ctx, network, raddr)
if err != nil {
return con, err
}
if ReuseErrShouldRetry(err) && ctx.Err() == nil {
log.Debug("failed to reuse port, dialing with a random port")
var d net.Dialer
con, err = d.DialContext(ctx, network, raddr)
}
return con, err
}
package tcpreuse
import (
"net"
"syscall"
"testing"
)
type netTimeoutErr struct {
timeout bool
}
func (e netTimeoutErr) Error() string {
return ""
}
func (e netTimeoutErr) Timeout() bool {
return e.timeout
}
func (e netTimeoutErr) Temporary() bool {
panic("not checked")
}
func TestReuseError(t *testing.T) {
var nte1 net.Error = &netTimeoutErr{true}
var nte2 net.Error = &netTimeoutErr{false}
cases := map[error]bool{
nil: false,
syscall.EADDRINUSE: true,
syscall.EADDRNOTAVAIL: true,
syscall.ECONNREFUSED: false,
nte1: false,
nte2: true, // this ones a little weird... we should check neterror.Temporary() too
// test 'default' to true
syscall.EBUSY: true,
}
for k, v := range cases {
if ReuseErrShouldRetry(k) != v {
t.Fatalf("expected %t for %#v", v, k)
}
}
}
package tcpreuse
import (
"context"
"net"
)
type singleDialer net.TCPAddr
func (d *singleDialer) Dial(network, address string) (net.Conn, error) {
return d.DialContext(context.Background(), network, address)
}
func (d *singleDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
return reuseDial(ctx, (*net.TCPAddr)(d), network, address)
}
package tcpreuse
import (
"errors"
"sync"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("reuseport-transport")
// ErrWrongProto is returned when dialing a protocol other than tcp.
var ErrWrongProto = errors.New("can only dial TCP over IPv4 or IPv6")
// Transport is a TCP reuse transport that reuses listener ports.
type Transport struct {
v4 network
v6 network
}
type network struct {
mu sync.RWMutex
listeners map[*listener]struct{}
dialer dialer
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment