Fix hang in BackoffDiscovery.FindPeers when requesting limit lower than number of peers available

parent e2618ed1
...@@ -68,15 +68,15 @@ func WithBackoffDiscoveryReturnedChannelSize(size int) BackoffDiscoveryOption { ...@@ -68,15 +68,15 @@ func WithBackoffDiscoveryReturnedChannelSize(size int) BackoffDiscoveryOption {
} }
type backoffCache struct { type backoffCache struct {
// strat is assigned on creation and not written to
strat BackoffStrategy
mux sync.Mutex // guards writes to all following fields
nextDiscover time.Time nextDiscover time.Time
prevPeers map[peer.ID]peer.AddrInfo prevPeers map[peer.ID]peer.AddrInfo
peers map[peer.ID]peer.AddrInfo
peers map[peer.ID]peer.AddrInfo sendingChs map[chan peer.AddrInfo]int
sendingChs map[chan peer.AddrInfo]int ongoing bool
ongoing bool
strat BackoffStrategy
mux sync.Mutex
} }
func (d *BackoffDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { func (d *BackoffDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
...@@ -112,6 +112,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis ...@@ -112,6 +112,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
sendingChs: make(map[chan peer.AddrInfo]int), sendingChs: make(map[chan peer.AddrInfo]int),
strat: d.stratFactory(), strat: d.stratFactory(),
} }
d.peerCacheMux.Lock() d.peerCacheMux.Lock()
c, ok = d.peerCache[ns] c, ok = d.peerCache[ns]
...@@ -139,7 +140,11 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis ...@@ -139,7 +140,11 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
} }
pch := make(chan peer.AddrInfo, chLen) pch := make(chan peer.AddrInfo, chLen)
for _, ai := range c.prevPeers { for _, ai := range c.prevPeers {
pch <- ai select {
case pch <- ai:
default:
// skip if we have asked for a lower limit than the number of peers known
}
} }
close(pch) close(pch)
return pch, nil return pch, nil
......
...@@ -41,7 +41,12 @@ func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis ...@@ -41,7 +41,12 @@ func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
func assertNumPeers(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, count int) { func assertNumPeers(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, count int) {
t.Helper() t.Helper()
peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(10)) assertNumPeersWithLimit(t, ctx, d, ns, 10, count)
}
func assertNumPeersWithLimit(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, limit int, count int) {
t.Helper()
peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(limit))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -115,7 +120,7 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { ...@@ -115,7 +120,7 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
assertNumPeers(t, ctx, dCache, ns, 1) assertNumPeers(t, ctx, dCache, ns, 1)
// wait a little to make sure the extra request doesn't modify the backoff // wait a little to make sure the extra request doesn't modify the backoff
time.Sleep(time.Millisecond * 50) //50 < 100 time.Sleep(time.Millisecond * 50) // 50 < 100
assertNumPeers(t, ctx, dCache, ns, 1) assertNumPeers(t, ctx, dCache, ns, 1)
// wait for backoff to expire and check if we increase it // wait for backoff to expire and check if we increase it
...@@ -124,15 +129,15 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) { ...@@ -124,15 +129,15 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400)) d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400))
time.Sleep(time.Millisecond * 150) //150 < 250 time.Sleep(time.Millisecond * 150) // 150 < 250
assertNumPeers(t, ctx, dCache, ns, 1) assertNumPeers(t, ctx, dCache, ns, 1)
time.Sleep(time.Millisecond * 150) //150 + 150 > 250 time.Sleep(time.Millisecond * 150) // 150 + 150 > 250
assertNumPeers(t, ctx, dCache, ns, 2) assertNumPeers(t, ctx, dCache, ns, 2)
// check that the backoff has been reset // check that the backoff has been reset
// also checks that we can decrease our peer count (i.e. not just growing a set) // also checks that we can decrease our peer count (i.e. not just growing a set)
time.Sleep(time.Millisecond * 110) //110 > 100, also 150+150+110>400 time.Sleep(time.Millisecond * 110) // 110 > 100, also 150+150+110>400
assertNumPeers(t, ctx, dCache, ns, 1) assertNumPeers(t, ctx, dCache, ns, 1)
} }
...@@ -185,7 +190,7 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) { ...@@ -185,7 +190,7 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
} }
szCh1 := 1 szCh1 := 1
for _ = range ch1 { for range ch1 {
szCh1++ szCh1++
} }
...@@ -193,3 +198,57 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) { ...@@ -193,3 +198,57 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
t.Fatalf("Channels returned %d, %d elements instead of %d", szCh1, szCh2, n) t.Fatalf("Channels returned %d, %d elements instead of %d", szCh1, szCh2, n)
} }
} }
// TestBackoffDiscoveryCacheCapacity verifies that BackoffDiscovery.FindPeers
// does not hang when the requested limit is lower than the number of peers in
// the cache. Regression test for
// https://github.com/libp2p/go-libp2p-discovery/issues/67.
func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	discServer := newDiscoveryServer()

	// Testing with n larger than most internal buffer sizes (32) so the cached
	// peer set exceeds the returned-channel capacity.
	n := 40
	advertisers := make([]discovery.Discovery, n)
	for i := 0; i < n; i++ {
		h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
		advertisers[i] = &mockDiscoveryClient{h, discServer}
	}

	h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
	d1 := &mockDiscoveryClient{h1, discServer}

	const discoveryInterval = time.Millisecond * 100
	bkf := NewFixedBackoff(discoveryInterval)
	dCache, err := NewBackoffDiscovery(d1, bkf)
	if err != nil {
		t.Fatal(err)
	}

	const ns = "test"

	// Add peers.
	for i := 0; i < n; i++ {
		advertisers[i].Advertise(ctx, ns, discovery.TTL(time.Hour))
	}

	// Request all peers; all will be present.
	assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)

	// Request peers with a lower limit.
	assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)

	// Wait a little, but don't allow the cache to expire.
	time.Sleep(discoveryInterval / 10)

	// Request peers with a lower limit, this time served from the cache.
	// Here we are testing that the cache logic does not block when there are
	// more peers known than the limit requested.
	// See https://github.com/libp2p/go-libp2p-discovery/issues/67
	assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)

	// Wait for the backoff to expire so the next request bypasses the cache.
	time.Sleep(discoveryInterval)

	// Ask for all peers again.
	assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment