Commit 3697800e authored by tavit ohanian's avatar tavit ohanian

reference basis

parents a2dddffa ede1c13e
Pipeline #461 failed with stages
in 0 seconds
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
# Automatically merge pull requests opened by web3-bot, as soon as (and only if) all tests pass.
# This reduces the friction associated with updating with our workflows.
on: [ pull_request ]
jobs:
automerge:
if: github.event.pull_request.user.login == 'web3-bot'
runs-on: ubuntu-latest
steps:
- name: Wait on tests
uses: lewagon/wait-on-check-action@bafe56a6863672c681c3cf671f5e10b20abf2eaa # v0.2
with:
ref: ${{ github.event.pull_request.head.sha }}
repo-token: ${{ secrets.GITHUB_TOKEN }}
wait-interval: 10
running-workflow-name: 'automerge' # the name of this job
- name: Merge PR
uses: pascalgn/automerge-action@741c311a47881be9625932b0a0de1b0937aab1ae # v0.13.1
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
MERGE_LABELS: ""
MERGE_METHOD: "squash"
MERGE_DELETE_BRANCH: true
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
on: [push, pull_request]
jobs:
unit:
runs-on: ubuntu-latest
name: Go checks
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.16.x"
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@be534f007836a777104a15f2456cd1fffd3ddee8 # v2020.2.2
- name: Check that go.mod is tidy
run: |
go mod tidy
if [[ -n $(git ls-files --other --exclude-standard --directory -- go.sum) ]]; then
echo "go.sum was added by go mod tidy"
exit 1
fi
git diff --exit-code -- go.sum go.mod
- name: gofmt
if: ${{ success() || failure() }} # run this step even if the previous one failed
run: |
out=$(gofmt -s -l .)
if [[ -n "$out" ]]; then
echo $out | awk '{print "::error file=" $0 ",line=0,col=0::File is not gofmt-ed."}'
exit 1
fi
- name: go vet
if: ${{ success() || failure() }} # run this step even if the previous one failed
run: go vet ./...
- name: staticcheck
if: ${{ success() || failure() }} # run this step even if the previous one failed
run: |
set -o pipefail
staticcheck ./... | sed -e 's@\(.*\)\.go@./\1.go@g'
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
on: [push, pull_request]
jobs:
unit:
strategy:
fail-fast: false
matrix:
os: [ "ubuntu", "windows", "macos" ]
go: [ "1.15.x", "1.16.x" ]
runs-on: ${{ matrix.os }}-latest
name: Unit tests (${{ matrix.os}}, Go ${{ matrix.go }})
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- name: Go information
run: |
go version
go env
- name: Run tests
run: go test -v -coverprofile coverage.txt ./...
- name: Run tests (32 bit)
if: ${{ matrix.os != 'macos' }} # can't run 32 bit tests on OSX.
env:
GOARCH: 386
run: go test -v ./...
- name: Run tests with race detector
if: ${{ matrix.os == 'ubuntu' }} # speed things up. Windows and OSX VMs are slow
run: go test -v -race ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@967e2b38a85a62bd61be5529ada27ebc109948c2 # v1.4.1
with:
file: coverage.txt
env_vars: OS=${{ matrix.os }}, GO=${{ matrix.go }}
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736
.glide/
MIT License
Copyright (c) 2018 libp2p
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# go-libp2p-autonat
dms3 p2p go-libp2p-autonat
\ No newline at end of file
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
> Ambient NAT discovery
This package provides an ambient NAT autodiscovery service.
It allows peers to figure out their NAT dialability situation by using test dial backs through peers providing the AutoNAT service.
## Documentation
See https://godoc.org/github.com/libp2p/go-libp2p-autonat
## Contribute
Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/go-libp2p-discovery/issues)!
This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).
## License
MIT
---
package autonat
import (
"context"
"errors"
"math/rand"
"sync/atomic"
"time"
"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var log = logging.Logger("autonat")
// AmbientAutoNAT is the implementation of ambient NAT autodiscovery
type AmbientAutoNAT struct {
ctx context.Context
host host.Host
*config
inboundConn chan network.Conn
observations chan autoNATResult
// status is an autoNATResult reflecting current status.
status atomic.Value
// Reflects the confidence on of the NATStatus being private, as a single
// dialback may fail for reasons unrelated to NAT.
// If it is <3, then multiple autoNAT peers may be contacted for dialback
// If only a single autoNAT peer is known, then the confidence increases
// for each failure until it reaches 3.
confidence int
lastInbound time.Time
lastProbeTry time.Time
lastProbe time.Time
recentProbes map[peer.ID]time.Time
service *autoNATService
emitReachabilityChanged event.Emitter
subscriber event.Subscription
}
// StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired.
type StaticAutoNAT struct {
ctx context.Context
host host.Host
reachability network.Reachability
service *autoNATService
}
type autoNATResult struct {
network.Reachability
address ma.Multiaddr
}
// New creates a new NAT autodiscovery system attached to a host
func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
var err error
conf := new(config)
conf.host = h
conf.dialPolicy.host = h
if err = defaults(conf); err != nil {
return nil, err
}
if conf.addressFunc == nil {
conf.addressFunc = h.Addrs
}
for _, o := range options {
if err = o(conf); err != nil {
return nil, err
}
}
emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
var service *autoNATService
if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil {
service, err = newAutoNATService(ctx, conf)
if err != nil {
return nil, err
}
service.Enable()
}
if conf.forceReachability {
emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability})
return &StaticAutoNAT{
ctx: ctx,
host: h,
reachability: conf.reachability,
service: service,
}, nil
}
as := &AmbientAutoNAT{
ctx: ctx,
host: h,
config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),
emitReachabilityChanged: emitReachabilityChanged,
service: service,
recentProbes: make(map[peer.ID]time.Time),
}
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
subscriber, err := as.host.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)})
if err != nil {
return nil, err
}
as.subscriber = subscriber
h.Network().Notify(as)
go as.background()
return as, nil
}
// Status returns the AutoNAT observed reachability status.
func (as *AmbientAutoNAT) Status() network.Reachability {
s := as.status.Load().(autoNATResult)
return s.Reachability
}
func (as *AmbientAutoNAT) emitStatus() {
status := as.status.Load().(autoNATResult)
as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability})
}
// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
s := as.status.Load().(autoNATResult)
if s.Reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
return s.address, nil
}
func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool {
candidateIP, _ := manet.ToIP(candidate)
for _, i := range list {
if ip, err := manet.ToIP(i); err == nil && ip.Equal(candidateIP) {
return true
}
}
return false
}
func (as *AmbientAutoNAT) background() {
// wait a bit for the node to come online and establish some connections
// before starting autodetection
delay := as.config.bootDelay
var lastAddrUpdated time.Time
subChan := as.subscriber.Out()
defer as.subscriber.Close()
defer as.emitReachabilityChanged.Close()
timer := time.NewTimer(delay)
defer timer.Stop()
timerRunning := true
for {
select {
// new inbound connection.
case conn := <-as.inboundConn:
localAddrs := as.host.Addrs()
ca := as.status.Load().(autoNATResult)
if ca.address != nil {
localAddrs = append(localAddrs, ca.address)
}
if manet.IsPublicAddr(conn.RemoteMultiaddr()) &&
!ipInList(conn.RemoteMultiaddr(), localAddrs) {
as.lastInbound = time.Now()
}
case e := <-subChan:
switch e := e.(type) {
case event.EvtLocalAddressesUpdated:
if !lastAddrUpdated.Add(time.Second).After(time.Now()) {
lastAddrUpdated = time.Now()
if as.confidence > 1 {
as.confidence--
}
}
case event.EvtPeerIdentificationCompleted:
if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := as.status.Load().(autoNATResult)
if currentStatus.Reachability == network.ReachabilityUnknown {
as.tryProbe(e.Peer)
}
}
default:
log.Errorf("unknown event type: %T", e)
}
// probe finished.
case result, ok := <-as.observations:
if !ok {
return
}
as.recordObservation(result)
case <-timer.C:
peer := as.getPeerToProbe()
as.tryProbe(peer)
timerRunning = false
case <-as.ctx.Done():
return
}
// Drain the timer channel if it hasn't fired in preparation for Resetting it.
if timerRunning && !timer.Stop() {
<-timer.C
}
timer.Reset(as.scheduleProbe())
timerRunning = true
}
}
func (as *AmbientAutoNAT) cleanupRecentProbes() {
fixedNow := time.Now()
for k, v := range as.recentProbes {
if fixedNow.Sub(v) > as.throttlePeerPeriod {
delete(as.recentProbes, k)
}
}
}
// scheduleProbe calculates when the next probe should be scheduled for.
func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// Our baseline is a probe every 'AutoNATRefreshInterval'
// This is modulated by:
// * if we are in an unknown state, or have low confidence, that should drop to 'AutoNATRetryInterval'
// * recent inbound connections (implying continued connectivity) should decrease the retry when public
// * recent inbound connections when not public mean we should try more actively to see if we're public.
fixedNow := time.Now()
currentStatus := as.status.Load().(autoNATResult)
nextProbe := fixedNow
// Don't look for peers in the peer store more than once per second.
if !as.lastProbeTry.IsZero() {
backoff := as.lastProbeTry.Add(time.Second)
if backoff.After(nextProbe) {
nextProbe = backoff
}
}
if !as.lastProbe.IsZero() {
untilNext := as.config.refreshInterval
if currentStatus.Reachability == network.ReachabilityUnknown {
untilNext = as.config.retryInterval
} else if as.confidence < 3 {
untilNext = as.config.retryInterval
} else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext *= 2
} else if currentStatus.Reachability != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext /= 5
}
if as.lastProbe.Add(untilNext).After(nextProbe) {
nextProbe = as.lastProbe.Add(untilNext)
}
}
return nextProbe.Sub(fixedNow)
}
// Update the current status based on an observed result.
func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
currentStatus := as.status.Load().(autoNATResult)
if observation.Reachability == network.ReachabilityPublic {
log.Debugf("NAT status is public")
changed := false
if currentStatus.Reachability != network.ReachabilityPublic {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
if as.service != nil {
as.service.Enable()
}
changed = true
} else if as.confidence < 3 {
as.confidence++
}
if observation.address != nil {
if !changed && currentStatus.address != nil && !observation.address.Equal(currentStatus.address) {
as.confidence--
}
if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) {
changed = true
}
as.status.Store(observation)
}
if observation.address != nil && changed {
as.emitStatus()
}
} else if observation.Reachability == network.ReachabilityPrivate {
log.Debugf("NAT status is private")
if currentStatus.Reachability == network.ReachabilityPublic {
if as.confidence > 0 {
as.confidence--
} else {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(observation)
if as.service != nil {
as.service.Disable()
}
as.emitStatus()
}
} else if as.confidence < 3 {
as.confidence++
as.status.Store(observation)
if currentStatus.Reachability != network.ReachabilityPrivate {
as.emitStatus()
}
}
} else if as.confidence > 0 {
// don't just flip to unknown, reduce confidence first
as.confidence--
} else {
log.Debugf("NAT status is unknown")
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})
if currentStatus.Reachability != network.ReachabilityUnknown {
if as.service != nil {
as.service.Enable()
}
as.emitStatus()
}
}
}
func (as *AmbientAutoNAT) tryProbe(p peer.ID) bool {
as.lastProbeTry = time.Now()
if p.Validate() != nil {
return false
}
if lastTime, ok := as.recentProbes[p]; ok {
if time.Since(lastTime) < as.throttlePeerPeriod {
return false
}
}
as.cleanupRecentProbes()
info := as.host.Peerstore().PeerInfo(p)
if !as.config.dialPolicy.skipPeer(info.Addrs) {
as.recentProbes[p] = time.Now()
as.lastProbe = time.Now()
go as.probe(&info)
return true
}
return false
}
func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
cli := NewAutoNATClient(as.host, as.config.addressFunc)
ctx, cancel := context.WithTimeout(as.ctx, as.config.requestTimeout)
defer cancel()
a, err := cli.DialBack(ctx, pi.ID)
var result autoNATResult
switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
result.Reachability = network.ReachabilityPublic
result.address = a
case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
result.Reachability = network.ReachabilityPrivate
default:
result.Reachability = network.ReachabilityUnknown
}
select {
case as.observations <- result:
case <-as.ctx.Done():
return
}
}
func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
peers := as.host.Network().Peers()
if len(peers) == 0 {
return ""
}
candidates := make([]peer.ID, 0, len(peers))
for _, p := range peers {
info := as.host.Peerstore().PeerInfo(p)
// Exclude peers which don't support the autonat protocol.
if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil {
continue
}
// Exclude peers in backoff.
if lastTime, ok := as.recentProbes[p]; ok {
if time.Since(lastTime) < as.throttlePeerPeriod {
continue
}
}
if as.config.dialPolicy.skipPeer(info.Addrs) {
continue
}
candidates = append(candidates, p)
}
if len(candidates) == 0 {
return ""
}
shufflePeers(candidates)
return candidates[0]
}
func shufflePeers(peers []peer.ID) {
for i := range peers {
j := rand.Intn(i + 1)
peers[i], peers[j] = peers[j], peers[i]
}
}
// Status returns the AutoNAT observed reachability status.
func (s *StaticAutoNAT) Status() network.Reachability {
return s.reachability
}
// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
if s.reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
return nil, errors.New("no available address")
}
package autonat
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/libp2p/go-libp2p-autonat/pb"
bhost "github.com/libp2p/go-libp2p-blankhost"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
)
// these are mock service implementations for testing
func makeAutoNATServicePrivate(ctx context.Context, t *testing.T) host.Host {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h.SetStreamHandler(AutoNATProto, sayAutoNATPrivate)
return h
}
func makeAutoNATServicePublic(ctx context.Context, t *testing.T) host.Host {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h.SetStreamHandler(AutoNATProto, sayAutoNATPublic)
return h
}
func sayAutoNATPrivate(s network.Stream) {
defer s.Close()
w := protoio.NewDelimitedWriter(s)
res := pb.Message{
Type: pb.Message_DIAL_RESPONSE.Enum(),
DialResponse: newDialResponseError(pb.Message_E_DIAL_ERROR, "no dialable addresses"),
}
w.WriteMsg(&res)
}
func sayAutoNATPublic(s network.Stream) {
defer s.Close()
w := protoio.NewDelimitedWriter(s)
res := pb.Message{
Type: pb.Message_DIAL_RESPONSE.Enum(),
DialResponse: newDialResponseOK(s.Conn().RemoteMultiaddr()),
}
w.WriteMsg(&res)
}
func makeAutoNAT(ctx context.Context, t *testing.T, ash host.Host) (host.Host, AutoNAT) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute)
h.Peerstore().AddProtocols(ash.ID(), AutoNATProto)
a, _ := New(ctx, h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay())
a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond
return h, a
}
func identifyAsServer(server, recip host.Host) {
recip.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Minute)
recip.Peerstore().AddProtocols(server.ID(), AutoNATProto)
}
func connect(t *testing.T, a, b host.Host) {
pinfo := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()}
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
func expectEvent(t *testing.T, s event.Subscription, expected network.Reachability) {
select {
case e := <-s.Out():
ev, ok := e.(event.EvtLocalReachabilityChanged)
if !ok || ev.Reachability != expected {
t.Fatal("got wrong event type from the bus")
}
case <-time.After(100 * time.Millisecond):
t.Fatal("failed to get the reachability event from the bus")
}
}
// tests
func TestAutoNATPrivate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hs := makeAutoNATServicePrivate(ctx, t)
hc, an := makeAutoNAT(ctx, t, hs)
// subscribe to AutoNat events
s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
if err != nil {
t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err)
}
status := an.Status()
if status != network.ReachabilityUnknown {
t.Fatalf("unexpected NAT status: %d", status)
}
connect(t, hs, hc)
time.Sleep(2 * time.Second)
status = an.Status()
if status != network.ReachabilityPrivate {
t.Fatalf("unexpected NAT status: %d", status)
}
expectEvent(t, s, network.ReachabilityPrivate)
}
func TestAutoNATPublic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hs := makeAutoNATServicePublic(ctx, t)
hc, an := makeAutoNAT(ctx, t, hs)
// subscribe to AutoNat events
s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
if err != nil {
t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err)
}
status := an.Status()
if status != network.ReachabilityUnknown {
t.Fatalf("unexpected NAT status: %d", status)
}
connect(t, hs, hc)
time.Sleep(1500 * time.Millisecond)
status = an.Status()
if status != network.ReachabilityPublic {
t.Fatalf("unexpected NAT status: %d", status)
}
expectEvent(t, s, network.ReachabilityPublic)
}
func TestAutoNATPublictoPrivate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hs := makeAutoNATServicePublic(ctx, t)
hc, an := makeAutoNAT(ctx, t, hs)
// subscribe to AutoNat events
s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
if err != nil {
t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err)
}
status := an.Status()
if status != network.ReachabilityUnknown {
t.Fatalf("unexpected NAT status: %d", status)
}
connect(t, hs, hc)
time.Sleep(1500 * time.Millisecond)
status = an.Status()
if status != network.ReachabilityPublic {
t.Fatalf("unexpected NAT status: %d", status)
}
expectEvent(t, s, network.ReachabilityPublic)
hs.SetStreamHandler(AutoNATProto, sayAutoNATPrivate)
hps := makeAutoNATServicePrivate(ctx, t)
connect(t, hps, hc)
identifyAsServer(hps, hc)
time.Sleep(2 * time.Second)
expectEvent(t, s, network.ReachabilityPrivate)
status = an.Status()
if status != network.ReachabilityPrivate {
t.Fatalf("unexpected NAT status: %d", status)
}
}
func TestAutoNATIncomingEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hs := makeAutoNATServicePrivate(ctx, t)
hc, ani := makeAutoNAT(ctx, t, hs)
an := ani.(*AmbientAutoNAT)
status := an.Status()
if status != network.ReachabilityUnknown {
t.Fatalf("unexpected NAT status: %d", status)
}
connect(t, hs, hc)
em, _ := hc.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{})
em.Emit(event.EvtPeerIdentificationCompleted{Peer: hs.ID()})
time.Sleep(10 * time.Millisecond)
if an.Status() == network.ReachabilityUnknown {
t.Fatalf("Expected probe due to identification of autonat service")
}
}
func TestAutoNATObservationRecording(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hs := makeAutoNATServicePublic(ctx, t)
hc, ani := makeAutoNAT(ctx, t, hs)
an := ani.(*AmbientAutoNAT)
s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
if err != nil {
t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err)
}
// pubic observation without address should be ignored.
an.recordObservation(autoNATResult{network.ReachabilityPublic, nil})
if an.Status() != network.ReachabilityUnknown {
t.Fatalf("unexpected transition")
}
select {
case <-s.Out():
t.Fatal("not expecting a public reachability event")
default:
//expected
}
addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
if an.Status() != network.ReachabilityPublic {
t.Fatalf("failed to transition to public.")
}
expectEvent(t, s, network.ReachabilityPublic)
// a single recording should have confidence still at 0, and transition to private quickly.
an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil})
if an.Status() != network.ReachabilityPrivate {
t.Fatalf("failed to transition to private.")
}
expectEvent(t, s, network.ReachabilityPrivate)
// stronger public confidence should be harder to undo.
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
if an.Status() != network.ReachabilityPublic {
t.Fatalf("failed to transition to public.")
}
expectEvent(t, s, network.ReachabilityPublic)
an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil})
if an.Status() != network.ReachabilityPublic {
t.Fatalf("too-extreme private transition.")
}
}
func TestStaticNat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
s, _ := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
nat, err := New(ctx, h, WithReachability(network.ReachabilityPrivate))
if err != nil {
t.Fatal(err)
}
if nat.Status() != network.ReachabilityPrivate {
t.Fatalf("should be private")
}
expectEvent(t, s, network.ReachabilityPrivate)
}
package autonat
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pb "github.com/libp2p/go-libp2p-autonat/pb"
protoio "github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
)
// Error wraps errors signalled by AutoNAT services
type Error struct {
Status pb.Message_ResponseStatus
Text string
}
// NewAutoNATClient creates a fresh instance of an AutoNATClient
// If addrFunc is nil, h.Addrs will be used
func NewAutoNATClient(h host.Host, addrFunc AddrFunc) Client {
if addrFunc == nil {
addrFunc = h.Addrs
}
return &client{h: h, addrFunc: addrFunc}
}
type client struct {
h host.Host
addrFunc AddrFunc
}
func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error) {
s, err := c.h.NewStream(ctx, p, AutoNATProto)
if err != nil {
return nil, err
}
// Might as well just reset the stream. Once we get to this point, we
// don't care about being nice.
defer s.Close()
r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
w := protoio.NewDelimitedWriter(s)
req := newDialMessage(peer.AddrInfo{ID: c.h.ID(), Addrs: c.addrFunc()})
err = w.WriteMsg(req)
if err != nil {
s.Reset()
return nil, err
}
var res pb.Message
err = r.ReadMsg(&res)
if err != nil {
s.Reset()
return nil, err
}
if res.GetType() != pb.Message_DIAL_RESPONSE {
return nil, fmt.Errorf("unexpected response: %s", res.GetType().String())
}
status := res.GetDialResponse().GetStatus()
switch status {
case pb.Message_OK:
addr := res.GetDialResponse().GetAddr()
return ma.NewMultiaddrBytes(addr)
default:
return nil, Error{Status: status, Text: res.GetDialResponse().GetStatusText()}
}
}
func (e Error) Error() string {
return fmt.Sprintf("AutoNAT error: %s (%s)", e.Text, e.Status.String())
}
// IsDialError returns true if the error was due to a dial back failure
func (e Error) IsDialError() bool {
return e.Status == pb.Message_E_DIAL_ERROR
}
// IsDialRefused returns true if the error was due to a refusal to dial back
func (e Error) IsDialRefused() bool {
return e.Status == pb.Message_E_DIAL_REFUSED
}
// IsDialError returns true if the AutoNAT peer signalled an error dialing back
func IsDialError(e error) bool {
ae, ok := e.(Error)
return ok && ae.IsDialError()
}
// IsDialRefused returns true if the AutoNAT peer signalled refusal to dial back
func IsDialRefused(e error) bool {
ae, ok := e.(Error)
return ok && ae.IsDialRefused()
}
package autonat
import (
"net"
"github.com/libp2p/go-libp2p-core/host"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
type dialPolicy struct {
allowSelfDials bool
host host.Host
}
// skipDial indicates that a multiaddress isn't worth attempted dialing.
// The same logic is used when the autonat client is considering if
// a remote peer is worth using as a server, and when the server is
// considering if a requested client is worth dialing back.
func (d *dialPolicy) skipDial(addr ma.Multiaddr) bool {
// skip relay addresses
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
if err == nil {
return true
}
if d.allowSelfDials {
return false
}
// skip private network (unroutable) addresses
if !manet.IsPublicAddr(addr) {
return true
}
candidateIP, err := manet.ToIP(addr)
if err != nil {
return true
}
// Skip dialing addresses we believe are the local node's
for _, localAddr := range d.host.Addrs() {
localIP, err := manet.ToIP(localAddr)
if err != nil {
continue
}
if localIP.Equal(candidateIP) {
return true
}
}
return false
}
// skipPeer indicates that the collection of multiaddresses representing a peer
// isn't worth attempted dialing. If one of the addresses matches an address
// we believe is ours, we exclude the peer, even if there are other valid
// public addresses in the list.
func (d *dialPolicy) skipPeer(addrs []ma.Multiaddr) bool {
localAddrs := d.host.Addrs()
localHosts := make([]net.IP, 0)
for _, lAddr := range localAddrs {
if _, err := lAddr.ValueForProtocol(ma.P_CIRCUIT); err != nil && manet.IsPublicAddr(lAddr) {
lIP, err := manet.ToIP(lAddr)
if err != nil {
continue
}
localHosts = append(localHosts, lIP)
}
}
// if a public IP of the peer is one of ours: skip the peer.
goodPublic := false
for _, addr := range addrs {
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil && manet.IsPublicAddr(addr) {
aIP, err := manet.ToIP(addr)
if err != nil {
continue
}
for _, lIP := range localHosts {
if lIP.Equal(aIP) {
return true
}
}
goodPublic = true
}
}
if d.allowSelfDials {
return false
}
return !goodPublic
}
package autonat
import (
"context"
"errors"
"net"
"testing"
blankhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/multiformats/go-multiaddr"
)
func makeMA(a string) multiaddr.Multiaddr {
addr, err := multiaddr.NewMultiaddr(a)
if err != nil {
panic(err)
}
return addr
}
type mockT struct {
ctx context.Context
addr multiaddr.Multiaddr
}
func (m *mockT) Dial(ctx context.Context, a multiaddr.Multiaddr, p peer.ID) (transport.CapableConn, error) {
return nil, nil
}
func (m *mockT) CanDial(_ multiaddr.Multiaddr) bool { return true }
func (m *mockT) Listen(a multiaddr.Multiaddr) (transport.Listener, error) {
return &mockL{m.ctx, m.addr}, nil
}
func (m *mockT) Protocols() []int { return []int{multiaddr.P_IP4} }
func (m *mockT) Proxy() bool { return false }
func (m *mockT) String() string { return "mock-tcp-ipv4" }
type mockL struct {
ctx context.Context
addr multiaddr.Multiaddr
}
func (l *mockL) Accept() (transport.CapableConn, error) {
<-l.ctx.Done()
return nil, errors.New("expected in mocked test")
}
func (l *mockL) Close() error { return nil }
func (l *mockL) Addr() net.Addr { return nil }
func (l *mockL) Multiaddr() multiaddr.Multiaddr { return l.addr }
func TestSkipDial(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := swarmt.GenSwarm(t, ctx)
d := dialPolicy{host: blankhost.NewBlankHost(s)}
if d.skipDial(makeMA("/ip4/8.8.8.8")) != false {
t.Fatal("failed dialing a valid public addr")
}
if d.skipDial(makeMA("/ip6/2607:f8b0:400a::1")) != false {
t.Fatal("failed dialing a valid public addr")
}
if d.skipDial(makeMA("/ip4/192.168.0.1")) != true {
t.Fatal("didn't skip dialing an internal addr")
}
s.AddTransport(&mockT{ctx, makeMA("/ip4/8.8.8.8")})
err := s.AddListenAddr(makeMA("/ip4/8.8.8.8"))
if err != nil {
t.Fatal(err)
}
if d.skipDial(makeMA("/ip4/8.8.8.8")) != true {
t.Fatal("failed dialing a valid host address")
}
}
func TestSkipPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := swarmt.GenSwarm(t, ctx)
d := dialPolicy{host: blankhost.NewBlankHost(s)}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8")}) != false {
t.Fatal("failed dialing a valid public addr")
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/192.168.0.1")}) != false {
t.Fatal("failed dialing a valid public addr")
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/192.168.0.1")}) != true {
t.Fatal("succeeded with no public addr")
}
s.AddTransport(&mockT{ctx, makeMA("/ip4/8.8.8.8")})
err := s.AddListenAddr(makeMA("/ip4/8.8.8.8"))
if err != nil {
t.Fatal(err)
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/192.168.0.1")}) != true {
t.Fatal("succeeded dialing host address")
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/9.9.9.9")}) != true {
t.Fatal("succeeded dialing host address when other public")
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/9.9.9.9")}) != false {
t.Fatal("succeeded dialing host address when other public")
}
}
func TestSkipLocalPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := swarmt.GenSwarm(t, ctx)
d := dialPolicy{host: blankhost.NewBlankHost(s)}
s.AddTransport(&mockT{ctx, makeMA("/ip4/192.168.0.1")})
err := s.AddListenAddr(makeMA("/ip4/192.168.0.1"))
if err != nil {
t.Fatal(err)
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8")}) != false {
t.Fatal("failed dialing a valid public addr")
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/192.168.0.1")}) != false {
t.Fatal("failed dialing a valid public addr")
}
if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/192.168.0.1")}) != true {
t.Fatal("succeeded with no public addr")
}
}
This diff is collapsed.
package autonat
import (
"context"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// AutoNAT is the interface for NAT autodiscovery
type AutoNAT interface {
// Status returns the current NAT status
Status() network.Reachability
// PublicAddr returns the public dial address when NAT status is public and an
// error otherwise
PublicAddr() (ma.Multiaddr, error)
}
// Client is a stateless client interface to AutoNAT peers
type Client interface {
// DialBack requests from a peer providing AutoNAT services to test dial back
// and report the address on a successful connection.
DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error)
}
// AddrFunc is a function returning the candidate addresses for the local host.
type AddrFunc func() []ma.Multiaddr
// Option is an Autonat option for configuration
type Option func(*config) error
package autonat
import (
"github.com/libp2p/go-libp2p-core/network"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)
var _ network.Notifiee = (*AmbientAutoNAT)(nil)
// Listen is part of the network.Notifiee interface
func (as *AmbientAutoNAT) Listen(net network.Network, a ma.Multiaddr) {}
// ListenClose is part of the network.Notifiee interface
func (as *AmbientAutoNAT) ListenClose(net network.Network, a ma.Multiaddr) {}
// OpenedStream is part of the network.Notifiee interface
func (as *AmbientAutoNAT) OpenedStream(net network.Network, s network.Stream) {}
// ClosedStream is part of the network.Notifiee interface
func (as *AmbientAutoNAT) ClosedStream(net network.Network, s network.Stream) {}
// Connected is part of the network.Notifiee interface
func (as *AmbientAutoNAT) Connected(net network.Network, c network.Conn) {
if c.Stat().Direction == network.DirInbound &&
manet.IsPublicAddr(c.RemoteMultiaddr()) {
select {
case as.inboundConn <- c:
default:
}
}
}
// Disconnected is part of the network.Notifiee interface
func (as *AmbientAutoNAT) Disconnected(net network.Network, c network.Conn) {}
package autonat
import (
"errors"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
)
// config holds configurable options for the autonat subsystem.
type config struct {
host host.Host
addressFunc AddrFunc
dialPolicy dialPolicy
dialer network.Network
forceReachability bool
reachability network.Reachability
// client
bootDelay time.Duration
retryInterval time.Duration
refreshInterval time.Duration
requestTimeout time.Duration
throttlePeerPeriod time.Duration
// server
dialTimeout time.Duration
maxPeerAddresses int
throttleGlobalMax int
throttlePeerMax int
throttleResetPeriod time.Duration
throttleResetJitter time.Duration
}
var defaults = func(c *config) error {
c.bootDelay = 15 * time.Second
c.retryInterval = 90 * time.Second
c.refreshInterval = 15 * time.Minute
c.requestTimeout = 30 * time.Second
c.throttlePeerPeriod = 90 * time.Second
c.dialTimeout = 15 * time.Second
c.maxPeerAddresses = 16
c.throttleGlobalMax = 30
c.throttlePeerMax = 3
c.throttleResetPeriod = 1 * time.Minute
c.throttleResetJitter = 15 * time.Second
return nil
}
// EnableService specifies that AutoNAT should be allowed to run a NAT service to help
// other peers determine their own NAT status. The provided Network should not be the
// default network/dialer of the host passed to `New`, as the NAT system will need to
// make parallel connections, and as such will modify both the associated peerstore
// and terminate connections of this dialer. The dialer provided
// should be compatible (TCP/UDP) however with the transports of the libp2p network.
func EnableService(dialer network.Network) Option {
return func(c *config) error {
if dialer == c.host.Network() || dialer.Peerstore() == c.host.Peerstore() {
return errors.New("dialer should not be that of the host")
}
c.dialer = dialer
return nil
}
}
// WithReachability overrides autonat to simply report an over-ridden reachability
// status.
func WithReachability(reachability network.Reachability) Option {
return func(c *config) error {
c.forceReachability = true
c.reachability = reachability
return nil
}
}
// UsingAddresses allows overriding which Addresses the AutoNAT client believes
// are "its own". Useful for testing, or for more exotic port-forwarding
// scenarios where the host may be listening on different ports than it wants
// to externally advertise or verify connectability on.
func UsingAddresses(addrFunc AddrFunc) Option {
return func(c *config) error {
if addrFunc == nil {
return errors.New("invalid address function supplied")
}
c.addressFunc = addrFunc
return nil
}
}
// WithSchedule configures how agressively probes will be made to verify the
// address of the host. retryInterval indicates how often probes should be made
// when the host lacks confident about its address, while refresh interval
// is the schedule of periodic probes when the host believes it knows its
// steady-state reachability.
func WithSchedule(retryInterval, refreshInterval time.Duration) Option {
return func(c *config) error {
c.retryInterval = retryInterval
c.refreshInterval = refreshInterval
return nil
}
}
// WithoutStartupDelay removes the initial delay the NAT subsystem typically
// uses as a buffer for ensuring that connectivity and guesses as to the hosts
// local interfaces have settled down during startup.
func WithoutStartupDelay() Option {
return func(c *config) error {
c.bootDelay = 1
return nil
}
}
// WithoutThrottling indicates that this autonat service should not place
// restrictions on how many peers it is willing to help when acting as
// a server.
func WithoutThrottling() Option {
return func(c *config) error {
c.throttleGlobalMax = 0
return nil
}
}
// WithThrottling specifies how many peers (`amount`) it is willing to help
// ever `interval` amount of time when acting as a server.
func WithThrottling(amount int, interval time.Duration) Option {
return func(c *config) error {
c.throttleGlobalMax = amount
c.throttleResetPeriod = interval
c.throttleResetJitter = interval / 4
return nil
}
}
// WithPeerThrottling specifies a limit for the maximum number of IP checks
// this node will provide to an individual peer in each `interval`.
func WithPeerThrottling(amount int) Option {
return func(c *config) error {
c.throttlePeerMax = amount
return nil
}
}
pbgos := $(patsubst %.proto,%.pb.go,$(wildcard *.proto))
all: $(pbgos)
%.pb.go: %.proto
protoc --gogofast_out=. --proto_path=$(GOPATH)/src:. $<
This diff is collapsed.
syntax = "proto2";
package autonat.pb;
message Message {
enum MessageType {
DIAL = 0;
DIAL_RESPONSE = 1;
}
enum ResponseStatus {
OK = 0;
E_DIAL_ERROR = 100;
E_DIAL_REFUSED = 101;
E_BAD_REQUEST = 200;
E_INTERNAL_ERROR = 300;
}
message PeerInfo {
optional bytes id = 1;
repeated bytes addrs = 2;
}
message Dial {
optional PeerInfo peer = 1;
}
message DialResponse {
optional ResponseStatus status = 1;
optional string statusText = 2;
optional bytes addr = 3;
}
optional MessageType type = 1;
optional Dial dial = 2;
optional DialResponse dialResponse = 3;
}
package autonat
import (
pb "github.com/libp2p/go-libp2p-autonat/pb"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// AutoNATProto identifies the autonat service protocol
const AutoNATProto = "/libp2p/autonat/1.0.0"
func newDialMessage(pi peer.AddrInfo) *pb.Message {
msg := new(pb.Message)
msg.Type = pb.Message_DIAL.Enum()
msg.Dial = new(pb.Message_Dial)
msg.Dial.Peer = new(pb.Message_PeerInfo)
msg.Dial.Peer.Id = []byte(pi.ID)
msg.Dial.Peer.Addrs = make([][]byte, len(pi.Addrs))
for i, addr := range pi.Addrs {
msg.Dial.Peer.Addrs[i] = addr.Bytes()
}
return msg
}
func newDialResponseOK(addr ma.Multiaddr) *pb.Message_DialResponse {
dr := new(pb.Message_DialResponse)
dr.Status = pb.Message_OK.Enum()
dr.Addr = addr.Bytes()
return dr
}
func newDialResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DialResponse {
dr := new(pb.Message_DialResponse)
dr.Status = status.Enum()
dr.StatusText = &text
return dr
}
package autonat
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pb "github.com/libp2p/go-libp2p-autonat/pb"
"github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
)
var streamReadTimeout = 60 * time.Second
// AutoNATService provides NAT autodetection services to other peers
type autoNATService struct {
ctx context.Context
instance context.CancelFunc
instanceLock sync.Mutex
config *config
// rate limiter
mx sync.Mutex
reqs map[peer.ID]int
globalReqs int
}
// NewAutoNATService creates a new AutoNATService instance attached to a host
func newAutoNATService(ctx context.Context, c *config) (*autoNATService, error) {
if c.dialer == nil {
return nil, errors.New("cannot create NAT service without a network")
}
as := &autoNATService{
ctx: ctx,
config: c,
reqs: make(map[peer.ID]int),
}
return as, nil
}
func (as *autoNATService) handleStream(s network.Stream) {
s.SetReadDeadline(time.Now().Add(streamReadTimeout))
defer s.Close()
pid := s.Conn().RemotePeer()
log.Debugf("New stream from %s", pid.Pretty())
r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
w := protoio.NewDelimitedWriter(s)
var req pb.Message
var res pb.Message
err := r.ReadMsg(&req)
if err != nil {
log.Debugf("Error reading message from %s: %s", pid.Pretty(), err.Error())
s.Reset()
return
}
t := req.GetType()
if t != pb.Message_DIAL {
log.Debugf("Unexpected message from %s: %s (%d)", pid.Pretty(), t.String(), t)
s.Reset()
return
}
dr := as.handleDial(pid, s.Conn().RemoteMultiaddr(), req.GetDial().GetPeer())
res.Type = pb.Message_DIAL_RESPONSE.Enum()
res.DialResponse = dr
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response to %s: %s", pid.Pretty(), err.Error())
s.Reset()
return
}
}
func (as *autoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse {
if mpi == nil {
return newDialResponseError(pb.Message_E_BAD_REQUEST, "missing peer info")
}
mpid := mpi.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
if err != nil {
return newDialResponseError(pb.Message_E_BAD_REQUEST, "bad peer id")
}
if mp != p {
return newDialResponseError(pb.Message_E_BAD_REQUEST, "peer id mismatch")
}
}
addrs := make([]ma.Multiaddr, 0, as.config.maxPeerAddresses)
seen := make(map[string]struct{})
// Don't even try to dial peers with blocked remote addresses. In order to dial a peer, we
// need to know their public IP address, and it needs to be different from our public IP
// address.
if as.config.dialPolicy.skipDial(obsaddr) {
return newDialResponseError(pb.Message_E_DIAL_ERROR, "refusing to dial peer with blocked observed address")
}
// Determine the peer's IP address.
hostIP, _ := ma.SplitFirst(obsaddr)
switch hostIP.Protocol().Code {
case ma.P_IP4, ma.P_IP6:
default:
// This shouldn't be possible as we should skip all addresses that don't include
// public IP addresses.
return newDialResponseError(pb.Message_E_INTERNAL_ERROR, "expected an IP address")
}
// add observed addr to the list of addresses to dial
addrs = append(addrs, obsaddr)
seen[obsaddr.String()] = struct{}{}
for _, maddr := range mpi.GetAddrs() {
addr, err := ma.NewMultiaddrBytes(maddr)
if err != nil {
log.Debugf("Error parsing multiaddr: %s", err.Error())
continue
}
// For security reasons, we _only_ dial the observed IP address.
// Replace other IP addresses with the observed one so we can still try the
// requested ports/transports.
if ip, rest := ma.SplitFirst(addr); !ip.Equal(hostIP) {
// Make sure it's an IP address
switch ip.Protocol().Code {
case ma.P_IP4, ma.P_IP6:
default:
continue
}
addr = hostIP
if rest != nil {
addr = addr.Encapsulate(rest)
}
}
// Make sure we're willing to dial the rest of the address (e.g., not a circuit
// address).
if as.config.dialPolicy.skipDial(addr) {
continue
}
str := addr.String()
_, ok := seen[str]
if ok {
continue
}
addrs = append(addrs, addr)
seen[str] = struct{}{}
if len(addrs) >= as.config.maxPeerAddresses {
break
}
}
if len(addrs) == 0 {
return newDialResponseError(pb.Message_E_DIAL_ERROR, "no dialable addresses")
}
return as.doDial(peer.AddrInfo{ID: p, Addrs: addrs})
}
func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
// rate limit check
as.mx.Lock()
count := as.reqs[pi.ID]
if count >= as.config.throttlePeerMax || (as.config.throttleGlobalMax > 0 &&
as.globalReqs >= as.config.throttleGlobalMax) {
as.mx.Unlock()
return newDialResponseError(pb.Message_E_DIAL_REFUSED, "too many dials")
}
as.reqs[pi.ID] = count + 1
as.globalReqs++
as.mx.Unlock()
ctx, cancel := context.WithTimeout(as.ctx, as.config.dialTimeout)
defer cancel()
as.config.dialer.Peerstore().ClearAddrs(pi.ID)
as.config.dialer.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
conn, err := as.config.dialer.DialPeer(ctx, pi.ID)
if err != nil {
log.Debugf("error dialing %s: %s", pi.ID.Pretty(), err.Error())
// wait for the context to timeout to avoid leaking timing information
// this renders the service ineffective as a port scanner
<-ctx.Done()
return newDialResponseError(pb.Message_E_DIAL_ERROR, "dial failed")
}
ra := conn.RemoteMultiaddr()
as.config.dialer.ClosePeer(pi.ID)
return newDialResponseOK(ra)
}
// Enable the autoNAT service if it is not running.
func (as *autoNATService) Enable() {
as.instanceLock.Lock()
defer as.instanceLock.Unlock()
if as.instance != nil {
return
}
inst, cncl := context.WithCancel(as.ctx)
as.instance = cncl
go as.background(inst)
}
// Disable the autoNAT service if it is running.
func (as *autoNATService) Disable() {
as.instanceLock.Lock()
defer as.instanceLock.Unlock()
if as.instance != nil {
as.instance()
as.instance = nil
}
}
func (as *autoNATService) background(ctx context.Context) {
as.config.host.SetStreamHandler(AutoNATProto, as.handleStream)
timer := time.NewTimer(as.config.throttleResetPeriod)
defer timer.Stop()
for {
select {
case <-timer.C:
as.mx.Lock()
as.reqs = make(map[peer.ID]int)
as.globalReqs = 0
as.mx.Unlock()
jitter := rand.Float32() * float32(as.config.throttleResetJitter)
timer.Reset(as.config.throttleResetPeriod + time.Duration(int64(jitter)))
case <-ctx.Done():
as.config.host.RemoveStreamHandler(AutoNATProto)
return
}
}
}
package autonat
import (
"context"
"testing"
"time"
bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
ma "github.com/multiformats/go-multiaddr"
)
func makeAutoNATConfig(ctx context.Context, t *testing.T) *config {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
c := config{host: h, dialer: dh.Network()}
_ = defaults(&c)
c.forceReachability = true
c.dialPolicy.allowSelfDials = true
return &c
}
func makeAutoNATService(ctx context.Context, t *testing.T, c *config) *autoNATService {
as, err := newAutoNATService(ctx, c)
if err != nil {
t.Fatal(err)
}
as.Enable()
return as
}
func makeAutoNATClient(ctx context.Context, t *testing.T) (host.Host, Client) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
cli := NewAutoNATClient(h, nil)
return h, cli
}
// Note: these tests assume that the host has only private network addresses!
func TestAutoNATServiceDialError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeAutoNATConfig(ctx, t)
c.dialTimeout = 1 * time.Second
c.dialPolicy.allowSelfDials = false
_ = makeAutoNATService(ctx, t, c)
hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc)
_, err := ac.DialBack(ctx, c.host.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
if !IsDialError(err) {
t.Fatal(err)
}
}
func TestAutoNATServiceDialSuccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeAutoNATConfig(ctx, t)
_ = makeAutoNATService(ctx, t, c)
hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc)
_, err := ac.DialBack(ctx, c.host.ID())
if err != nil {
t.Fatalf("Dial back failed: %s", err.Error())
}
}
func TestAutoNATServiceDialRateLimiter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeAutoNATConfig(ctx, t)
c.dialTimeout = 1 * time.Second
c.throttleResetPeriod = time.Second
c.throttleResetJitter = 0
c.throttlePeerMax = 1
_ = makeAutoNATService(ctx, t, c)
hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc)
_, err := ac.DialBack(ctx, c.host.ID())
if err != nil {
t.Fatal(err)
}
_, err = ac.DialBack(ctx, c.host.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
if !IsDialRefused(err) {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
_, err = ac.DialBack(ctx, c.host.ID())
if err != nil {
t.Fatal(err)
}
}
func TestAutoNATServiceGlobalLimiter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := makeAutoNATConfig(ctx, t)
c.dialTimeout = time.Second
c.throttleResetPeriod = 10 * time.Second
c.throttleResetJitter = 0
c.throttlePeerMax = 1
c.throttleGlobalMax = 5
_ = makeAutoNATService(ctx, t, c)
hs := c.host
for i := 0; i < 5; i++ {
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err != nil {
t.Fatal(err)
}
}
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
if !IsDialRefused(err) {
t.Fatal(err)
}
}
func TestAutoNATServiceRateLimitJitter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c := makeAutoNATConfig(ctx, t)
c.throttleResetPeriod = 100 * time.Millisecond
c.throttleResetJitter = 100 * time.Millisecond
c.throttleGlobalMax = 1
svc := makeAutoNATService(ctx, t, c)
svc.mx.Lock()
svc.globalReqs = 1
svc.mx.Unlock()
time.Sleep(200 * time.Millisecond)
svc.mx.Lock()
defer svc.mx.Unlock()
if svc.globalReqs != 0 {
t.Fatal("reset of rate limitter occured slower than expected")
}
cancel()
}
func TestAutoNATServiceStartup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
an, err := New(ctx, h, EnableService(dh.Network()))
an.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
if err != nil {
t.Fatal(err)
}
hc, ac := makeAutoNATClient(ctx, t)
connect(t, h, hc)
_, err = ac.DialBack(ctx, h.ID())
if err != nil {
t.Fatal("autonat service be active in unknown mode.")
}
sub, _ := h.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
anc := an.(*AmbientAutoNAT)
anc.recordObservation(autoNATResult{Reachability: network.ReachabilityPublic, address: ma.StringCast("/ip4/127.0.0.1/tcp/1234")})
<-sub.Out()
_, err = ac.DialBack(ctx, h.ID())
if err != nil {
t.Fatalf("autonat should be active, was %v", err)
}
if an.Status() != network.ReachabilityPublic {
t.Fatalf("autonat should report public, but didn't")
}
}
package autonat_test
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
)
// This separate testing package helps to resolve a circular dependency potentially
// being created between libp2p and libp2p-autonat
func TestAutonatRoundtrip(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 3 hosts are used: [client] and [service + dialback dialer]
client, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), libp2p.EnableAutoNAT())
if err != nil {
t.Fatal(err)
}
service, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}
dialback, err := libp2p.New(ctx, libp2p.NoListenAddrs)
if err != nil {
t.Fatal(err)
}
_, err = autonat.New(ctx, service, autonat.EnableService(dialback.Network(), true))
if err != nil {
t.Fatal(err)
}
client.Peerstore().AddAddrs(service.ID(), service.Addrs(), time.Hour)
err = client.Connect(ctx, service.Peerstore().PeerInfo(service.ID()))
if err != nil {
t.Fatal(err)
}
cSub, err := client.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
if err != nil {
t.Fatal(err)
}
defer cSub.Close()
select {
case stat := <-cSub.Out():
if stat == network.ReachabilityUnknown {
t.Fatalf("After status update, client did not know its status")
}
case <-time.After(30 * time.Second):
t.Fatal("sub timed out.")
}
}
module github.com/libp2p/go-libp2p-autonat/test
replace github.com/libp2p/go-libp2p-autonat => ../
require (
github.com/libp2p/go-conn-security v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.6.1-0.20200317201052-dd87382dd436
github.com/libp2p/go-libp2p-autonat v0.1.2-0.20200317183318-4b2cc5830d44
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-host v0.1.0 // indirect
github.com/libp2p/go-libp2p-interface-connmgr v0.1.0 // indirect
github.com/libp2p/go-libp2p-interface-pnet v0.1.0 // indirect
github.com/libp2p/go-libp2p-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-net v0.1.0 // indirect
github.com/libp2p/go-libp2p-protocol v0.1.0 // indirect
github.com/libp2p/go-libp2p-transport v0.1.0 // indirect
github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible // indirect
github.com/whyrusleeping/go-smux-multistream v2.0.2+incompatible // indirect
github.com/whyrusleeping/go-smux-yamux v2.0.9+incompatible // indirect
github.com/whyrusleeping/yamux v1.2.0 // indirect
)
go 1.13
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment