Commit cc644d96 authored by Jeromy's avatar Jeromy

extract tcp transport from go-libp2p-transport

parents
os:
- linux
- osx
language: go
go:
- 1.7
install: true
before_install:
- make deps
script:
- go vet
- $GOPATH/bin/goveralls -service="travis-ci"
cache:
directories:
- $GOPATH/src/gx
notifications:
email: false
The MIT License (MIT)
Copyright (c) 2016 Jeromy Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
gx:
go get github.com/whyrusleeping/gx
go get github.com/whyrusleeping/gx-go
covertools:
go get github.com/mattn/goveralls
go get golang.org/x/tools/cmd/cover
deps: gx covertools
gx --verbose install --global
gx-go rewrite
publish:
gx-go rewrite --undo
go-tcp-transport
==================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/go-tcp-transport/badge.svg?branch=master)](https://coveralls.io/github/libp2p/go-tcp-transport?branch=master)
[![Travis CI](https://travis-ci.org/libp2p/go-tcp-transport.svg?branch=master)](https://travis-ci.org/libp2p/go-tcp-transport)
> A libp2p transport implementation for tcp, including reuseport socket options.
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [API](#api)
- [Contribute](#contribute)
- [License](#license)
## Install
```sh
make install
```
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Jeromy Johnson
{
"author": "whyrusleeping",
"bugs": {
"url": "https://github.com/libp2p/go-tcp-transport"
},
"gx": {
"dvcsimport": "github.com/libp2p/go-tcp-transport"
},
"gxDependencies": [
{
"author": "whyrusleeping",
"hash": "QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd",
"name": "go-multiaddr",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmYrv4LgCC8FhG2Ab4bwuq5DqBdwMtx3hMb3KKJDZcr2d7",
"name": "go-libp2p-loggables",
"version": "1.0.11"
},
{
"author": "whyrusleeping",
"hash": "QmaaC9QMYTQHCbMq3Ebr3uMaAR2ev4AVqMmsJpgQijAZbJ",
"name": "go-reuseport",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmY83KqqnQ286ZWbV2x7ixpeemH3cBpk8R54egS619WYff",
"name": "go-multiaddr-net",
"version": "1.3.0"
},
{
"author": "whyrusleeping",
"hash": "QmeLQ13LftT9XhNn22piZc3GP56fGqhijuL5Y8KdUaRn1g",
"name": "mafmt",
"version": "1.1.1"
},
{
"author": "whyrusleeping",
"hash": "QmekekLwe4KGWtaeVHr8AfRht25RgdGzLunnQF2gHVi62E",
"name": "go-libp2p-transport",
"version": "2.1.1"
}
],
"gxVersion": "0.4.0",
"language": "go",
"license": "MIT",
"name": "go-tcp-transport",
"version": "1.6.0"
}
package tcp
import (
"net"
"os"
"strings"
"syscall"
reuseport "github.com/jbenet/go-reuseport"
)
// envReuseport is the env variable name used to turn off reuse port.
// It default to true.
const envReuseport = "IPFS_REUSEPORT"
// envReuseportVal stores the value of envReuseport. defaults to true.
var envReuseportVal = true
func init() {
v := strings.ToLower(os.Getenv(envReuseport))
if v == "false" || v == "f" || v == "0" {
envReuseportVal = false
log.Infof("REUSEPORT disabled (IPFS_REUSEPORT=%s)", v)
}
}
// reuseportIsAvailable returns whether reuseport is available to be used. This
// is here because we want to be able to turn reuseport on and off selectively.
// For now we use an ENV variable, as this handles our pressing need:
//
// IPFS_REUSEPORT=false ipfs daemon
//
// If this becomes a sought after feature, we could add this to the config.
// In the end, reuseport is a stop-gap.
func ReuseportIsAvailable() bool {
return envReuseportVal && reuseport.Available()
}
// ReuseErrShouldRetry diagnoses whether to retry after a reuse error.
// if we failed to bind, we should retry. if bind worked and this is a
// real dial error (remote end didnt answer) then we should not retry.
func ReuseErrShouldRetry(err error) bool {
if err == nil {
return false // hey, it worked! no need to retry.
}
// if it's a network timeout error, it's a legitimate failure.
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
return false
}
errno, ok := err.(syscall.Errno)
if !ok { // not an errno? who knows what this is. retry.
return true
}
switch errno {
case syscall.EADDRINUSE, syscall.EADDRNOTAVAIL:
return true // failure to bind. retry.
case syscall.ECONNREFUSED:
return false // real dial error
default:
return true // optimistically default to retry.
}
}
package tcp
import (
"net"
"syscall"
"testing"
)
type netTimeoutErr struct {
timeout bool
}
func (e netTimeoutErr) Error() string {
return ""
}
func (e netTimeoutErr) Timeout() bool {
return e.timeout
}
func (e netTimeoutErr) Temporary() bool {
panic("not checked")
}
func TestReuseError(t *testing.T) {
var nte1 net.Error = &netTimeoutErr{true}
var nte2 net.Error = &netTimeoutErr{false}
cases := map[error]bool{
nil: false,
syscall.EADDRINUSE: true,
syscall.EADDRNOTAVAIL: true,
syscall.ECONNREFUSED: false,
nte1: false,
nte2: true, // this ones a little weird... we should check neterror.Temporary() too
// test 'default' to true
syscall.EBUSY: true,
}
for k, v := range cases {
if ReuseErrShouldRetry(k) != v {
t.Fatalf("expected %b for %#v", v, k)
}
}
}
package tcp
import (
"context"
"fmt"
"net"
"sync"
"time"
lgbl "github.com/ipfs/go-libp2p-loggables"
logging "github.com/ipfs/go-log"
ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
reuseport "github.com/jbenet/go-reuseport"
tpt "github.com/libp2p/go-libp2p-transport"
mafmt "github.com/whyrusleeping/mafmt"
)
var log = logging.Logger("tcp-tpt")
type TcpTransport struct {
dlock sync.Mutex
dialers map[string]tpt.Dialer
llock sync.Mutex
listeners map[string]tpt.Listener
}
// NewTCPTransport creates a tcp transport object that tracks dialers and listeners
// created. It represents an entire tcp stack (though it might not necessarily be)
func NewTCPTransport() *TcpTransport {
return &TcpTransport{
dialers: make(map[string]tpt.Dialer),
listeners: make(map[string]tpt.Listener),
}
}
func (t *TcpTransport) Dialer(laddr ma.Multiaddr, opts ...tpt.DialOpt) (tpt.Dialer, error) {
t.dlock.Lock()
defer t.dlock.Unlock()
s := laddr.String()
d, found := t.dialers[s]
if found {
return d, nil
}
var base manet.Dialer
var doReuse bool
for _, o := range opts {
switch o := o.(type) {
case tpt.TimeoutOpt:
base.Timeout = time.Duration(o)
case tpt.ReuseportOpt:
doReuse = bool(o)
default:
return nil, fmt.Errorf("unrecognized option: %#v", o)
}
}
tcpd, err := t.newTcpDialer(base, laddr, doReuse)
if err != nil {
return nil, err
}
t.dialers[s] = tcpd
return tcpd, nil
}
func (t *TcpTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
if !t.Matches(laddr) {
return nil, fmt.Errorf("tcp transport cannot listen on %q", laddr)
}
t.llock.Lock()
defer t.llock.Unlock()
s := laddr.String()
l, found := t.listeners[s]
if found {
return l, nil
}
list, err := manetListen(laddr)
if err != nil {
return nil, err
}
tlist := &tcpListener{
list: list,
transport: t,
}
t.listeners[s] = tlist
return tlist, nil
}
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
network, naddr, err := manet.DialArgs(addr)
if err != nil {
return nil, err
}
if ReuseportIsAvailable() {
nl, err := reuseport.Listen(network, naddr)
if err == nil {
// hey, it worked!
return manet.WrapNetListener(nl)
}
// reuseport is available, but we failed to listen. log debug, and retry normally.
log.Debugf("reuseport available, but failed to listen: %s %s, %s", network, naddr, err)
}
// either reuseport not available, or it failed. try normally.
return manet.Listen(addr)
}
func (t *TcpTransport) Matches(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}
type tcpDialer struct {
laddr ma.Multiaddr
doReuse bool
rd reuseport.Dialer
madialer manet.Dialer
transport tpt.Transport
}
func (t *TcpTransport) newTcpDialer(base manet.Dialer, laddr ma.Multiaddr, doReuse bool) (*tcpDialer, error) {
// get the local net.Addr manually
la, err := manet.ToNetAddr(laddr)
if err != nil {
return nil, err // something wrong with laddr.
}
if doReuse && ReuseportIsAvailable() {
rd := reuseport.Dialer{
D: net.Dialer{
LocalAddr: la,
Timeout: base.Timeout,
},
}
return &tcpDialer{
doReuse: true,
laddr: laddr,
rd: rd,
madialer: base,
transport: t,
}, nil
}
return &tcpDialer{
doReuse: false,
laddr: laddr,
madialer: base,
transport: t,
}, nil
}
func (d *tcpDialer) Dial(raddr ma.Multiaddr) (tpt.Conn, error) {
return d.DialContext(context.Background(), raddr)
}
func (d *tcpDialer) DialContext(ctx context.Context, raddr ma.Multiaddr) (tpt.Conn, error) {
var c manet.Conn
var err error
if d.doReuse {
c, err = d.reuseDial(ctx, raddr)
} else {
c, err = d.madialer.DialContext(ctx, raddr)
}
if err != nil {
return nil, err
}
return &tpt.ConnWrap{
Conn: c,
Tpt: d.transport,
}, nil
}
func (d *tcpDialer) reuseDial(ctx context.Context, raddr ma.Multiaddr) (manet.Conn, error) {
logdial := lgbl.Dial("conn", "", "", d.laddr, raddr)
rpev := log.EventBegin(ctx, "tptDialReusePort", logdial)
network, netraddr, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
_ = ctx // TODO: implement DialContext in reuseport
con, err := d.rd.Dial(network, netraddr)
if err == nil {
logdial["reuseport"] = "success"
rpev.Done()
return manet.WrapNetConn(con)
}
if !ReuseErrShouldRetry(err) {
logdial["reuseport"] = "failure"
logdial["error"] = err
rpev.Done()
return nil, err
}
logdial["reuseport"] = "retry"
logdial["error"] = err
rpev.Done()
return d.madialer.DialContext(ctx, raddr)
}
func (d *tcpDialer) Matches(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}
type tcpListener struct {
list manet.Listener
transport tpt.Transport
}
func (d *tcpListener) Accept() (tpt.Conn, error) {
c, err := d.list.Accept()
if err != nil {
return nil, err
}
return &tpt.ConnWrap{
Conn: c,
Tpt: d.transport,
}, nil
}
func (d *tcpListener) Addr() net.Addr {
return d.list.Addr()
}
func (t *tcpListener) Multiaddr() ma.Multiaddr {
return t.list.Multiaddr()
}
func (t *tcpListener) NetListener() net.Listener {
return t.list.NetListener()
}
func (d *tcpListener) Close() error {
return d.list.Close()
}
package tcp
import (
"testing"
ma "github.com/jbenet/go-multiaddr"
utils "github.com/libp2p/go-libp2p-transport/test"
)
func TestTcpTransport(t *testing.T) {
ta := NewTCPTransport()
tb := NewTCPTransport()
zero := "/ip4/127.0.0.1/tcp/0"
utils.SubtestTransport(t, ta, tb, zero)
}
func TestTcpTransportCantListenUtp(t *testing.T) {
utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/utp")
if err != nil {
t.Fatal(err)
}
tpt := NewTCPTransport()
_, err = tpt.Listen(utpa)
if err == nil {
t.Fatal("shouldnt be able to listen on utp addr with tcp transport")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment