tcp.go 5.76 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package tcp

import (
	"context"
	"fmt"
	"net"
	"sync"

	logging "github.com/ipfs/go-log"
	tpt "github.com/libp2p/go-libp2p-transport"
Steven Allen's avatar
Steven Allen committed
11
	reuseport "github.com/libp2p/go-reuseport"
Jeromy's avatar
Jeromy committed
12 13
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
14 15 16 17 18 19 20 21 22 23 24 25 26
	mafmt "github.com/whyrusleeping/mafmt"
)

var log = logging.Logger("tcp-tpt")

type TcpTransport struct {
	dlock   sync.Mutex
	dialers map[string]tpt.Dialer

	llock     sync.Mutex
	listeners map[string]tpt.Listener
}

27 28
var _ tpt.Transport = &TcpTransport{}

29 30 31 32 33 34 35 36 37 38
// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire tcp stack (though it might not necessarily be)
func NewTCPTransport() *TcpTransport {
	return &TcpTransport{
		dialers:   make(map[string]tpt.Dialer),
		listeners: make(map[string]tpt.Listener),
	}
}

func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...tpt.DialOpt) (tpt.Dialer, error) {
Jeromy's avatar
Jeromy committed
39 40 41 42 43 44 45
	if laddr == nil {
		zaddr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/0")
		if err != nil {
			return nil, err
		}
		laddr = zaddr
	}
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
	t.dlock.Lock()
	defer t.dlock.Unlock()
	s := laddr.String()
	d, found := t.dialers[s]
	if found {
		return d, nil
	}
	var doReuse bool
	for _, o := range opts {
		switch o := o.(type) {
		case tpt.ReuseportOpt:
			doReuse = bool(o)
		default:
			return nil, fmt.Errorf("unrecognized option: %#v", o)
		}
	}

63
	tcpd, err := t.newTcpDialer(laddr, doReuse)
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
	if err != nil {
		return nil, err
	}

	t.dialers[s] = tcpd
	return tcpd, nil
}

func (t *TcpTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
	if !t.Matches(laddr) {
		return nil, fmt.Errorf("tcp transport cannot listen on %q", laddr)
	}

	t.llock.Lock()
	defer t.llock.Unlock()
	s := laddr.String()
	l, found := t.listeners[s]
	if found {
		return l, nil
	}

	list, err := manetListen(laddr)
	if err != nil {
		return nil, err
	}

	tlist := &tcpListener{
		list:      list,
		transport: t,
	}

	t.listeners[s] = tlist
	return tlist, nil
}

func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
	network, naddr, err := manet.DialArgs(addr)
	if err != nil {
		return nil, err
	}

	if ReuseportIsAvailable() {
		nl, err := reuseport.Listen(network, naddr)
		if err == nil {
			// hey, it worked!
			return manet.WrapNetListener(nl)
		}
		// reuseport is available, but we failed to listen. log debug, and retry normally.
		log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
	}

	// either reuseport not available, or it failed. try normally.
	return manet.Listen(addr)
}

func (t *TcpTransport) Matches(a ma.Multiaddr) bool {
	return mafmt.TCP.Matches(a)
}

type tcpDialer struct {
	laddr ma.Multiaddr

	doReuse bool

	rd       reuseport.Dialer
	madialer manet.Dialer
Jeromy's avatar
Jeromy committed
130
	pattern  mafmt.Pattern
131 132 133 134

	transport tpt.Transport
}

135 136
var _ tpt.Dialer = &tcpDialer{}

137 138 139 140 141 142 143 144 145 146 147 148 149
func maddrToTcp(addr ma.Multiaddr) (*net.TCPAddr, error) {
	la, err := manet.ToNetAddr(addr)
	if err != nil {
		return nil, err // something wrong with addr.
	}
	latcp, ok := la.(*net.TCPAddr)
	if !ok {
		return nil, fmt.Errorf("not a tcp multiaddr: %s", addr)
	}
	return latcp, nil
}

func (t *TcpTransport) newTcpDialer(laddr ma.Multiaddr, doReuse bool) (*tcpDialer, error) {
150
	// get the local net.Addr manually
151
	la, err := maddrToTcp(laddr)
152
	if err != nil {
153
		return nil, err
154 155
	}

Jeromy's avatar
Jeromy committed
156 157 158 159 160 161 162 163 164
	var pattern mafmt.Pattern
	if TCP4.Matches(laddr) {
		pattern = TCP4
	} else if TCP6.Matches(laddr) {
		pattern = TCP6
	} else {
		return nil, fmt.Errorf("local addr did not match TCP4 or TCP6: %s", laddr)
	}

165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
	// Ignore the port when constructing the default (non-reuseport) dialer.
	labase := *la
	labase.Port = 0

	dialer := &tcpDialer{
		laddr:   laddr,
		pattern: pattern,
		madialer: manet.Dialer{
			Dialer: net.Dialer{
				LocalAddr: &labase,
			},
		},
		transport: t,
	}

180
	if doReuse && ReuseportIsAvailable() {
181 182
		dialer.doReuse = true
		dialer.rd = reuseport.Dialer{
183 184 185
			D: net.Dialer{
				LocalAddr: la,
			},
186
		}
187 188
	}
	return dialer, nil
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
}

func (d *tcpDialer) Dial(raddr ma.Multiaddr) (tpt.Conn, error) {
	return d.DialContext(context.Background(), raddr)
}

func (d *tcpDialer) DialContext(ctx context.Context, raddr ma.Multiaddr) (tpt.Conn, error) {
	var c manet.Conn
	var err error
	if d.doReuse {
		c, err = d.reuseDial(ctx, raddr)
	} else {
		c, err = d.madialer.DialContext(ctx, raddr)
	}

	if err != nil {
		return nil, err
	}

208
	return &tcpConn{
209
		Conn: c,
210
		t:    d.transport,
211 212 213 214 215 216 217 218 219
	}, nil
}

func (d *tcpDialer) reuseDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
	network, netraddr, err := manet.DialArgs(raddr)
	if err != nil {
		return nil, err
	}

220 221 222 223
	rpev := log.EventBegin(ctx, "tptDialReusePort", logging.LoggableMap{
		"raddr": raddr,
	})

224
	con, err := d.rd.DialContext(ctx, network, netraddr)
225 226 227 228
	if err == nil {
		rpev.Done()
		return manet.WrapNetConn(con)
	}
229 230
	rpev.SetError(err)
	rpev.Done()
231 232 233 234 235 236 237 238

	if !ReuseErrShouldRetry(err) {
		return nil, err
	}

	return d.madialer.DialContext(ctx, raddr)
}

Jeromy's avatar
Jeromy committed
239 240 241
var TCP4 = mafmt.And(mafmt.Base(ma.P_IP4), mafmt.Base(ma.P_TCP))
var TCP6 = mafmt.And(mafmt.Base(ma.P_IP6), mafmt.Base(ma.P_TCP))

242
func (d *tcpDialer) Matches(a ma.Multiaddr) bool {
Jeromy's avatar
Jeromy committed
243
	return d.pattern.Matches(a)
244 245 246 247 248 249 250
}

type tcpListener struct {
	list      manet.Listener
	transport tpt.Transport
}

251 252
var _ tpt.Listener = &tcpListener{}

253 254 255 256 257 258
func (d *tcpListener) Accept() (tpt.Conn, error) {
	c, err := d.list.Accept()
	if err != nil {
		return nil, err
	}

259
	return &tcpConn{
260
		Conn: c,
261
		t:    d.transport,
262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
	}, nil
}

func (d *tcpListener) Addr() net.Addr {
	return d.list.Addr()
}

func (t *tcpListener) Multiaddr() ma.Multiaddr {
	return t.list.Multiaddr()
}

func (t *tcpListener) NetListener() net.Listener {
	return t.list.NetListener()
}

func (d *tcpListener) Close() error {
	return d.list.Close()
}
280 281 282 283 284 285 286 287 288 289 290

type tcpConn struct {
	manet.Conn
	t tpt.Transport
}

var _ tpt.Conn = &tcpConn{}

func (c *tcpConn) Transport() tpt.Transport {
	return c.t
}