// Package tcp implements the libp2p TCP transport.
package tcp

import (
	"context"
	"net"
	"time"

	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/transport"
	tptu "github.com/libp2p/go-libp2p-transport-upgrader"
	rtpt "github.com/libp2p/go-reuseport-transport"

	ma "github.com/multiformats/go-multiaddr"
	mafmt "github.com/multiformats/go-multiaddr-fmt"
	manet "github.com/multiformats/go-multiaddr-net"
)

// DefaultConnectTimeout is the (default) maximum amount of time the TCP
// transport will spend on the initial TCP connect before giving up.
// NewTCPTransport copies this value into TcpTransport.ConnectTimeout.
var DefaultConnectTimeout = 5 * time.Second

23 24
var log = logging.Logger("tcp-tpt")

// tryLinger calls SetLinger(sec) on conn when the connection's concrete type
// provides such a method (as *net.TCPConn does). Connections without the
// method, including a nil conn, are left untouched.
//
// This is a best-effort tweak: an error returned by SetLinger is deliberately
// discarded so that a failure to adjust linger never fails the connection.
func tryLinger(conn net.Conn, sec int) {
	// canLinger is the minimal capability needed; it is declared locally
	// because nothing outside this function cares about it.
	type canLinger interface {
		SetLinger(int) error
	}

	if lingerConn, ok := conn.(canLinger); ok {
		// Best effort only, see the function comment.
		_ = lingerConn.SetLinger(sec)
	}
}

// lingerListener wraps a manet.Listener and applies a linger value to every
// connection handed out by its Accept method.
type lingerListener struct {
	// Wrapped listener; Accept delegates to it.
	manet.Listener
	// sec is the value passed to tryLinger for each accepted connection.
	sec int
}

// Accept returns the next connection from the wrapped listener after passing
// it to tryLinger with the configured linger value. An error from the wrapped
// listener is returned unchanged together with a nil connection.
func (ll *lingerListener) Accept() (manet.Conn, error) {
	accepted, acceptErr := ll.Listener.Accept()
	if acceptErr != nil {
		return nil, acceptErr
	}

	tryLinger(accepted, ll.sec)
	return accepted, nil
}

50
// TcpTransport is the TCP transport.
51
type TcpTransport struct {
52 53 54
	// Connection upgrader for upgrading insecure stream connections to
	// secure multiplex connections.
	Upgrader *tptu.Upgrader
55

56 57 58
	// Explicitly disable reuseport.
	DisableReuseport bool

Steven Allen's avatar
Steven Allen committed
59 60 61
	// TCP connect timeout
	ConnectTimeout time.Duration

62
	reuse rtpt.Transport
63 64
}

65
var _ transport.Transport = &TcpTransport{}
66

67 68
// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire tcp stack (though it might not necessarily be)
69
func NewTCPTransport(upgrader *tptu.Upgrader) *TcpTransport {
Steven Allen's avatar
Steven Allen committed
70
	return &TcpTransport{Upgrader: upgrader, ConnectTimeout: DefaultConnectTimeout}
71 72
}

73 74
var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_TCP))

75 76 77
// CanDial returns true if this transport believes it can dial the given
// multiaddr.
func (t *TcpTransport) CanDial(addr ma.Multiaddr) bool {
78
	return dialMatcher.Matches(addr)
79 80
}

81
func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
Steven Allen's avatar
Steven Allen committed
82 83 84 85 86 87 88 89 90 91
	// Apply the deadline iff applicable
	if t.ConnectTimeout > 0 {
		deadline := time.Now().Add(t.ConnectTimeout)
		if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
			var cancel func()
			ctx, cancel = context.WithDeadline(ctx, deadline)
			defer cancel()
		}
	}

92 93
	if t.UseReuseport() {
		return t.reuse.DialContext(ctx, raddr)
94
	}
95 96
	var d manet.Dialer
	return d.DialContext(ctx, raddr)
97 98
}

99
// Dial dials the peer at the remote address.
100
func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
101
	conn, err := t.maDial(ctx, raddr)
102
	if err != nil {
103
		return nil, err
104
	}
105 106 107 108
	// Set linger to 0 so we never get stuck in the TIME-WAIT state. When
	// linger is 0, connections are _reset_ instead of closed with a FIN.
	// This means we can immediately reuse the 5-tuple and reconnect.
	tryLinger(conn, 0)
109
	return t.Upgrader.UpgradeOutbound(ctx, t, conn, p)
110 111
}

112 113 114
// UseReuseport returns true if reuseport is enabled and available.
func (t *TcpTransport) UseReuseport() bool {
	return !t.DisableReuseport && ReuseportIsAvailable()
115 116
}

117 118 119
func (t *TcpTransport) maListen(laddr ma.Multiaddr) (manet.Listener, error) {
	if t.UseReuseport() {
		return t.reuse.Listen(laddr)
120
	}
121
	return manet.Listen(laddr)
122 123
}

124
// Listen listens on the given multiaddr.
125
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
126
	list, err := t.maListen(laddr)
127 128 129
	if err != nil {
		return nil, err
	}
130
	list = &lingerListener{list, 0}
131
	return t.Upgrader.UpgradeListener(t, list), nil
132 133
}

134 135 136
// Protocols returns the list of terminal protocols this transport can dial.
func (t *TcpTransport) Protocols() []int {
	return []int{ma.P_TCP}
137 138
}

139 140 141
// Proxy always returns false for the TCP transport.
func (t *TcpTransport) Proxy() bool {
	return false
142 143
}

144 145
func (t *TcpTransport) String() string {
	return "TCP"
146
}