Commit 72d5d646 authored by tavit ohanian's avatar tavit ohanian

Merge branch 'port-2021-07-02'

parents a6d19bd9 87e91f69
Pipeline #736 failed with stages
in 17 seconds
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
# Automatically merge pull requests opened by web3-bot, as soon as (and only if) all tests pass.
# This reduces the friction associated with updating with our workflows.
on: [ pull_request ]
name: Automerge
jobs:
automerge-check:
if: github.event.pull_request.user.login == 'web3-bot'
runs-on: ubuntu-latest
outputs:
status: ${{ steps.should-automerge.outputs.status }}
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Check if we should automerge
id: should-automerge
run: |
for commit in $(git rev-list --first-parent origin/${{ github.event.pull_request.base.ref }}..${{ github.event.pull_request.head.sha }}); do
committer=$(git show --format=$'%ce' -s $commit)
echo "Committer: $committer"
if [[ "$committer" != "web3-bot@users.noreply.github.com" ]]; then
echo "Commit $commit wasn't committed by web3-bot, but by $committer."
echo "::set-output name=status::false"
exit
fi
done
echo "::set-output name=status::true"
automerge:
needs: automerge-check
runs-on: ubuntu-latest
if: ${{ needs.automerge-check.outputs.status == 'true' }}
steps:
- name: Wait on tests
uses: lewagon/wait-on-check-action@bafe56a6863672c681c3cf671f5e10b20abf2eaa # v0.2
with:
ref: ${{ github.event.pull_request.head.sha }}
repo-token: ${{ secrets.GITHUB_TOKEN }}
wait-interval: 10
running-workflow-name: 'automerge' # the name of this job
- name: Merge PR
uses: pascalgn/automerge-action@741c311a47881be9625932b0a0de1b0937aab1ae # v0.13.1
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
MERGE_LABELS: ""
MERGE_METHOD: "squash"
MERGE_DELETE_BRANCH: true
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
on: [push, pull_request]
name: Go Checks
jobs:
unit:
runs-on: ubuntu-latest
name: All
steps:
- uses: actions/checkout@v2
with:
submodules: recursive
- uses: actions/setup-go@v2
with:
go-version: "1.16.x"
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@434f5f3816b358fe468fa83dcba62d794e7fe04b # 2021.1 (v0.2.0)
- name: Check that go.mod is tidy
uses: protocol/multiple-go-modules@v1.0
with:
run: |
go mod tidy
if [[ -n $(git ls-files --other --exclude-standard --directory -- go.sum) ]]; then
echo "go.sum was added by go mod tidy"
exit 1
fi
git diff --exit-code -- go.sum go.mod
- name: gofmt
if: ${{ success() || failure() }} # run this step even if the previous one failed
run: |
out=$(gofmt -s -l .)
if [[ -n "$out" ]]; then
echo $out | awk '{print "::error file=" $0 ",line=0,col=0::File is not gofmt-ed."}'
exit 1
fi
- name: go vet
if: ${{ success() || failure() }} # run this step even if the previous one failed
uses: protocol/multiple-go-modules@v1.0
with:
run: go vet ./...
- name: staticcheck
if: ${{ success() || failure() }} # run this step even if the previous one failed
uses: protocol/multiple-go-modules@v1.0
with:
run: |
set -o pipefail
staticcheck ./... | sed -e 's@\(.*\)\.go@./\1.go@g'
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
on: [push, pull_request]
name: Go Test
jobs:
unit:
strategy:
fail-fast: false
matrix:
os: [ "ubuntu", "windows", "macos" ]
go: [ "1.15.x", "1.16.x" ]
runs-on: ${{ matrix.os }}-latest
name: ${{ matrix.os}} (go ${{ matrix.go }})
steps:
- uses: actions/checkout@v2
with:
submodules: recursive
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- name: Go information
run: |
go version
go env
- name: Run tests
uses: protocol/multiple-go-modules@v1.0
with:
run: go test -v -coverprofile coverage.txt ./...
- name: Run tests (32 bit)
if: ${{ matrix.os != 'macos' }} # can't run 32 bit tests on OSX.
uses: protocol/multiple-go-modules@v1.0
env:
GOARCH: 386
with:
run: go test -v ./...
- name: Run tests with race detector
if: ${{ matrix.os == 'ubuntu' }} # speed things up. Windows and OSX VMs are slow
uses: protocol/multiple-go-modules@v1.0
with:
run: go test -v -race ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@a1ed4b322b4b38cb846afb5a0ebfa17086917d27 # v1.5.0
with:
file: coverage.txt
env_vars: OS=${{ matrix.os }}, GO=${{ matrix.go }}
stages:
- build
- test
variables:
BUILD_DIR: "/tmp/$CI_CONCURRENT_PROJECT_ID"
before_script:
- mkdir -p $BUILD_DIR/src
- cd $BUILD_DIR/src
- if [ -d $CI_PROJECT_DIR ]
- then
- echo "soft link $CI_PROJECT_DIR exists"
- else
- echo "creating soft link $CI_PROJECT_DIR"
- ln -s $CI_PROJECT_DIR
- fi
- cd $CI_PROJECT_DIR
build:
stage: build
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go build
test:
stage: test
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go test -cover
coverage: '/coverage: \d+.\d+% of statements/'
The MIT License (MIT)
Copyright (c) 2016 Jeromy Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-tcp-transport go-tcp-transport
==================
dms3 p2p go-tcp-transport [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
\ No newline at end of file [![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/go-tcp-transport/badge.svg?branch=master)](https://coveralls.io/github/libp2p/go-tcp-transport?branch=master)
[![Travis CI](https://travis-ci.com/libp2p/go-tcp-transport.svg?branch=master)](https://travis-ci.com/libp2p/go-tcp-transport)
> A libp2p transport implementation for tcp, including reuseport socket options.
`go-tcp-transport` is an implementation of the [libp2p transport
interface][concept-transport] that streams data over TCP/IP sockets. It is
included by default in the main [`go-libp2p`][go-libp2p] "entry point" module.
## Table of Contents
- [go-tcp-transport](#go-tcp-transport)
- [Table of Contents](#table-of-contents)
- [Install](#install)
- [Usage](#usage)
- [Security and Multiplexing](#security-and-multiplexing)
- [reuseport](#reuseport)
- [Contribute](#contribute)
- [License](#license)
## Install
`go-tcp-transport` is included as a dependency of `go-libp2p`, which is the most
common libp2p entry point. If you depend on `go-libp2p`, there is generally no
need to explicitly depend on this module.
`go-tcp-transport` is a standard Go module which can be installed with:
``` sh
go get github.com/libp2p/go-tcp-transport
```
This repo is [gomod](https://github.com/golang/go/wiki/Modules)-compatible, and users of
go 1.11 and later with modules enabled will automatically pull the latest tagged release
by referencing this package. Upgrades to future releases can be managed using `go get`,
or by editing your `go.mod` file as [described by the gomod documentation](https://github.com/golang/go/wiki/Modules#how-to-upgrade-and-downgrade-dependencies).
## Usage
TCP is one of the default transports enabled when constructing a standard libp2p
Host, along with [WebSockets](https://github.com/libp2p/go-ws-transport).
Calling [`libp2p.New`][godoc-libp2p-new] to construct a libp2p Host will enable
the TCP transport, unless you override the default transports by passing in
`Options` to `libp2p.New`.
To explicitly enable the TCP transport while constructing a host, use the
`libp2p.Transport` option, passing in the `NewTCPTransport` constructor function:
``` go
import (
"context"
libp2p "github.com/libp2p/go-libp2p"
tcp "github.com/libp2p/go-tcp-transport"
)
ctx := context.Background()
// TCP only:
h, err := libp2p.New(ctx,
libp2p.Transport(tcp.NewTCPTransport)
)
```
The example above will replace the default transports with a single TCP
transport. To add multiple tranports, use `ChainOptions`:
``` go
// TCP and QUIC:
h, err := libp2p.New(ctx,
libp2p.ChainOptions(
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport)) // see https://github.com/libp2p/go-libp2p-quic-transport
)
```
## Addresses
The TCP transport supports [multiaddrs][multiaddr] that contain a `tcp`
component, provided that there is sufficient addressing information for the IP
layer of the connection.
Examples:
| addr | description |
|----------------------------|----------------------------------------------------|
| `/ip4/1.2.3.4/tcp/1234` | IPv4: 1.2.3.4, TCP port 1234 |
| `/ip6/::1/tcp/1234` | IPv6 loopback, TCP port 1234 |
| `/dns4/example.com/tcp/80` | DNS over IPv4, hostname `example.com`, TCP port 80 |
Support for IP layer protocols is provided by the
[go-multiaddr-net](https://github.com/multiformats/go-multiaddr-net) module.
## Security and Multiplexing
Because TCP lacks native connection security and stream multiplexing facilities,
the TCP transport uses a [transport upgrader][transport-upgrader] to provide
those features. The transport upgrader negotiates transport security and
multiplexing for each connection according to the protocols supported by each
party.
## reuseport
The [`SO_REUSEPORT`][explain-reuseport] socket option allows multiple processes
or threads to bind to the same TCP port, provided that all of them set the
socket option. This has some performance benefits, and it can potentially assist
in NAT traversal by only requiring one port to be accessible for many
connections.
The reuseport functionality is provided by a seperate module,
[go-reuseport-transport](https://github.com/libp2p/go-reuseport-transport). It
is enabled by default, but can be disabled at runtime by setting the
`LIBP2P_TCP_REUSEPORT` environment variable to `false` or `0`.
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Jeromy Johnson
---
The last gx published version of this module was: 2.0.28: QmTGiDkw4eeKq31wwpQRk5GwWiReaxrcTQLuCCLWgfKo5M
<!-- reference links -->
[go-libp2p]: https://github.com/libp2p/go-libp2p
[concept-transport]: https://docs.libp2p.io/concepts/transport/
[interface-host]: https://github.com/libp2p/go-libp2p-core/blob/master/host/host.go
[godoc-libp2p-new]: https://godoc.org/github.com/libp2p/go-libp2p#New
[transport-upgrader]: https://github.com/libp2p/go-libp2p-transport-upgrader
[explain-reuseport]: https://lwn.net/Articles/542629/
[multiaddr]: https://github.com/multiformats/multiaddr
coverage:
range: "50...100"
comment: off
This diff is collapsed.
package tcp
import (
"strings"
"sync"
"time"
"github.com/marten-seemann/tcp"
"github.com/mikioh/tcpinfo"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
)
var (
newConns *prometheus.CounterVec
closedConns *prometheus.CounterVec
segsSentDesc *prometheus.Desc
segsRcvdDesc *prometheus.Desc
bytesSentDesc *prometheus.Desc
bytesRcvdDesc *prometheus.Desc
)
var collector *aggregatingCollector
func init() {
segsSentDesc = prometheus.NewDesc("tcp_sent_segments_total", "TCP segments sent", nil, nil)
segsRcvdDesc = prometheus.NewDesc("tcp_rcvd_segments_total", "TCP segments received", nil, nil)
bytesSentDesc = prometheus.NewDesc("tcp_sent_bytes", "TCP bytes sent", nil, nil)
bytesRcvdDesc = prometheus.NewDesc("tcp_rcvd_bytes", "TCP bytes received", nil, nil)
collector = newAggregatingCollector()
prometheus.MustRegister(collector)
const direction = "direction"
newConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcp_connections_new_total",
Help: "TCP new connections",
},
[]string{direction},
)
prometheus.MustRegister(newConns)
closedConns = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "tcp_connections_closed_total",
Help: "TCP connections closed",
},
[]string{direction},
)
prometheus.MustRegister(closedConns)
}
type aggregatingCollector struct {
mutex sync.Mutex
highestID uint64
conns map[uint64] /* id */ *tracingConn
rtts prometheus.Histogram
connDurations prometheus.Histogram
}
var _ prometheus.Collector = &aggregatingCollector{}
func newAggregatingCollector() *aggregatingCollector {
return &aggregatingCollector{
conns: make(map[uint64]*tracingConn),
rtts: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_rtt",
Help: "TCP round trip time",
Buckets: prometheus.ExponentialBuckets(0.001, 1.25, 40), // 1ms to ~6000ms
}),
connDurations: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tcp_connection_duration",
Help: "TCP Connection Duration",
Buckets: prometheus.ExponentialBuckets(1, 1.5, 40), // 1s to ~12 weeks
}),
}
}
func (c *aggregatingCollector) AddConn(t *tracingConn) uint64 {
c.mutex.Lock()
defer c.mutex.Unlock()
c.highestID++
c.conns[c.highestID] = t
return c.highestID
}
func (c *aggregatingCollector) removeConn(id uint64) {
delete(c.conns, id)
}
func (c *aggregatingCollector) Describe(descs chan<- *prometheus.Desc) {
descs <- c.rtts.Desc()
descs <- c.connDurations.Desc()
if hasSegmentCounter {
descs <- segsSentDesc
descs <- segsRcvdDesc
}
if hasByteCounter {
descs <- bytesSentDesc
descs <- bytesRcvdDesc
}
}
func (c *aggregatingCollector) Collect(metrics chan<- prometheus.Metric) {
now := time.Now()
c.mutex.Lock()
var segsSent, segsRcvd uint64
var bytesSent, bytesRcvd uint64
for _, conn := range c.conns {
info, err := conn.getTCPInfo()
if err != nil {
if strings.Contains(err.Error(), "use of closed network connection") {
c.closedConn(conn)
continue
}
log.Errorf("Failed to get TCP info: %s", err)
continue
}
if hasSegmentCounter {
segsSent += getSegmentsSent(info)
segsRcvd += getSegmentsRcvd(info)
}
if hasByteCounter {
bytesSent += getBytesSent(info)
bytesRcvd += getBytesRcvd(info)
}
c.rtts.Observe(info.RTT.Seconds())
c.connDurations.Observe(now.Sub(conn.startTime).Seconds())
if info.State == tcpinfo.Closed {
c.closedConn(conn)
}
}
c.mutex.Unlock()
metrics <- c.rtts
metrics <- c.connDurations
if hasSegmentCounter {
segsSentMetric, err := prometheus.NewConstMetric(segsSentDesc, prometheus.CounterValue, float64(segsSent))
if err != nil {
log.Errorf("creating tcp_sent_segments_total metric failed: %v", err)
return
}
segsRcvdMetric, err := prometheus.NewConstMetric(segsRcvdDesc, prometheus.CounterValue, float64(segsRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_segments_total metric failed: %v", err)
return
}
metrics <- segsSentMetric
metrics <- segsRcvdMetric
}
if hasByteCounter {
bytesSentMetric, err := prometheus.NewConstMetric(bytesSentDesc, prometheus.CounterValue, float64(bytesSent))
if err != nil {
log.Errorf("creating tcp_sent_bytes metric failed: %v", err)
return
}
bytesRcvdMetric, err := prometheus.NewConstMetric(bytesRcvdDesc, prometheus.CounterValue, float64(bytesRcvd))
if err != nil {
log.Errorf("creating tcp_rcvd_bytes metric failed: %v", err)
return
}
metrics <- bytesSentMetric
metrics <- bytesRcvdMetric
}
}
func (c *aggregatingCollector) closedConn(conn *tracingConn) {
collector.removeConn(conn.id)
closedConns.WithLabelValues(conn.getDirection()).Inc()
}
type tracingConn struct {
id uint64
startTime time.Time
isClient bool
manet.Conn
tcpConn *tcp.Conn
}
func newTracingConn(c manet.Conn, isClient bool) (*tracingConn, error) {
conn, err := tcp.NewConn(c)
if err != nil {
return nil, err
}
tc := &tracingConn{
startTime: time.Now(),
isClient: isClient,
Conn: c,
tcpConn: conn,
}
tc.id = collector.AddConn(tc)
newConns.WithLabelValues(tc.getDirection()).Inc()
return tc, nil
}
func (c *tracingConn) getDirection() string {
if c.isClient {
return "outgoing"
}
return "incoming"
}
func (c *tracingConn) Close() error {
return c.Conn.Close()
}
func (c *tracingConn) getTCPInfo() (*tcpinfo.Info, error) {
var o tcpinfo.Info
var b [256]byte
i, err := c.tcpConn.Option(o.Level(), o.Name(), b[:])
if err != nil {
return nil, err
}
info := i.(*tcpinfo.Info)
return info, nil
}
type tracingListener struct {
manet.Listener
}
func (l *tracingListener) Accept() (manet.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
return newTracingConn(conn, false)
}
//+build darwin
package tcp
import "github.com/mikioh/tcpinfo"
const (
hasSegmentCounter = true
hasByteCounter = true
)
func getSegmentsSent(info *tcpinfo.Info) uint64 { return info.Sys.SegsSent }
func getSegmentsRcvd(info *tcpinfo.Info) uint64 { return info.Sys.SegsReceived }
func getBytesSent(info *tcpinfo.Info) uint64 { return info.Sys.BytesSent }
func getBytesRcvd(info *tcpinfo.Info) uint64 { return info.Sys.BytesReceived }
//+build !linux,!darwin
package tcp
import "github.com/mikioh/tcpinfo"
const (
hasSegmentCounter = false
hasByteCounter = false
)
func getSegmentsSent(info *tcpinfo.Info) uint64 { return 0 }
func getSegmentsRcvd(info *tcpinfo.Info) uint64 { return 0 }
func getBytesSent(info *tcpinfo.Info) uint64 { return 0 }
func getBytesRcvd(info *tcpinfo.Info) uint64 { return 0 }
//+build linux
package tcp
import "github.com/mikioh/tcpinfo"
const (
hasSegmentCounter = true
hasByteCounter = false
)
func getSegmentsSent(info *tcpinfo.Info) uint64 { return uint64(info.Sys.SegsOut) }
func getSegmentsRcvd(info *tcpinfo.Info) uint64 { return uint64(info.Sys.SegsIn) }
func getBytesSent(info *tcpinfo.Info) uint64 { return 0 }
func getBytesRcvd(info *tcpinfo.Info) uint64 { return 0 }
package tcp
import (
"os"
"strings"
"gitlab.dms3.io/p2p/go-reuseport"
)
// envReuseport is the env variable name used to turn off reuse port.
// It default to true.
const envReuseport = "LIBP2P_TCP_REUSEPORT"
const deprecatedEnvReuseport = "IPFS_REUSEPORT"
// envReuseportVal stores the value of envReuseport. defaults to true.
var envReuseportVal = true
func init() {
v := strings.ToLower(os.Getenv(envReuseport))
if v == "false" || v == "f" || v == "0" {
envReuseportVal = false
log.Infof("REUSEPORT disabled (LIBP2P_TCP_REUSEPORT=%s)", v)
}
v, exist := os.LookupEnv(deprecatedEnvReuseport)
if exist {
log.Warn("IPFS_REUSEPORT is deprecated, use LIBP2P_TCP_REUSEPORT instead")
if v == "false" || v == "f" || v == "0" {
envReuseportVal = false
log.Infof("REUSEPORT disabled (IPFS_REUSEPORT=%s)", v)
}
}
}
// reuseportIsAvailable returns whether reuseport is available to be used. This
// is here because we want to be able to turn reuseport on and off selectively.
// For now we use an ENV variable, as this handles our pressing need:
//
// LIBP2P_TCP_REUSEPORT=false ipfs daemon
//
// If this becomes a sought after feature, we could add this to the config.
// In the end, reuseport is a stop-gap.
func ReuseportIsAvailable() bool {
return envReuseportVal && reuseport.Available()
}
package tcp
import (
"context"
"net"
"runtime"
"time"
logging "gitlab.dms3.io/dms3/public/go-log"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
"gitlab.dms3.io/p2p/go-p2p-core/transport"
tptu "gitlab.dms3.io/p2p/go-p2p-transport-upgrader"
rtpt "gitlab.dms3.io/p2p/go-reuseport-transport"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
)
// DefaultConnectTimeout is the (default) maximum amount of time the TCP
// transport will spend on the initial TCP connect before giving up.
var DefaultConnectTimeout = 5 * time.Second
var log = logging.Logger("tcp-tpt")
const keepAlivePeriod = 30 * time.Second
type canKeepAlive interface {
SetKeepAlive(bool) error
SetKeepAlivePeriod(time.Duration) error
}
var _ canKeepAlive = &net.TCPConn{}
func tryKeepAlive(conn net.Conn, keepAlive bool) {
keepAliveConn, ok := conn.(canKeepAlive)
if !ok {
log.Errorf("Can't set TCP keepalives.")
return
}
if err := keepAliveConn.SetKeepAlive(keepAlive); err != nil {
log.Errorf("Failed to enable TCP keepalive: %s", err)
return
}
if runtime.GOOS != "openbsd" {
if err := keepAliveConn.SetKeepAlivePeriod(keepAlivePeriod); err != nil {
log.Errorf("Failed set keepalive period: %s", err)
}
}
}
// try to set linger on the connection, if possible.
func tryLinger(conn net.Conn, sec int) {
type canLinger interface {
SetLinger(int) error
}
if lingerConn, ok := conn.(canLinger); ok {
_ = lingerConn.SetLinger(sec)
}
}
type tcpListener struct {
manet.Listener
sec int
}
func (ll *tcpListener) Accept() (manet.Conn, error) {
c, err := ll.Listener.Accept()
if err != nil {
return nil, err
}
tryLinger(c, ll.sec)
tryKeepAlive(c, true)
return c, nil
}
// TcpTransport is the TCP transport.
type TcpTransport struct {
// Connection upgrader for upgrading insecure stream connections to
// secure multiplex connections.
Upgrader *tptu.Upgrader
// Explicitly disable reuseport.
DisableReuseport bool
// TCP connect timeout
ConnectTimeout time.Duration
reuse rtpt.Transport
}
var _ transport.Transport = &TcpTransport{}
// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire tcp stack (though it might not necessarily be)
func NewTCPTransport(upgrader *tptu.Upgrader) *TcpTransport {
return &TcpTransport{Upgrader: upgrader, ConnectTimeout: DefaultConnectTimeout}
}
var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_TCP))
// CanDial returns true if this transport believes it can dial the given
// multiaddr.
func (t *TcpTransport) CanDial(addr ma.Multiaddr) bool {
return dialMatcher.Matches(addr)
}
func (t *TcpTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
// Apply the deadline iff applicable
if t.ConnectTimeout > 0 {
deadline := time.Now().Add(t.ConnectTimeout)
if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
var cancel func()
ctx, cancel = context.WithDeadline(ctx, deadline)
defer cancel()
}
}
if t.UseReuseport() {
return t.reuse.DialContext(ctx, raddr)
}
var d manet.Dialer
return d.DialContext(ctx, raddr)
}
// Dial dials the peer at the remote address.
func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
conn, err := t.maDial(ctx, raddr)
if err != nil {
return nil, err
}
// Set linger to 0 so we never get stuck in the TIME-WAIT state. When
// linger is 0, connections are _reset_ instead of closed with a FIN.
// This means we can immediately reuse the 5-tuple and reconnect.
tryLinger(conn, 0)
tryKeepAlive(conn, true)
c, err := newTracingConn(conn, true)
if err != nil {
return nil, err
}
return t.Upgrader.UpgradeOutbound(ctx, t, c, p)
}
// UseReuseport returns true if reuseport is enabled and available.
func (t *TcpTransport) UseReuseport() bool {
return !t.DisableReuseport && ReuseportIsAvailable()
}
func (t *TcpTransport) maListen(laddr ma.Multiaddr) (manet.Listener, error) {
if t.UseReuseport() {
return t.reuse.Listen(laddr)
}
return manet.Listen(laddr)
}
// Listen listens on the given multiaddr.
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
list, err := t.maListen(laddr)
if err != nil {
return nil, err
}
list = &tracingListener{&tcpListener{list, 0}}
return t.Upgrader.UpgradeListener(t, list), nil
}
// Protocols returns the list of terminal protocols this transport can dial.
func (t *TcpTransport) Protocols() []int {
return []int{ma.P_TCP}
}
// Proxy always returns false for the TCP transport.
func (t *TcpTransport) Proxy() bool {
return false
}
func (t *TcpTransport) String() string {
return "TCP"
}
package tcp
import (
"testing"
"gitlab.dms3.io/p2p/go-p2p-core/crypto"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
"gitlab.dms3.io/p2p/go-p2p-core/sec/insecure"
mplex "gitlab.dms3.io/p2p/go-p2p-mplex"
tptu "gitlab.dms3.io/p2p/go-p2p-transport-upgrader"
ttransport "gitlab.dms3.io/p2p/go-p2p-testing/suites/transport"
ma "github.com/multiformats/go-multiaddr"
)
func TestTcpTransport(t *testing.T) {
for i := 0; i < 2; i++ {
ia := makeInsecureTransport(t)
ib := makeInsecureTransport(t)
ta := NewTCPTransport(&tptu.Upgrader{
Secure: ia,
Muxer: new(mplex.Transport),
})
tb := NewTCPTransport(&tptu.Upgrader{
Secure: ib,
Muxer: new(mplex.Transport),
})
zero := "/ip4/127.0.0.1/tcp/0"
ttransport.SubtestTransport(t, ta, tb, zero, ia.LocalPeer())
envReuseportVal = false
}
envReuseportVal = true
}
func TestTcpTransportCantDialDNS(t *testing.T) {
for i := 0; i < 2; i++ {
dnsa, err := ma.NewMultiaddr("/dns4/example.com/tcp/1234")
if err != nil {
t.Fatal(err)
}
tpt := NewTCPTransport(&tptu.Upgrader{
Secure: makeInsecureTransport(t),
Muxer: new(mplex.Transport),
})
if tpt.CanDial(dnsa) {
t.Fatal("shouldn't be able to dial dns")
}
envReuseportVal = false
}
envReuseportVal = true
}
func TestTcpTransportCantListenUtp(t *testing.T) {
for i := 0; i < 2; i++ {
utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/utp")
if err != nil {
t.Fatal(err)
}
tpt := NewTCPTransport(&tptu.Upgrader{
Secure: makeInsecureTransport(t),
Muxer: new(mplex.Transport),
})
_, err = tpt.Listen(utpa)
if err == nil {
t.Fatal("shouldnt be able to listen on utp addr with tcp transport")
}
envReuseportVal = false
}
envReuseportVal = true
}
func makeInsecureTransport(t *testing.T) *insecure.Transport {
priv, pub, err := crypto.GenerateKeyPair(crypto.Ed25519, 256)
if err != nil {
t.Fatal(err)
}
id, err := peer.IDFromPublicKey(pub)
if err != nil {
t.Fatal(err)
}
return insecure.NewWithIdentity(id, priv)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment