Commit bbd0a017 authored by vyzo's avatar vyzo

batch dials together, rework address ranking

parent 96723d14
...@@ -58,9 +58,9 @@ var ( ...@@ -58,9 +58,9 @@ var (
) )
var ( var (
DelayDialPrivateAddr = 5 * time.Millisecond delayDialPrivateAddr = 5 * time.Millisecond
DelayDialPublicAddr = 25 * time.Millisecond delayDialPublicAddr = 25 * time.Millisecond
DelayDialRelayAddr = 50 * time.Millisecond delayDialRelayAddr = 50 * time.Millisecond
) )
// DialAttempts governs how many times a goroutine will try to dial a given peer. // DialAttempts governs how many times a goroutine will try to dial a given peer.
...@@ -361,6 +361,9 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial ...@@ -361,6 +361,9 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial
} }
var triggerDial <-chan time.Time var triggerDial <-chan time.Time
triggerNow := make(chan time.Time)
close(triggerNow)
var nextDial []ma.Multiaddr var nextDial []ma.Multiaddr
active := 0 active := 0
done := false done := false
...@@ -461,34 +464,46 @@ loop: ...@@ -461,34 +464,46 @@ loop:
nextDial = append(nextDial, todial...) nextDial = append(nextDial, todial...)
nextDial = s.rankAddrs(nextDial) nextDial = s.rankAddrs(nextDial)
if triggerDial == nil { // trigger a new dial now to account for the new addrs we added
trigger := make(chan time.Time) triggerDial = triggerNow
close(trigger)
triggerDial = trigger
}
} }
case <-triggerDial: case <-triggerDial:
if len(nextDial) == 0 { // we dial batches of addresses together, logically belonging to the same batch
triggerDial = nil // after a batch of addresses has been dialed, we add a delay before initiating the next batch
continue loop dialed := false
} last := 0
next := 0
for i, addr := range nextDial {
if dialed && !s.sameAddrBatch(nextDial[last], addr) {
break
}
next := nextDial[0] next = i + 1
nextDial = nextDial[1:]
// spawn the next dial // spawn the dial
ad := pending[next] ad := pending[addr]
err := s.dialNextAddr(ad.ctx, p, next, resch) err := s.dialNextAddr(ad.ctx, p, addr, resch)
if err != nil { if err != nil {
dispatchError(ad, err) dispatchError(ad, err)
continue loop continue
}
dialed = true
last = i
active++
} }
active++ lastDial := nextDial[last]
nextDial = nextDial[next:]
if !dialed || len(nextDial) == 0 {
// we didn't dial anything because of backoff or we don't have any more addresses
triggerDial = nil
continue loop
}
// select an appropriate delay for the next dial trigger // select an appropriate delay for the next dial batch
delay := s.delayForNextDial(next) delay := s.delayForNextDial(lastDial)
triggerDial = time.After(delay) triggerDial = time.After(delay)
case res := <-resch: case res := <-resch:
...@@ -516,6 +531,9 @@ loop: ...@@ -516,6 +531,9 @@ loop:
// oops no, we failed to add it to the swarm // oops no, we failed to add it to the swarm
res.Conn.Close() res.Conn.Close()
dispatchError(ad, err) dispatchError(ad, err)
if active == 0 && len(nextDial) > 0 {
triggerDial = triggerNow
}
continue loop continue loop
} }
...@@ -543,6 +561,9 @@ loop: ...@@ -543,6 +561,9 @@ loop:
} }
dispatchError(ad, res.Err) dispatchError(ad, res.Err)
if active == 0 && len(nextDial) > 0 {
triggerDial = triggerNow
}
} }
} }
} }
...@@ -579,16 +600,37 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, ...@@ -579,16 +600,37 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr,
return nil return nil
} }
func (s *Swarm) sameAddrBatch(a, b ma.Multiaddr) bool {
// is it a relay addr?
if s.IsRelayAddr(a) {
return s.IsRelayAddr(b)
}
// is it an expensive addr?
if s.IsExpensiveAddr(a) {
return s.IsExpensiveAddr(b)
}
// is it a public addr?
if !manet.IsPrivateAddr(a) {
return !manet.IsPrivateAddr(b) &&
s.IsFdConsumingAddr(a) == s.IsFdConsumingAddr(b)
}
// it's a private addr
return manet.IsPrivateAddr(b)
}
func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration {
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil { if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil {
return DelayDialRelayAddr return delayDialRelayAddr
} }
if manet.IsPrivateAddr(addr) { if manet.IsPrivateAddr(addr) {
return DelayDialPrivateAddr return delayDialPrivateAddr
} }
return DelayDialPublicAddr return delayDialPublicAddr
} }
func (s *Swarm) canDial(addr ma.Multiaddr) bool { func (s *Swarm) canDial(addr ma.Multiaddr) bool {
...@@ -601,43 +643,41 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool { ...@@ -601,43 +643,41 @@ func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
return !t.Proxy() return !t.Proxy()
} }
// ranks addresses in descending order of preference for dialing // ranks addresses in descending order of preference for dialing, with the following rules:
// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server // NonRelay > Relay
// NonWS > WS
// Private > Public
// UDP > TCP
func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
var localUdpAddrs []ma.Multiaddr // private udp addrTier := func(a ma.Multiaddr) (tier int) {
var relayUdpAddrs []ma.Multiaddr // relay udp if s.IsRelayAddr(a) {
var othersUdp []ma.Multiaddr // public udp tier |= 0b1000
}
if s.IsExpensiveAddr(a) {
tier |= 0b0100
}
if !manet.IsPrivateAddr(a) {
tier |= 0b0010
}
if s.IsFdConsumingAddr(a) {
tier |= 0b0001
}
var localFdAddrs []ma.Multiaddr // private fd consuming return tier
var relayFdAddrs []ma.Multiaddr // relay fd consuming }
var othersFd []ma.Multiaddr // public fd consuming
tiers := make([][]ma.Multiaddr, 16)
for _, a := range addrs { for _, a := range addrs {
if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil { tier := addrTier(a)
if s.IsFdConsumingAddr(a) { tiers[tier] = append(tiers[tier], a)
relayFdAddrs = append(relayFdAddrs, a)
continue
}
relayUdpAddrs = append(relayUdpAddrs, a)
} else if manet.IsPrivateAddr(a) {
if s.IsFdConsumingAddr(a) {
localFdAddrs = append(localFdAddrs, a)
continue
}
localUdpAddrs = append(localUdpAddrs, a)
} else {
if s.IsFdConsumingAddr(a) {
othersFd = append(othersFd, a)
continue
}
othersUdp = append(othersUdp, a)
}
} }
relays := append(relayUdpAddrs, relayFdAddrs...) result := make([]ma.Multiaddr, 0, len(addrs))
fds := append(localFdAddrs, othersFd...) for _, tier := range tiers {
result = append(result, tier...)
}
return append(append(append(localUdpAddrs, othersUdp...), fds...), relays...) return result
} }
// filterKnownUndialables takes a list of multiaddrs, and removes those // filterKnownUndialables takes a list of multiaddrs, and removes those
...@@ -729,3 +769,14 @@ func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool { ...@@ -729,3 +769,14 @@ func (s *Swarm) IsFdConsumingAddr(addr ma.Multiaddr) bool {
_, err2 := first.ValueForProtocol(ma.P_UNIX) _, err2 := first.ValueForProtocol(ma.P_UNIX)
return err1 == nil || err2 == nil return err1 == nil || err2 == nil
} }
func (s *Swarm) IsExpensiveAddr(addr ma.Multiaddr) bool {
_, err1 := addr.ValueForProtocol(ma.P_WS)
_, err2 := addr.ValueForProtocol(ma.P_WSS)
return err1 == nil || err2 == nil
}
func (s *Swarm) IsRelayAddr(addr ma.Multiaddr) bool {
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment