change backoffs to per-address

parent b6831d43
...@@ -200,7 +200,7 @@ func TestDialWait(t *testing.T) { ...@@ -200,7 +200,7 @@ func TestDialWait(t *testing.T) {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts) t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
} }
if !s1.Backoff().Backoff(s2p) { if !s1.Backoff().Backoff(s2p, s2addr) {
t.Error("s2 should now be on backoff") t.Error("s2 should now be on backoff")
} }
} }
...@@ -337,10 +337,10 @@ func TestDialBackoff(t *testing.T) { ...@@ -337,10 +337,10 @@ func TestDialBackoff(t *testing.T) {
} }
// check backoff state // check backoff state
if s1.Backoff().Backoff(s2.LocalPeer()) { if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
t.Error("s2 should not be on backoff") t.Error("s2 should not be on backoff")
} }
if !s1.Backoff().Backoff(s3p) { if !s1.Backoff().Backoff(s3p, s3addr) {
t.Error("s3 should be on backoff") t.Error("s3 should be on backoff")
} }
...@@ -407,10 +407,10 @@ func TestDialBackoff(t *testing.T) { ...@@ -407,10 +407,10 @@ func TestDialBackoff(t *testing.T) {
} }
// check backoff state (the same) // check backoff state (the same)
if s1.Backoff().Backoff(s2.LocalPeer()) { if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
t.Error("s2 should not be on backoff") t.Error("s2 should not be on backoff")
} }
if !s1.Backoff().Backoff(s3p) { if !s1.Backoff().Backoff(s3p, s3addr) {
t.Error("s3 should be on backoff") t.Error("s3 should be on backoff")
} }
} }
...@@ -451,7 +451,7 @@ func TestDialBackoffClears(t *testing.T) { ...@@ -451,7 +451,7 @@ func TestDialBackoffClears(t *testing.T) {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts) t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
} }
if !s1.Backoff().Backoff(s2.LocalPeer()) { if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should now be on backoff") t.Error("s2 should now be on backoff")
} else { } else {
t.Log("correctly added to backoff") t.Log("correctly added to backoff")
...@@ -464,8 +464,9 @@ func TestDialBackoffClears(t *testing.T) { ...@@ -464,8 +464,9 @@ func TestDialBackoffClears(t *testing.T) {
} }
s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL) s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL)
if _, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil { if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
t.Fatal("should have failed to dial backed off peer") c.Close()
t.Log("backoffs are per address, not peer")
} }
time.Sleep(BackoffBase) time.Sleep(BackoffBase)
...@@ -477,7 +478,7 @@ func TestDialBackoffClears(t *testing.T) { ...@@ -477,7 +478,7 @@ func TestDialBackoffClears(t *testing.T) {
t.Log("correctly connected") t.Log("correctly connected")
} }
if s1.Backoff().Backoff(s2.LocalPeer()) { if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should no longer be on backoff") t.Error("s2 should no longer be on backoff")
} else { } else {
t.Log("correctly cleared backoff") t.Log("correctly cleared backoff")
......
...@@ -101,7 +101,9 @@ type DialBackoff struct { ...@@ -101,7 +101,9 @@ type DialBackoff struct {
lock sync.RWMutex lock sync.RWMutex
} }
type backoffPeer struct { type backoffPeer map[ma.Multiaddr]*backoffAddr
type backoffAddr struct {
tries int tries int
until time.Time until time.Time
} }
...@@ -113,14 +115,18 @@ func (db *DialBackoff) init() { ...@@ -113,14 +115,18 @@ func (db *DialBackoff) init() {
} }
// Backoff returns whether the client should backoff from dialing // Backoff returns whether the client should backoff from dialing
// peer p // peer p at address addr
func (db *DialBackoff) Backoff(p peer.ID) (backoff bool) { func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock() defer db.lock.Unlock()
db.init() db.init()
bp, found := db.entries[p] bp, found := db.entries[p]
if found && time.Now().Before(bp.until) { if found && bp != nil {
return true ap, found := (*bp)[addr]
// TODO: cleanup out of date entries.
if found && time.Now().Before(ap.until) {
return true
}
} }
return false return false
...@@ -145,25 +151,36 @@ var BackoffMax = time.Minute * 5 ...@@ -145,25 +151,36 @@ var BackoffMax = time.Minute * 5
// BackoffBase + BackoffCoef * PriorBackoffs^2 // BackoffBase + BackoffCoef * PriorBackoffs^2
// //
// Where PriorBackoffs is the number of previous backoffs. // Where PriorBackoffs is the number of previous backoffs.
func (db *DialBackoff) AddBackoff(p peer.ID) { func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock() defer db.lock.Unlock()
db.init() db.init()
bp, ok := db.entries[p] bp, ok := db.entries[p]
if !ok { if !ok {
db.entries[p] = &backoffPeer{ bp := backoffPeer(make(map[ma.Multiaddr]*backoffAddr))
db.entries[p] = &bp
bp[addr] = &backoffAddr{
tries: 1,
until: time.Now().Add(BackoffBase),
}
return
}
// TODO: clean up out-of-date entries.
ba, ok := (*bp)[addr]
if !ok {
(*bp)[addr] = &backoffAddr{
tries: 1, tries: 1,
until: time.Now().Add(BackoffBase), until: time.Now().Add(BackoffBase),
} }
return return
} }
backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries) backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries)
if backoffTime > BackoffMax { if backoffTime > BackoffMax {
backoffTime = BackoffMax backoffTime = BackoffMax
} }
bp.until = time.Now().Add(backoffTime) ba.until = time.Now().Add(backoffTime)
bp.tries++ ba.tries++
} }
// Clear removes a backoff record. Clients should call this after a // Clear removes a backoff record. Clients should call this after a
...@@ -210,12 +227,6 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -210,12 +227,6 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
return conn, nil return conn, nil
} }
// if this peer has been backed off, lets get out of here
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", p)
return nil, ErrDialBackoff
}
// apply the DialPeer timeout // apply the DialPeer timeout
ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx)) ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
defer cancel() defer cancel()
...@@ -268,10 +279,6 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -268,10 +279,6 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
log.Debugf("ignoring dial error because we have a connection: %s", err) log.Debugf("ignoring dial error because we have a connection: %s", err)
return conn, nil return conn, nil
} }
if err != context.Canceled {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff
}
// ok, we failed. // ok, we failed.
return nil, err return nil, err
...@@ -318,10 +325,18 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -318,10 +325,18 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses} return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
} }
goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs)) goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
nonBackoff := false
for _, a := range goodAddrs { for _, a := range goodAddrs {
goodAddrsChan <- a // skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
goodAddrsChan <- a
}
} }
close(goodAddrsChan) close(goodAddrsChan)
if !nonBackoff {
return nil, ErrDialBackoff
}
///////// /////////
// try to get a connection to any addr // try to get a connection to any addr
...@@ -402,6 +417,10 @@ dialLoop: ...@@ -402,6 +417,10 @@ dialLoop:
active-- active--
if resp.Err != nil { if resp.Err != nil {
// Errors are normal, lots of dials will fail // Errors are normal, lots of dials will fail
if resp.Err != context.Canceled {
s.backf.AddBackoff(p, resp.Addr)
}
log.Infof("got error on dial: %s", resp.Err) log.Infof("got error on dial: %s", resp.Err)
err.recordErr(resp.Addr, resp.Err) err.recordErr(resp.Addr, resp.Err)
} else if resp.Conn != nil { } else if resp.Conn != nil {
...@@ -429,6 +448,10 @@ dialLoop: ...@@ -429,6 +448,10 @@ dialLoop:
active-- active--
if resp.Err != nil { if resp.Err != nil {
// Errors are normal, lots of dials will fail // Errors are normal, lots of dials will fail
if resp.Err != context.Canceled {
s.backf.AddBackoff(p, resp.Addr)
}
log.Infof("got error on dial: %s", resp.Err) log.Infof("got error on dial: %s", resp.Err)
err.recordErr(resp.Addr, resp.Err) err.recordErr(resp.Addr, resp.Err)
} else if resp.Conn != nil { } else if resp.Conn != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment