Unverified Commit 0f50f0c5 authored by Will Scott's avatar Will Scott

Merge branch 'service' into feat/merged

parents 62f42d93 f2136000
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
......
......@@ -27,5 +27,3 @@ This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/c
MIT
---
The last gx published version of this module was: 1.0.14: QmNnJhrc4ZtcPZg3oXDVdJ7HRtxDfLE2ccH56cZZjZ6y3p
......@@ -2,14 +2,33 @@ module github.com/libp2p/go-libp2p-autonat
require (
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1 // indirect
github.com/ipfs/go-log v1.0.2
github.com/libp2p/go-conn-security v0.1.0 // indirect
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v6.0.23+incompatible
github.com/libp2p/go-libp2p-blankhost v0.1.4
github.com/libp2p/go-libp2p-circuit v0.1.4 // indirect
github.com/libp2p/go-libp2p-core v0.4.0
github.com/libp2p/go-libp2p-host v0.1.0 // indirect
github.com/libp2p/go-libp2p-interface-connmgr v0.1.0 // indirect
github.com/libp2p/go-libp2p-interface-pnet v0.1.0 // indirect
github.com/libp2p/go-libp2p-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-nat v0.0.5 // indirect
github.com/libp2p/go-libp2p-net v0.1.0 // indirect
github.com/libp2p/go-libp2p-peer v0.2.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.1.4 // indirect
github.com/libp2p/go-libp2p-protocol v0.1.0 // indirect
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/libp2p/go-libp2p-transport v0.1.0 // indirect
github.com/libp2p/go-ws-transport v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-net v0.1.3
github.com/whyrusleeping/go-smux-multiplex v3.0.16+incompatible // indirect
github.com/whyrusleeping/go-smux-multistream v2.0.2+incompatible // indirect
github.com/whyrusleeping/go-smux-yamux v2.0.9+incompatible // indirect
github.com/whyrusleeping/yamux v1.2.0 // indirect
google.golang.org/appengine v1.4.0
)
go 1.12
go 1.13
This diff is collapsed.
......@@ -6,6 +6,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
)
const AutoNATProto = "/libp2p/autonat/1.0.0"
......@@ -24,4 +25,18 @@ func newDialMessage(pi peer.AddrInfo) *pb.Message {
}
return msg
)
func newDialResponseOK(addr ma.Multiaddr) *pb.Message_DialResponse {
dr := new(pb.Message_DialResponse)
dr.Status = pb.Message_OK.Enum()
dr.Addr = addr.Bytes()
return dr
}
func newDialResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DialResponse {
dr := new(pb.Message_DialResponse)
dr.Status = status.Enum()
dr.StatusText = &text
return dr
}
package autonat
import (
"context"
"math/rand"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pb "github.com/libp2p/go-libp2p-autonat/pb"
ggio "github.com/gogo/protobuf/io"
autonat "github.com/libp2p/go-libp2p-autonat"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
const P_CIRCUIT = 290
var (
// AutoNATServiceDialTimeout defines how long to wait for connection
// attempts before failing.
AutoNATServiceDialTimeout = 15 * time.Second
// AutoNATServiceResetInterval defines how often to reset throttling.
AutoNATServiceResetInterval = 1 * time.Minute
// AutoNATServiceResetJitter defines the amplitude of randomness in throttle
// reset timing.
AutoNATServiceResetJitter = 15 * time.Second
// AutoNATServiceThrottle defines how many times each ResetInterval a peer
// can ask for its autonat address.
AutoNATServiceThrottle = 3
// AutoNATGlobalThrottle defines how many total autonat requests this
// service will answer each ResetInterval.
AutoNATGlobalThrottle = 30
// AutoNATMaxPeerAddresses defines maximum number of addreses the autonat
// service will consider when attempting to connect to the peer.
AutoNATMaxPeerAddresses = 16
)
// AutoNATService provides NAT autodetection services to other peers
type AutoNATService struct {
ctx context.Context
h host.Host
dialer host.Host
// rate limiter
mx sync.Mutex
reqs map[peer.ID]int
globalReqMax int
globalReqs int
}
// NewAutoNATService creates a new AutoNATService instance attached to a host
func NewAutoNATService(ctx context.Context, h host.Host, forceEnabled bool, opts ...libp2p.Option) (*AutoNATService, error) {
opts = append(opts, libp2p.NoListenAddrs)
dialer, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, err
}
as := &AutoNATService{
ctx: ctx,
h: h,
dialer: dialer,
globalReqMax: AutoNATGlobalThrottle,
reqs: make(map[peer.ID]int),
}
if forceEnabled {
as.globalReqMax = 0
h.SetStreamHandler(autonat.AutoNATProto, as.handleStream)
go as.resetRateLimiter()
} else {
go as.enableWhenPublic()
}
return as, nil
}
func (as *AutoNATService) handleStream(s network.Stream) {
defer helpers.FullClose(s)
pid := s.Conn().RemotePeer()
log.Debugf("New stream from %s", pid.Pretty())
r := ggio.NewDelimitedReader(s, network.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
var req pb.Message
var res pb.Message
err := r.ReadMsg(&req)
if err != nil {
log.Debugf("Error reading message from %s: %s", pid.Pretty(), err.Error())
s.Reset()
return
}
t := req.GetType()
if t != pb.Message_DIAL {
log.Debugf("Unexpected message from %s: %s (%d)", pid.Pretty(), t.String(), t)
s.Reset()
return
}
dr := as.handleDial(pid, s.Conn().RemoteMultiaddr(), req.GetDial().GetPeer())
res.Type = pb.Message_DIAL_RESPONSE.Enum()
res.DialResponse = dr
err = w.WriteMsg(&res)
if err != nil {
log.Debugf("Error writing response to %s: %s", pid.Pretty(), err.Error())
s.Reset()
return
}
}
func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse {
if mpi == nil {
return newDialResponseError(pb.Message_E_BAD_REQUEST, "missing peer info")
}
mpid := mpi.GetId()
if mpid != nil {
mp, err := peer.IDFromBytes(mpid)
if err != nil {
return newDialResponseError(pb.Message_E_BAD_REQUEST, "bad peer id")
}
if mp != p {
return newDialResponseError(pb.Message_E_BAD_REQUEST, "peer id mismatch")
}
}
addrs := make([]ma.Multiaddr, 0, AutoNATMaxPeerAddresses)
seen := make(map[string]struct{})
// add observed addr to the list of addresses to dial
var obsHost net.IP
if !as.skipDial(obsaddr) {
addrs = append(addrs, obsaddr)
seen[obsaddr.String()] = struct{}{}
obsHost, _ = manet.ToIP(obsaddr)
}
for _, maddr := range mpi.GetAddrs() {
addr, err := ma.NewMultiaddrBytes(maddr)
if err != nil {
log.Debugf("Error parsing multiaddr: %s", err.Error())
continue
}
if as.skipDial(addr) {
continue
}
if ip, err := manet.ToIP(addr); err != nil || !obsHost.Equal(ip) {
continue
}
str := addr.String()
_, ok := seen[str]
if ok {
continue
}
addrs = append(addrs, addr)
seen[str] = struct{}{}
if len(addrs) >= AutoNATMaxPeerAddresses {
break
}
}
if len(addrs) == 0 {
return newDialResponseError(pb.Message_E_DIAL_ERROR, "no dialable addresses")
}
return as.doDial(peer.AddrInfo{ID: p, Addrs: addrs})
}
func (as *AutoNATService) skipDial(addr ma.Multiaddr) bool {
// skip relay addresses
_, err := addr.ValueForProtocol(P_CIRCUIT)
if err == nil {
return true
}
// skip private network (unroutable) addresses
if !manet.IsPublicAddr(addr) {
return true
}
// Skip dialing addresses we believe are the local node's
for _, localAddr := range as.h.Addrs() {
if localAddr.Equal(addr) {
return true
}
}
return false
}
func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
// rate limit check
as.mx.Lock()
count := as.reqs[pi.ID]
if count >= AutoNATServiceThrottle || (as.globalReqMax > 0 && as.globalReqs >= as.globalReqMax) {
as.mx.Unlock()
return newDialResponseError(pb.Message_E_DIAL_REFUSED, "too many dials")
}
as.reqs[pi.ID] = count + 1
as.globalReqs++
as.mx.Unlock()
ctx, cancel := context.WithTimeout(as.ctx, AutoNATServiceDialTimeout)
defer cancel()
as.dialer.Peerstore().ClearAddrs(pi.ID)
as.dialer.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)
conn, err := as.dialer.Network().DialPeer(ctx, pi.ID)
if err != nil {
log.Debugf("error dialing %s: %s", pi.ID.Pretty(), err.Error())
// wait for the context to timeout to avoid leaking timing information
// this renders the service ineffective as a port scanner
<-ctx.Done()
return newDialResponseError(pb.Message_E_DIAL_ERROR, "dial failed")
}
ra := conn.RemoteMultiaddr()
as.dialer.Network().ClosePeer(pi.ID)
return newDialResponseOK(ra)
}
func (as *AutoNATService) enableWhenPublic() {
sub, _ := as.h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
defer sub.Close()
running := false
for {
select {
case ev, ok := <-sub.Out():
if !ok {
return
}
state := ev.(event.EvtLocalReachabilityChanged).Reachability
if state == network.ReachabilityPublic {
as.h.SetStreamHandler(autonat.AutoNATProto, as.handleStream)
if !running {
go as.resetRateLimiter()
running = true
}
} else {
as.h.RemoveStreamHandler(autonat.AutoNATProto)
}
case <-as.ctx.Done():
return
}
}
}
func (as *AutoNATService) resetRateLimiter() {
timer := time.NewTimer(AutoNATServiceResetInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
as.mx.Lock()
as.reqs = make(map[peer.ID]int)
as.globalReqs = 0
as.mx.Unlock()
jitter := rand.Float32() * float32(AutoNATServiceResetJitter)
timer.Reset(AutoNATServiceResetInterval + time.Duration(int64(jitter)))
case <-as.ctx.Done():
return
}
}
}
package autonat
import (
"context"
"net"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
autonat "github.com/libp2p/go-libp2p-autonat"
manet "github.com/multiformats/go-multiaddr-net"
)
func makeAutoNATService(ctx context.Context, t *testing.T) (host.Host, *AutoNATService) {
h, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}
as, err := NewAutoNATService(ctx, h, true)
if err != nil {
t.Fatal(err)
}
return h, as
}
func makeAutoNATClient(ctx context.Context, t *testing.T) (host.Host, autonat.AutoNATClient) {
h, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}
cli := autonat.NewAutoNATClient(h, nil)
return h, cli
}
func connect(t *testing.T, a, b host.Host) {
pinfo := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()}
err := b.Connect(context.Background(), pinfo)
if err != nil {
t.Fatal(err)
}
}
// Note: these tests assume that the host has only private network addresses!
func TestAutoNATServiceDialError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
save := AutoNATServiceDialTimeout
AutoNATServiceDialTimeout = 1 * time.Second
hs, _ := makeAutoNATService(ctx, t)
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
if !autonat.IsDialError(err) {
t.Fatal(err)
}
AutoNATServiceDialTimeout = save
}
func TestAutoNATServiceDialSuccess(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
save := manet.Private4
manet.Private4 = []*net.IPNet{}
hs, _ := makeAutoNATService(ctx, t)
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err != nil {
t.Fatalf("Dial back failed: %s", err.Error())
}
manet.Private4 = save
}
func TestAutoNATServiceDialRateLimiter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
save1 := AutoNATServiceDialTimeout
AutoNATServiceDialTimeout = 1 * time.Second
save2 := AutoNATServiceResetInterval
AutoNATServiceResetInterval = 1 * time.Second
save3 := AutoNATServiceThrottle
AutoNATServiceThrottle = 1
save4 := manet.Private4
manet.Private4 = []*net.IPNet{}
save5 := AutoNATServiceResetJitter
AutoNATServiceResetJitter = 0 * time.Second
hs, _ := makeAutoNATService(ctx, t)
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err != nil {
t.Fatal(err)
}
_, err = ac.DialBack(ctx, hs.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
if !autonat.IsDialRefused(err) {
t.Fatal(err)
}
time.Sleep(2 * time.Second)
_, err = ac.DialBack(ctx, hs.ID())
if err != nil {
t.Fatal(err)
}
AutoNATServiceDialTimeout = save1
AutoNATServiceResetInterval = save2
AutoNATServiceThrottle = save3
manet.Private4 = save4
AutoNATServiceResetJitter = save5
}
func TestAutoNATServiceGlobalLimiter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
save1 := AutoNATServiceDialTimeout
AutoNATServiceDialTimeout = 1 * time.Second
save2 := AutoNATServiceResetInterval
AutoNATServiceResetInterval = 10 * time.Second
save3 := AutoNATServiceThrottle
AutoNATServiceThrottle = 1
save4 := manet.Private4
manet.Private4 = []*net.IPNet{}
save5 := AutoNATServiceResetJitter
AutoNATServiceResetJitter = 0 * time.Second
save6 := AutoNATGlobalThrottle
AutoNATGlobalThrottle = 5
hs, as := makeAutoNATService(ctx, t)
as.globalReqMax = 5
for i := 0; i < 5; i++ {
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err != nil {
t.Fatal(err)
}
}
hc, ac := makeAutoNATClient(ctx, t)
connect(t, hs, hc)
_, err := ac.DialBack(ctx, hs.ID())
if err == nil {
t.Fatal("Dial back succeeded unexpectedly!")
}
if !autonat.IsDialRefused(err) {
t.Fatal(err)
}
AutoNATServiceDialTimeout = save1
AutoNATServiceResetInterval = save2
AutoNATServiceThrottle = save3
manet.Private4 = save4
AutoNATServiceResetJitter = save5
AutoNATGlobalThrottle = save6
}
func TestAutoNATServiceRateLimitJitter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
save1 := AutoNATServiceResetInterval
AutoNATServiceResetInterval = 100 * time.Millisecond
save2 := AutoNATServiceResetJitter
AutoNATServiceResetJitter = 100 * time.Millisecond
_, svc := makeAutoNATService(ctx, t)
svc.mx.Lock()
svc.globalReqs = 1
svc.mx.Unlock()
time.Sleep(200 * time.Millisecond)
svc.mx.Lock()
defer svc.mx.Unlock()
if svc.globalReqs != 0 {
t.Fatal("reset of rate limitter occured slower than expected")
}
cancel()
AutoNATServiceResetInterval = save1
AutoNATServiceResetJitter = save2
}
func TestAutoNATServiceStartup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
save := manet.Private4
manet.Private4 = []*net.IPNet{}
h, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}
_, err = NewAutoNATService(ctx, h, false)
if err != nil {
t.Fatal(err)
}
eb, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged))
hc, ac := makeAutoNATClient(ctx, t)
connect(t, h, hc)
_, err = ac.DialBack(ctx, h.ID())
if err == nil {
t.Fatal("autonat should not be started / advertising.")
}
eb.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})
_, err = ac.DialBack(ctx, h.ID())
if err != nil {
t.Fatalf("autonat should be active, was %v", err)
}
eb.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})
_, err = ac.DialBack(ctx, h.ID())
if err == nil {
t.Fatal("autonat should not be started / advertising.")
}
manet.Private4 = save
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment