Unverified commit fbfe3824 authored by Aarsh Shah, committed by GitHub

Rank Dial addresses (#212)

* Rank dial addresses.
parent 34b2b471
......@@ -10,7 +10,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
addrutil "github.com/libp2p/go-addr-util"
ma "github.com/multiformats/go-multiaddr"
)
......@@ -43,9 +42,10 @@ func (dj *dialJob) dialTimeout() time.Duration {
type dialLimiter struct {
lk sync.Mutex
fdConsuming int
fdLimit int
waitingOnFd []*dialJob
isFdConsumingFnc isFdConsumingFnc
fdConsuming int
fdLimit int
waitingOnFd []*dialJob
dialFunc dialfunc
......@@ -55,19 +55,21 @@ type dialLimiter struct {
}
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
type isFdConsumingFnc func(ma.Multiaddr) bool
func newDialLimiter(df dialfunc) *dialLimiter {
func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
fd := ConcurrentFdDials
if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
if n, err := strconv.ParseInt(env, 10, 32); err == nil {
fd = int(n)
}
}
return newDialLimiterWithParams(df, fd, DefaultPerPeerRateLimit)
return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
}
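
For context, the FD cap defaults to ConcurrentFdDials and can be overridden through the LIBP2P_SWARM_FD_LIMIT environment variable, which is read once at construction time. A minimal sketch, assuming it runs inside this package (newDialLimiter is unexported); the dial function and FD predicate are placeholder stand-ins, not part of this change:

// Sketch only: placeholder dial function and FD predicate.
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (transport.CapableConn, error) {
	return nil, nil
}
fdFnc := func(a ma.Multiaddr) bool { return false }

os.Setenv("LIBP2P_SWARM_FD_LIMIT", "256") // must be set before newDialLimiter runs
l := newDialLimiter(df, fdFnc)
_ = l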
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
return &dialLimiter{
isFdConsumingFnc: isFdConsumingFnc,
fdLimit: fdLimit,
perPeerLimit: perPeerLimit,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
......@@ -140,16 +142,26 @@ func (dl *dialLimiter) freePeerToken(dj *dialJob) {
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.shouldConsumeFd(dj.addr) {
dl.freeFDToken()
}
dl.freePeerToken(dj)
}
func (dl *dialLimiter) shouldConsumeFd(addr ma.Multiaddr) bool {
// We don't consume FDs for relay addresses for now, as they will be consumed when the Relay Transport
// actually dials the Relay server. That dial call will also pass through this limiter, with
// the address of the relay server, i.e. a non-relay address.
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
isRelay := err == nil
return !isRelay && dl.isFdConsumingFnc(addr)
}
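
To see the circuit check in isolation, here is a standalone sketch with hypothetical addresses (ma.StringCast panics on malformed input, which is acceptable for fixed examples):

package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	direct := ma.StringCast("/ip4/1.2.3.4/tcp/4001")
	relay := ma.StringCast("/ip4/1.2.3.4/tcp/4001/p2p-circuit")

	for _, a := range []ma.Multiaddr{direct, relay} {
		// Same check as shouldConsumeFd: a nil error means the address
		// contains a /p2p-circuit component, so no FD token is taken.
		_, err := a.ValueForProtocol(ma.P_CIRCUIT)
		fmt.Printf("%s relay=%v\n", a, err == nil)
	}
}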
func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.shouldConsumeFd(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
"limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
......
......@@ -10,11 +10,26 @@ import (
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
)
var isFdConsuming = func(addr ma.Multiaddr) bool {
res := false
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_TCP {
res = true
return false
}
return true
})
return res
}
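
For reference, a couple of hypothetical inputs for this test helper (it flags any address containing a TCP component):

fmt.Println(isFdConsuming(ma.StringCast("/ip4/127.0.0.1/tcp/20")))      // true
fmt.Println(isFdConsuming(ma.StringCast("/ip4/127.0.0.1/udp/20/quic"))) // false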
func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
......@@ -61,6 +76,11 @@ func hangDialFunc(hang chan struct{}) dialfunc {
return transport.CapableConn(nil), nil
}
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
if err == nil {
return transport.CapableConn(nil), nil
}
if tcpPortOver(a, 10) {
return transport.CapableConn(nil), nil
}
......@@ -74,7 +94,7 @@ func TestLimiterBasicDials(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), ConcurrentFdDials, 4)
l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), ConcurrentFdDials, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
......@@ -123,7 +143,7 @@ func TestLimiterBasicDials(t *testing.T) {
func TestFDLimiting(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)
l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), 16, 5)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
......@@ -168,6 +188,21 @@ func TestFDLimiting(t *testing.T) {
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for utp addr success")
}
// A relay address with a TCP transport will complete because we do not consume FDs for dials
// to relay addresses; the FD will be consumed when we actually dial the relay server.
pid6 := test.RandPeerIDFatal(t)
relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch})
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have gotten successful response")
}
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for relay addr success")
}
}
func TestTokenRedistribution(t *testing.T) {
......@@ -184,7 +219,7 @@ func TestTokenRedistribution(t *testing.T) {
<-ch
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(df, 8, 4)
l := newDialLimiterWithParams(isFdConsuming, df, 8, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2"}
......@@ -277,7 +312,7 @@ func TestStressLimiter(t *testing.T) {
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(df, 20, 5)
l := newDialLimiterWithParams(isFdConsuming, df, 20, 5)
var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
......@@ -337,7 +372,7 @@ func TestFDLimitUnderflow(t *testing.T) {
return nil, fmt.Errorf("df timed out")
}
l := newDialLimiterWithParams(df, 20, 3)
l := newDialLimiterWithParams(isFdConsuming, df, 20, 3)
var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
......
......@@ -119,7 +119,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc
}
s.dsync = NewDialSync(s.doDial)
s.limiter = newDialLimiter(s.dialAddr)
s.limiter = newDialLimiter(s.dialAddr, s.IsFdConsumingAddr)
s.proc = goprocessctx.WithContext(ctx)
s.ctx = goprocessctx.OnClosingContext(s.proc)
s.backf.init(s.ctx)
......
......@@ -10,11 +10,13 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
addrutil "github.com/libp2p/go-addr-util"
lgbl "github.com/libp2p/go-libp2p-loggables"
logging "github.com/ipfs/go-log"
addrutil "github.com/libp2p/go-addr-util"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// Diagram of dial sync:
......@@ -337,13 +339,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
}
//////
/*
This slice-to-chan code is temporary, the peerstore can currently provide
a channel as an interface for receiving addresses, but more thought
needs to be put into the execution. For now, this allows us to use
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
peerAddrs := s.peers.Addrs(p)
if len(peerAddrs) == 0 {
return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
......@@ -352,23 +347,60 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
if len(goodAddrs) == 0 {
return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
}
goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
nonBackoff := false
/////// Check backoff and rank addresses
var nonBackoff bool
for _, a := range goodAddrs {
// skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
goodAddrsChan <- a
}
}
close(goodAddrsChan)
if !nonBackoff {
return nil, ErrDialBackoff
}
/////////
// try to get a connection to any addr
connC, dialErr := s.dialAddrs(ctx, p, goodAddrsChan)
// ranks addresses in descending order of preference for dialing
// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
rankAddrsFnc := func(addrs []ma.Multiaddr) []ma.Multiaddr {
var localUdpAddrs []ma.Multiaddr // private udp
var relayUdpAddrs []ma.Multiaddr // relay udp
var othersUdp []ma.Multiaddr // public udp
var localFdAddrs []ma.Multiaddr // private fd consuming
var relayFdAddrs []ma.Multiaddr // relay fd consuming
var othersFd []ma.Multiaddr // public fd consuming
for _, a := range addrs {
if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
if s.IsFdConsumingAddr(a) {
relayFdAddrs = append(relayFdAddrs, a)
continue
}
relayUdpAddrs = append(relayUdpAddrs, a)
} else if manet.IsPrivateAddr(a) {
if s.IsFdConsumingAddr(a) {
localFdAddrs = append(localFdAddrs, a)
continue
}
localUdpAddrs = append(localUdpAddrs, a)
} else {
if s.IsFdConsumingAddr(a) {
othersFd = append(othersFd, a)
continue
}
othersUdp = append(othersUdp, a)
}
}
relays := append(relayUdpAddrs, relayFdAddrs...)
fds := append(localFdAddrs, othersFd...)
return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...)
}
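
As a hypothetical illustration of the resulting order (the addresses are made up, and rankAddrsFnc is only in scope inside this dial call):

addrs := []ma.Multiaddr{
	ma.StringCast("/ip4/8.8.8.8/tcp/4001"),             // public TCP
	ma.StringCast("/ip4/192.168.1.5/udp/4001/quic"),    // private UDP
	ma.StringCast("/ip4/8.8.8.8/tcp/4001/p2p-circuit"), // TCP relay
	ma.StringCast("/ip4/8.8.8.8/udp/4001/quic"),        // public UDP
}
ranked := rankAddrsFnc(addrs)
// ranked: private UDP, public UDP, public TCP, TCP relay
_ = ranked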
connC, dialErr := s.dialAddrs(ctx, p, rankAddrsFnc(goodAddrs))
if dialErr != nil {
logdial["error"] = dialErr.Cause.Error()
switch dialErr.Cause {
......@@ -424,7 +456,23 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul
)
}
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.CapableConn, *DialError) {
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (transport.CapableConn, *DialError) {
/*
This slice-to-chan code is temporary, the peerstore can currently provide
a channel as an interface for receiving addresses, but more thought
needs to be put into the execution. For now, this allows us to use
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
var remoteAddrChan chan ma.Multiaddr
if len(remoteAddrs) > 0 {
remoteAddrChan = make(chan ma.Multiaddr, len(remoteAddrs))
for i := range remoteAddrs {
remoteAddrChan <- remoteAddrs[i]
}
close(remoteAddrChan)
}
log.Debugf("%s swarm dialing %s", s.local, p)
ctx, cancel := context.WithCancel(ctx)
......@@ -438,7 +486,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.
var active int
dialLoop:
for remoteAddrs != nil || active > 0 {
for remoteAddrChan != nil || active > 0 {
// Check for context cancellations and/or responses first.
select {
case <-ctx.Done():
......@@ -464,9 +512,9 @@ dialLoop:
// Now, attempt to dial.
select {
case addr, ok := <-remoteAddrs:
case addr, ok := <-remoteAddrChan:
if !ok {
remoteAddrs = nil
remoteAddrChan = nil
continue
}
......@@ -540,3 +588,24 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra
// success! we got one!
return connC, nil
}
// TODO: We should have an `IsFdConsuming() bool` method on the `Transport` interface in go-libp2p-core/transport.
// This function checks if any of the transport protocols in the address requires a file descriptor.
// For now:
// A non-circuit address that uses the TCP or Unix protocol is deemed FD-consuming.
// For a circuit-relay address, we look at the address of the relay server/proxy
// and use the same logic as above to decide.
func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool {
first, _ := ma.SplitFunc(addr, func(c ma.Component) bool {
return c.Protocol().Code == ma.P_CIRCUIT
})
// for safety
if first == nil {
return true
}
_, err1 := first.ValueForProtocol(ma.P_TCP)
_, err2 := first.ValueForProtocol(ma.P_UNIX)
return err1 == nil || err2 == nil
}
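
A quick, runnable sketch of how the split behaves on a (hypothetical) relay address:

package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	addr := ma.StringCast("/ip4/1.2.3.4/udp/4001/quic/p2p-circuit")

	// Everything before the first /p2p-circuit component is the transport
	// used to reach the relay server; that part decides FD consumption.
	first, rest := ma.SplitFunc(addr, func(c ma.Component) bool {
		return c.Protocol().Code == ma.P_CIRCUIT
	})
	fmt.Println(first) // /ip4/1.2.3.4/udp/4001/quic -> QUIC, so not FD-consuming
	fmt.Println(rest)  // /p2p-circuit
}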
......@@ -13,7 +13,10 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/test"
circuit "github.com/libp2p/go-libp2p-circuit"
qc "github.com/libp2p/go-libp2p-quic-transport"
. "github.com/libp2p/go-libp2p-swarm"
. "github.com/libp2p/go-libp2p-swarm/testing"
......@@ -383,6 +386,57 @@ func TestConnectionGating(t *testing.T) {
}
}
func TestIsFdConsuming(t *testing.T) {
tcs := map[string]struct {
addr string
isFdConsuming bool
}{
"tcp": {
addr: "/ip4/127.0.0.1/tcp/20",
isFdConsuming: true,
},
"quic": {
addr: "/ip4/127.0.0.1/udp/0/quic",
isFdConsuming: false,
},
"addr-without-registered-transport": {
addr: "/ip4/127.0.0.1/tcp/20/ws",
isFdConsuming: true,
},
"relay-tcp": {
addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
isFdConsuming: true,
},
"relay-quic": {
addr: fmt.Sprintf("/ip4/127.0.0.1/udp/20/quic/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
isFdConsuming: false,
},
"relay-without-serveraddr": {
addr: fmt.Sprintf("/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
isFdConsuming: true,
},
"relay-without-registered-transport-server": {
addr: fmt.Sprintf("/ip4/127.0.0.1/tcp/20/ws/p2p-circuit/p2p/%s", test.RandPeerIDFatal(t)),
isFdConsuming: true,
},
}
ctx := context.Background()
sw := GenSwarm(t, ctx)
sk := sw.Peerstore().PrivKey(sw.LocalPeer())
require.NotNil(t, sk)
qtpt, err := qc.NewTransport(sk, nil, nil)
require.NoError(t, err)
require.NoError(t, sw.AddTransport(qtpt))
require.NoError(t, sw.AddTransport(&circuit.RelayTransport{}))
for name := range tcs {
maddr, err := ma.NewMultiaddr(tcs[name].addr)
require.NoError(t, err, name)
require.Equal(t, tcs[name].isFdConsuming, sw.IsFdConsumingAddr(maddr), name)
}
}
func TestNoDial(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
......