Unverified Commit f82c4250 authored by Aarsh Shah's avatar Aarsh Shah Committed by GitHub

Support for Hole punching (#233)

* support for forced direct connections.
parent e6f85ac7
...@@ -96,13 +96,13 @@ func (ad *activeDial) start(ctx context.Context) { ...@@ -96,13 +96,13 @@ func (ad *activeDial) start(ctx context.Context) {
ad.cancel() ad.cancel()
} }
func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { func (ds *DialSync) getActiveDial(ctx context.Context, p peer.ID) *activeDial {
ds.dialsLk.Lock() ds.dialsLk.Lock()
defer ds.dialsLk.Unlock() defer ds.dialsLk.Unlock()
actd, ok := ds.dials[p] actd, ok := ds.dials[p]
if !ok { if !ok {
adctx, cancel := context.WithCancel(context.Background()) adctx, cancel := context.WithCancel(ctx)
actd = &activeDial{ actd = &activeDial{
id: p, id: p,
cancel: cancel, cancel: cancel,
...@@ -123,7 +123,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial { ...@@ -123,7 +123,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
// DialLock initiates a dial to the given peer if there are none in progress // DialLock initiates a dial to the given peer if there are none in progress
// then waits for the dial to that peer to complete. // then waits for the dial to that peer to complete.
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
return ds.getActiveDial(p).wait(ctx) return ds.getActiveDial(ctx, p).wait(ctx)
} }
// CancelDial cancels all in-progress dials to the given peer. // CancelDial cancels all in-progress dials to the given peer.
......
...@@ -194,6 +194,8 @@ github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB ...@@ -194,6 +194,8 @@ github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB
github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.2 h1:/eaSZACWftJZYm07S0nRxdI84v1hSmgnCXrGOvJdpNQ= github.com/libp2p/go-libp2p-core v0.8.2 h1:/eaSZACWftJZYm07S0nRxdI84v1hSmgnCXrGOvJdpNQ=
github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.3 h1:BZTReEF6o8g/n4DwxTyeFannOeae35Xy0TD+mES3CNE=
github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE= github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE=
......
...@@ -341,6 +341,7 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error ...@@ -341,6 +341,7 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error
// a non-closed connection. // a non-closed connection.
dials := 0 dials := 0
for { for {
// will prefer direct connections over relayed connections for opening streams
c := s.bestConnToPeer(p) c := s.bestConnToPeer(p)
if c == nil { if c == nil {
if nodial, _ := network.GetNoDial(ctx); nodial { if nodial, _ := network.GetNoDial(ctx); nodial {
...@@ -392,9 +393,10 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn { ...@@ -392,9 +393,10 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn {
// bestConnToPeer returns the best connection to peer. // bestConnToPeer returns the best connection to peer.
func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
// Selects the best connection we have to the peer.
// TODO: Prefer some transports over others. Currently, we just select // TODO: Prefer some transports over others.
// the newest non-closed connection with the most streams. // For now, prefers direct connections over Relayed connections.
// For tie-breaking, select the newest non-closed connection with the most streams.
s.conns.RLock() s.conns.RLock()
defer s.conns.RUnlock() defer s.conns.RUnlock()
...@@ -409,15 +411,25 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { ...@@ -409,15 +411,25 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
cLen := len(c.streams.m) cLen := len(c.streams.m)
c.streams.Unlock() c.streams.Unlock()
if cLen >= bestLen { // We will never prefer a Relayed connection over a direct connection.
if isDirectConn(best) && !isDirectConn(c) {
continue
}
// 1. Always prefer a direct connection over a relayed connection.
// 2. If both conns are direct or relayed, pick the one with as many or more streams.
if (!isDirectConn(best) && isDirectConn(c)) || (cLen >= bestLen) {
best = c best = c
bestLen = cLen bestLen = cLen
} }
} }
return best return best
} }
func isDirectConn(c *Conn) bool {
return c != nil && !c.conn.Transport().Proxy()
}
// Connectedness returns our "connectedness" state with the given peer. // Connectedness returns our "connectedness" state with the given peer.
// //
// To check if we have an open connection, use `s.Connectedness(p) == // To check if we have an open connection, use `s.Connectedness(p) ==
......
...@@ -251,9 +251,14 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -251,9 +251,14 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done() defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
// check if we already have an open connection first
conn := s.bestConnToPeer(p) conn := s.bestConnToPeer(p)
if conn != nil { forceDirect, _ := network.GetForceDirectDial(ctx)
if forceDirect {
if isDirectConn(conn) {
return conn, nil
}
} else if conn != nil {
// check if we already have an open connection first
return conn, nil return conn, nil
} }
...@@ -287,8 +292,13 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -287,8 +292,13 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
// Short circuit. // Short circuit.
// By the time we take the dial lock, we may already *have* a connection // By the time we take the dial lock, we may already *have* a connection
// to the peer. // to the peer.
forceDirect, _ := network.GetForceDirectDial(ctx)
c := s.bestConnToPeer(p) c := s.bestConnToPeer(p)
if c != nil { if forceDirect {
if isDirectConn(c) {
return c, nil
}
} else if c != nil {
return c, nil return c, nil
} }
...@@ -301,12 +311,17 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -301,12 +311,17 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
conn, err := s.dial(ctx, p) conn, err := s.dial(ctx, p)
if err != nil { if err != nil {
conn = s.bestConnToPeer(p) conn = s.bestConnToPeer(p)
if conn != nil { if forceDirect {
if isDirectConn(conn) {
log.Debugf("ignoring dial error because we already have a direct connection: %s", err)
return conn, nil
}
} else if conn != nil {
// Hm? What error? // Hm? What error?
// Could have canceled the dial because we received a // Could have canceled the dial because we received a
// connection or some other random reason. // connection or some other random reason.
// Just ignore the error and return the connection. // Just ignore the error and return the connection.
log.Debugf("ignoring dial error because we have a connection: %s", err) log.Debugf("ignoring dial error because we already have a connection: %s", err)
return conn, nil return conn, nil
} }
...@@ -321,6 +336,11 @@ func (s *Swarm) canDial(addr ma.Multiaddr) bool { ...@@ -321,6 +336,11 @@ func (s *Swarm) canDial(addr ma.Multiaddr) bool {
return t != nil && t.CanDial(addr) return t != nil && t.CanDial(addr)
} }
func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return !t.Proxy()
}
// ranks addresses in descending order of preference for dialing // ranks addresses in descending order of preference for dialing
// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server // Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
...@@ -362,6 +382,7 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { ...@@ -362,6 +382,7 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
// dial is the actual swarm's dial logic, gated by Dial. // dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
forceDirect, _ := network.GetForceDirectDial(ctx)
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local { if p == s.local {
log.Event(ctx, "swarmDialDoDialSelf", logdial) log.Event(ctx, "swarmDialDoDialSelf", logdial)
...@@ -383,20 +404,25 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -383,20 +404,25 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, &DialError{Peer: p, Cause: ErrNoAddresses} return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
} }
goodAddrs := s.filterKnownUndialables(p, peerAddrs) goodAddrs := s.filterKnownUndialables(p, peerAddrs)
if forceDirect {
goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr)
}
if len(goodAddrs) == 0 { if len(goodAddrs) == 0 {
return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses} return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
} }
/////// Check backoff andnRank addresses if !forceDirect {
var nonBackoff bool /////// Check backoff andnRank addresses
for _, a := range goodAddrs { var nonBackoff bool
// skip addresses in back-off for _, a := range goodAddrs {
if !s.backf.Backoff(p, a) { // skip addresses in back-off
nonBackoff = true if !s.backf.Backoff(p, a) {
nonBackoff = true
}
}
if !nonBackoff {
return nil, ErrDialBackoff
} }
}
if !nonBackoff {
return nil, ErrDialBackoff
} }
connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs)) connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment