Commit 7fa6dacb authored by Aarsh Shah

add peer to RT & fix/document races

parent 04ae65e7
@@ -19,8 +19,10 @@ type peerDistanceSorter struct {
target ID
}
func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
func (pds *peerDistanceSorter) Swap(a, b int) { pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a] }
func (pds *peerDistanceSorter) Len() int { return len(pds.peers) }
func (pds *peerDistanceSorter) Swap(a, b int) {
pds.peers[a], pds.peers[b] = pds.peers[b], pds.peers[a]
}
func (pds *peerDistanceSorter) Less(a, b int) bool {
return pds.peers[a].distance.less(pds.peers[b].distance)
}
......
@@ -142,72 +142,6 @@ func (rt *RoutingTable) Close() error {
return nil
}
func (rt *RoutingTable) cleanup() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
defer cleanupTickr.Stop()
for {
select {
case <-cleanupTickr.C:
ps := rt.peersToValidate()
for _, pinfo := range ps {
// continue if we are able to successfully validate the peer
// it will be marked alive in the RT when the DHT connection notification handler calls RT.HandlePeerAlive()
// TODO Should we revisit this ? It makes more sense for the RT to mark it as active here
if validatePeerF(pinfo.Id) {
log.Infof("successfully validated missing peer=%s", pinfo.Id)
continue
}
// peer does not seem to be alive, let's try to replace it
log.Infof("failed to validate missing peer=%s, evicting it from the RT & requesting a replace", pinfo.Id)
// evict missing peer & request replacement
rt.HandlePeerDead(pinfo.Id)
}
case <-rt.ctx.Done():
return
}
}
}
// replaces a peer using a valid peer from the replacement cache
func (rt *RoutingTable) startPeerReplacement() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
for {
select {
case p := <-rt.peerReplaceCh:
// keep trying replacement candidates till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(p), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for peer=%s", c, p)
break
}
log.Infof("failed to validated candidate=%s", c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", p)
}
case <-rt.ctx.Done():
return
}
}
}
// returns the peers that need to be validated.
func (rt *RoutingTable) peersToValidate() []PeerInfo {
rt.tabLock.RLock()
@@ -323,6 +257,11 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
return rt.addPeer(p)
}
// locking is the responsibility of the caller
func (rt *RoutingTable) addPeer(p peer.ID) (evicted peer.ID, err error) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if peer := bucket.getPeer(p); peer != nil {
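Keeping the lock with the caller lets several bucket mutations be composed into one critical section. A minimal sketch of that idea, written as if inside the kbucket package; the replacePeer helper is hypothetical and not part of this commit (removePeer is shown further down in this diff):

// hypothetical: remove a dead peer and add its replacement under a single
// critical section, which is only possible because addPeer/removePeer leave
// locking to the caller.
func (rt *RoutingTable) replacePeer(dead, replacement peer.ID) (evicted peer.ID, err error) {
	rt.tabLock.Lock()
	defer rt.tabLock.Unlock()
	rt.removePeer(dead)
	return rt.addPeer(replacement)
}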
@@ -368,9 +307,13 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
// It evicts the peer from the Routing Table and tries to replace it with a valid & eligible
// candidate from the replacement cache.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
// remove it from the RT
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
rt.removePeer(p)
}
// locking is the responsibility of the caller
func (rt *RoutingTable) removePeer(p peer.ID) {
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if bucket.remove(p) {
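For context, HandlePeerAlive and HandlePeerDisconnect are typically driven by connection notifications on the DHT side (the removed cleanup code above refers to "the DHT connection notification handler"). A rough sketch of that wiring, assuming a libp2p host h and an existing RoutingTable rt; the NotifyBundle below is an illustration, not code from this commit:

// hypothetical wiring: feed the host's connection events into the routing table.
// assumes: import "github.com/libp2p/go-libp2p-core/network"
notifiee := &network.NotifyBundle{
	ConnectedF: func(_ network.Network, c network.Conn) {
		rt.HandlePeerAlive(c.RemotePeer()) // add the peer / mark it active
	},
	DisconnectedF: func(_ network.Network, c network.Conn) {
		rt.HandlePeerDisconnect(c.RemotePeer()) // mark it missing so cleanup will validate it
	},
}
h.Network().Notify(notifiee)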
@@ -501,12 +444,13 @@ func (rt *RoutingTable) Size() int {
// ListPeers takes a RoutingTable and returns a list of all peers from all buckets in the table.
func (rt *RoutingTable) ListPeers() []peer.ID {
var peers []peer.ID
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
var peers []peer.ID
for _, buck := range rt.buckets {
peers = append(peers, buck.peerIds()...)
}
rt.tabLock.RUnlock()
return peers
}
......
package kbucket
import (
"context"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
func (rt *RoutingTable) cleanup() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
cleanupTickr := time.NewTicker(rt.tableCleanupInterval)
defer cleanupTickr.Stop()
for {
select {
case <-cleanupTickr.C:
ps := rt.peersToValidate()
for _, pinfo := range ps {
// TODO This is racy
// A peer could disconnect immediately after we validate it & would thus be in missing state again
// which means we would wrongly mark it as active here. The longer term solution is to
// handle all peer related events in a single event loop in the RT (see the sketch after this function) or more fine-grained locking at the bucket/peer level.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
if validatePeerF(pinfo.Id) {
rt.tabLock.Lock()
// add it back/mark it as active ONLY if it is still in the RT
// to avoid adding it back if it's been marked as dead
i := rt.bucketIdForPeer(pinfo.Id)
if rt.buckets[i].getPeer(pinfo.Id) != nil {
log.Infof("successfully validated missing peer=%s, marking it as active", pinfo.Id)
rt.addPeer(pinfo.Id)
}
rt.tabLock.Unlock()
continue
}
// peer does not seem to be alive, let's try to replace it
// evict missing peer & request replacement ONLY if it's NOT marked as active to avoid removing a peer that connected after
// the failed validation
rt.tabLock.Lock()
i := rt.bucketIdForPeer(pinfo.Id)
p := rt.buckets[i].getPeer(pinfo.Id)
if p != nil && p.State != PeerStateActive {
log.Infof("failed to validate missing peer=%s, evicting it from the RT & requesting a replace", pinfo.Id)
rt.removePeer(pinfo.Id)
}
rt.tabLock.Unlock()
}
case <-rt.ctx.Done():
return
}
}
}
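The "single event loop" alternative mentioned in the TODO above (issue #60) could, very roughly, look like the sketch below. The peerEvent type, the channel and the loop are hypothetical and only illustrate how serializing all peer events through one goroutine would remove the validate-then-mark race; the table lock is kept to respect the current addPeer/removePeer contract, although with a single consumer it could eventually be dropped:

// hypothetical: funnel every peer-related event through one goroutine so a
// validation result can never interleave with a liveness/death notification.
type peerEvent struct {
	alive bool // true: peer seen alive or validated; false: peer dead or validation failed
	p     peer.ID
}

func (rt *RoutingTable) eventLoop(events <-chan peerEvent) {
	for {
		select {
		case ev := <-events:
			rt.tabLock.Lock()
			if ev.alive {
				rt.addPeer(ev.p)
			} else {
				rt.removePeer(ev.p)
			}
			rt.tabLock.Unlock()
		case <-rt.ctx.Done():
			return
		}
	}
}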
// replaces a peer using a valid peer from the replacement cache
func (rt *RoutingTable) startPeerReplacement() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
for {
select {
case p := <-rt.peerReplaceCh:
// keep trying replacement candidates till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(p), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for peer=%s", c, p)
// TODO There is a race here. The peer could disconnect from us or stop supporting the DHT
// protocol after the validation which means we should not be adding it to the RT here.
// See https://github.com/libp2p/go-libp2p-kbucket/issues/60
rt.tabLock.Lock()
rt.addPeer(c)
rt.tabLock.Unlock()
break
}
log.Infof("failed to validated candidate=%s", c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", p)
}
case <-rt.ctx.Done():
return
}
}
}
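Both loops above delegate the actual liveness check to rt.peerValidationFnc, which callers supply through the PeerValidationFnc option (see the test below). A minimal sketch of what such a function might look like on the DHT side, assuming a connected libp2p host h; the helper name, the connect-based check and the chosen intervals are assumptions, not part of this commit:

// hypothetical constructor on the caller's side.
// assumes: import "github.com/libp2p/go-libp2p-core/host"
func newRoutingTableForHost(h host.Host) (*RoutingTable, error) {
	validate := func(ctx context.Context, p peer.ID) bool {
		// a peer passes validation if we can (re)connect to it before the
		// routing table's validation timeout cancels the context.
		return h.Connect(ctx, peer.AddrInfo{ID: p, Addrs: h.Peerstore().Addrs(p)}) == nil
	}
	return NewRoutingTable(20, ConvertPeerID(h.ID()), time.Hour, pstore.NewMetrics(),
		PeerValidationFnc(validate), TableCleanupInterval(10*time.Minute))
}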
package kbucket
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/stretchr/testify/require"
)
func TestTableCleanup(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
// Generate:
// 6 peers with CPL 0
// 6 peers with CPL 1
cplPeerMap := make(map[int][]peer.ID)
for cpl := 0; cpl < 2; cpl++ {
i := 0
for {
p := test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl {
cplPeerMap[cpl] = append(cplPeerMap[cpl], p)
i++
if i == 6 {
break
}
}
}
}
// mock peer validation fnc that successfully validates p[1], p[3] & p[5]
f := func(ctx context.Context, p peer.ID) bool {
cpl := CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p))
if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p {
return true
} else {
return false
}
}
// create RT with a very short cleanup interval
rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f),
TableCleanupInterval(100*time.Millisecond))
require.NoError(t, err)
// for each CPL, p[0], p[1] & p[2] got the bucket & p[3], p[4] & p[5] become candidates
for _, peers := range cplPeerMap {
for _, p := range peers {
rt.HandlePeerAlive(p)
}
}
// validate current TABLE state
require.Len(t, rt.ListPeers(), 6)
rt.tabLock.RLock()
ps0 := rt.buckets[0].peerIds()
require.Len(t, ps0, 3)
ps1 := rt.buckets[1].peerIds()
require.Len(t, ps1, 3)
require.Contains(t, ps0, cplPeerMap[0][0])
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps0, cplPeerMap[0][2])
require.Contains(t, ps1, cplPeerMap[1][0])
require.Contains(t, ps1, cplPeerMap[1][1])
require.Contains(t, ps1, cplPeerMap[1][2])
rt.tabLock.RUnlock()
// validate current state of replacement cache
rt.cplReplacementCache.Lock()
require.Len(t, rt.cplReplacementCache.candidates, 2)
require.Len(t, rt.cplReplacementCache.candidates[0], 3)
require.Len(t, rt.cplReplacementCache.candidates[1], 3)
rt.cplReplacementCache.Unlock()
// now disconnect peers 0 1 & 2 from both buckets so it has only peer 1 left after it gets validated
for _, peers := range cplPeerMap {
rt.HandlePeerDisconnect(peers[0])
rt.HandlePeerDisconnect(peers[1])
rt.HandlePeerDisconnect(peers[2])
}
// let RT cleanup complete
time.Sleep(2 * time.Second)
// verify RT state
require.Len(t, rt.ListPeers(), 6)
rt.tabLock.RLock()
ps0 = rt.buckets[0].peerIds()
require.Len(t, ps0, 3)
ps1 = rt.buckets[1].peerIds()
require.Len(t, ps1, 3)
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps0, cplPeerMap[0][3])
require.Contains(t, ps0, cplPeerMap[0][5])
require.Contains(t, ps1, cplPeerMap[1][1])
require.Contains(t, ps1, cplPeerMap[1][3])
require.Contains(t, ps1, cplPeerMap[1][5])
rt.tabLock.RUnlock()
// verify candidates were removed from the replacement cache
rt.cplReplacementCache.Lock()
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()
// close RT
require.NoError(t, rt.Close())
}
@@ -598,122 +598,6 @@ func TestTableMultithreaded(t *testing.T) {
<-done
}
func TestTableCleanup(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
// Generate:
// 6 peers with CPL 0
// 6 peers with CPL 1
cplPeerMap := make(map[int][]peer.ID)
for cpl := 0; cpl < 2; cpl++ {
i := 0
for {
p := test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl {
cplPeerMap[cpl] = append(cplPeerMap[cpl], p)
i++
if i == 6 {
break
}
}
}
}
// mock peer validation fnc that successfully validates p[1], p[3] & p[5]
var addedCandidatesLk sync.Mutex
addedCandidates := make(map[peer.ID]struct{})
f := func(ctx context.Context, p peer.ID) bool {
cpl := CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p))
if cplPeerMap[cpl][1] == p || cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p {
// 1 is already in the RT, but 3 & 5 are candidates
if cplPeerMap[cpl][3] == p || cplPeerMap[cpl][5] == p {
addedCandidatesLk.Lock()
addedCandidates[p] = struct{}{}
addedCandidatesLk.Unlock()
}
return true
} else {
return false
}
}
// create RT with a very short cleanup interval
rt, err := NewRoutingTable(3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerValidationFnc(f),
TableCleanupInterval(100*time.Millisecond))
require.NoError(t, err)
// for each CPL, p[0], p[1] & p[2] got the bucket & p[3], p[4] & p[5] become candidates
for _, peers := range cplPeerMap {
for _, p := range peers {
rt.HandlePeerAlive(p)
}
}
// validate current TABLE state
rt.tabLock.RLock()
require.Len(t, rt.ListPeers(), 6)
ps0 := rt.buckets[0].peerIds()
require.Len(t, ps0, 3)
ps1 := rt.buckets[1].peerIds()
require.Len(t, ps1, 3)
require.Contains(t, ps0, cplPeerMap[0][0])
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps0, cplPeerMap[0][2])
require.Contains(t, ps1, cplPeerMap[1][0])
require.Contains(t, ps1, cplPeerMap[1][1])
require.Contains(t, ps1, cplPeerMap[1][2])
rt.tabLock.RUnlock()
// validate current state of replacement cache
rt.cplReplacementCache.Lock()
require.Len(t, rt.cplReplacementCache.candidates, 2)
require.Len(t, rt.cplReplacementCache.candidates[0], 3)
require.Len(t, rt.cplReplacementCache.candidates[1], 3)
rt.cplReplacementCache.Unlock()
// now disconnect peers 0 1 & 2 from both buckets so it has only peer 1 left after it gets validated
for _, peers := range cplPeerMap {
rt.HandlePeerDisconnect(peers[0])
rt.HandlePeerDisconnect(peers[1])
rt.HandlePeerDisconnect(peers[2])
}
// let RT cleanup complete
time.Sleep(2 * time.Second)
// verify RT state
rt.tabLock.RLock()
require.Len(t, rt.ListPeers(), 2)
ps0 = rt.buckets[0].peerIds()
require.Len(t, ps0, 1)
ps1 = rt.buckets[1].peerIds()
require.Len(t, ps1, 1)
require.Contains(t, ps0, cplPeerMap[0][1])
require.Contains(t, ps1, cplPeerMap[1][1])
rt.tabLock.RUnlock()
// verify peers were replaced with candidates
addedCandidatesLk.Lock()
require.Len(t, addedCandidates, 4)
require.Contains(t, addedCandidates, cplPeerMap[0][3])
require.Contains(t, addedCandidates, cplPeerMap[0][5])
require.Contains(t, addedCandidates, cplPeerMap[1][3])
require.Contains(t, addedCandidates, cplPeerMap[1][5])
addedCandidatesLk.Unlock()
// verify candidates were removed from the replacement cache
rt.cplReplacementCache.Lock()
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()
// close RT
require.NoError(t, rt.Close())
}
func BenchmarkHandlePeerAlive(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
......