tcp.go 4.49 KB
Newer Older
1 2 3 4
package tcp

import (
	"context"
5
	"net"
Steven Allen's avatar
Steven Allen committed
6
	"time"
7 8

	logging "github.com/ipfs/go-log"
9 10
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/transport"
11 12
	tptu "github.com/libp2p/go-libp2p-transport-upgrader"
	rtpt "github.com/libp2p/go-reuseport-transport"
13

Jeromy's avatar
Jeromy committed
14
	ma "github.com/multiformats/go-multiaddr"
15
	mafmt "github.com/multiformats/go-multiaddr-fmt"
16
	manet "github.com/multiformats/go-multiaddr/net"
17 18
)

Steven Allen's avatar
Steven Allen committed
19 20 21 22
// DefaultConnectTimeout is the default upper bound on how long the TCP
// transport spends on the initial TCP connect before giving up.
// NewTCPTransport copies it into TcpTransport.ConnectTimeout.
var DefaultConnectTimeout = 5 * time.Second

23 24
// log is the package-level logger, created under the name "tcp-tpt".
var log = logging.Logger("tcp-tpt")

Marten Seemann's avatar
Marten Seemann committed
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
// keepAlivePeriod is the keepalive period that tryKeepAlive applies to a
// connection.
const keepAlivePeriod = 30 * time.Second

// canKeepAlive is the set of methods a connection must provide for
// tryKeepAlive to be able to configure TCP keepalives on it.
type canKeepAlive interface {
	SetKeepAlive(bool) error
	SetKeepAlivePeriod(time.Duration) error
}

// Compile-time check that *net.TCPConn satisfies canKeepAlive. A typed nil
// pointer is enough for the check and avoids building a throwaway value.
var _ canKeepAlive = (*net.TCPConn)(nil)

// tryKeepAlive makes a best-effort attempt to switch TCP keepalives on or off
// for conn (according to keepAlive) and then to set the keepalive period to
// keepAlivePeriod.
//
// Nothing is returned: if conn does not provide the keepalive setters, or if
// one of them fails, the problem is logged and conn is otherwise left alone.
// A failure to set the keepalive state skips the period update.
func tryKeepAlive(conn net.Conn, keepAlive bool) {
	keepAliveConn, ok := conn.(canKeepAlive)
	if !ok {
		log.Errorf("Can't set TCP keepalives.")
		return
	}
	if err := keepAliveConn.SetKeepAlive(keepAlive); err != nil {
		// Report the requested state rather than always saying "enable":
		// the caller may have asked for keepalives to be turned off.
		log.Errorf("Failed to set TCP keepalive to %t: %s", keepAlive, err)
		return
	}
	if err := keepAliveConn.SetKeepAlivePeriod(keepAlivePeriod); err != nil {
		log.Errorf("Failed to set keepalive period: %s", err)
	}
}

49 50
// tryLinger applies the linger value sec to conn when conn offers a
// SetLinger method. Connections without that method are left untouched.
func tryLinger(conn net.Conn, sec int) {
	lingerer, supported := conn.(interface{ SetLinger(int) error })
	if !supported {
		return
	}
	// Best effort only: a failure to set linger is deliberately ignored.
	_ = lingerer.SetLinger(sec)
}

60
type tcpListener struct {
61 62 63 64
	manet.Listener
	sec int
}

65
func (ll *tcpListener) Accept() (manet.Conn, error) {
66 67 68 69 70
	c, err := ll.Listener.Accept()
	if err != nil {
		return nil, err
	}
	tryLinger(c, ll.sec)
Marten Seemann's avatar
Marten Seemann committed
71
	tryKeepAlive(c, true)
72 73 74
	return c, nil
}

75
// TcpTransport is the TCP transport.
76
type TcpTransport struct {
77 78 79
	// Connection upgrader for upgrading insecure stream connections to
	// secure multiplex connections.
	Upgrader *tptu.Upgrader
80

81 82 83
	// Explicitly disable reuseport.
	DisableReuseport bool

Steven Allen's avatar
Steven Allen committed
84 85 86
	// TCP connect timeout
	ConnectTimeout time.Duration

87
	reuse rtpt.Transport
88 89
}

90
// Compile-time check that *TcpTransport implements transport.Transport.
var _ transport.Transport = &TcpTransport{}
91

92 93
// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire tcp stack (though it might not necessarily be)
94
func NewTCPTransport(upgrader *tptu.Upgrader) *TcpTransport {
Steven Allen's avatar
Steven Allen committed
95
	return &TcpTransport{Upgrader: upgrader, ConnectTimeout: DefaultConnectTimeout}
96 97
}

98 99
// dialMatcher is the address pattern used by CanDial: an address must match
// both mafmt.IP and mafmt.Base(ma.P_TCP).
var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_TCP))

100 101 102
// CanDial returns true if this transport believes it can dial the given
// multiaddr.
func (t *TcpTransport) CanDial(addr ma.Multiaddr) bool {
103
	return dialMatcher.Matches(addr)
104 105
}

106
func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
Steven Allen's avatar
Steven Allen committed
107 108 109 110 111 112 113 114 115 116
	// Apply the deadline iff applicable
	if t.ConnectTimeout > 0 {
		deadline := time.Now().Add(t.ConnectTimeout)
		if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
			var cancel func()
			ctx, cancel = context.WithDeadline(ctx, deadline)
			defer cancel()
		}
	}

117 118
	if t.UseReuseport() {
		return t.reuse.DialContext(ctx, raddr)
119
	}
120 121
	var d manet.Dialer
	return d.DialContext(ctx, raddr)
122 123
}

124
// Dial dials the peer at the remote address.
125
func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
126
	conn, err := t.maDial(ctx, raddr)
127
	if err != nil {
128
		return nil, err
129
	}
130 131 132 133
	// Set linger to 0 so we never get stuck in the TIME-WAIT state. When
	// linger is 0, connections are _reset_ instead of closed with a FIN.
	// This means we can immediately reuse the 5-tuple and reconnect.
	tryLinger(conn, 0)
Marten Seemann's avatar
Marten Seemann committed
134
	tryKeepAlive(conn, true)
135
	return t.Upgrader.UpgradeOutbound(ctx, t, conn, p)
136 137
}

138 139 140
// UseReuseport returns true if reuseport is enabled and available.
func (t *TcpTransport) UseReuseport() bool {
	return !t.DisableReuseport && ReuseportIsAvailable()
141 142
}

143 144 145
func (t *TcpTransport) maListen(laddr ma.Multiaddr) (manet.Listener, error) {
	if t.UseReuseport() {
		return t.reuse.Listen(laddr)
146
	}
147
	return manet.Listen(laddr)
148 149
}

150
// Listen listens on the given multiaddr.
151
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
152
	list, err := t.maListen(laddr)
153 154 155
	if err != nil {
		return nil, err
	}
156
	list = &tcpListener{list, 0}
157
	return t.Upgrader.UpgradeListener(t, list), nil
158 159
}

160 161 162
// Protocols returns the list of terminal protocols this transport can dial.
func (t *TcpTransport) Protocols() []int {
	return []int{ma.P_TCP}
163 164
}

165 166 167
// Proxy always returns false for the TCP transport.
func (t *TcpTransport) Proxy() bool {
	return false
168 169
}

170 171
// String returns the transport's name, "TCP".
func (t *TcpTransport) String() string {
	const name = "TCP"
	return name
}