Commit 04ae65e7 authored by Aarsh Shah

replace dead peers & increase replacement cache size

parent 2c6281f9
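
In short: dead peers are now evicted immediately and replaced asynchronously. HandlePeerDead does a non-blocking send of the dead peer's ID onto a buffered peerReplaceCh, and a new startPeerReplacement goroutine pops candidates for that peer's cpl from the (now doubled) replacement cache, validating each until one succeeds. The sketch below is a simplified, self-contained illustration of that producer/consumer pattern; peerID, replacementCache, and the channel size are stand-ins, not this package's real types.

package main

import (
	"context"
	"fmt"
	"time"
)

// peerID and replacementCache are simplified stand-ins for the real
// peer.ID and cplReplacementCache types in this package.
type peerID string

type replacementCache struct {
	candidates []peerID
}

// pop hands out the next stored candidate, mirroring cplReplacementCache.pop.
func (c *replacementCache) pop() (peerID, bool) {
	if len(c.candidates) == 0 {
		return "", false
	}
	p := c.candidates[0]
	c.candidates = c.candidates[1:]
	return p, true
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cache := &replacementCache{candidates: []peerID{"bad-candidate", "good-candidate"}}
	validate := func(p peerID) bool { return p == "good-candidate" }

	// buffered so the producer never blocks while holding the table lock
	replaceCh := make(chan peerID, 4)

	// consumer: the analogue of startPeerReplacement
	go func() {
		for {
			select {
			case dead := <-replaceCh:
				// keep popping candidates until one validates or the cache is empty
				for c, ok := cache.pop(); ok; c, ok = cache.pop() {
					if validate(c) {
						fmt.Printf("replaced dead peer %s with %s\n", dead, c)
						break
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	// producer: the analogue of HandlePeerDead's non-blocking send
	select {
	case replaceCh <- "dead-peer":
	default:
		fmt.Println("replacement queue full, dropping request")
	}

	time.Sleep(100 * time.Millisecond) // let the consumer run before exiting
}
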
......@@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
mh "github.com/multiformats/go-multihash"
)
......@@ -44,7 +42,11 @@ type CplRefresh struct {
// RoutingTable defines the routing table.
type RoutingTable struct {
// the routing table context
ctx context.Context
// function to cancel the RT context
ctxCancel context.CancelFunc
// ID of the local peer
local ID
......@@ -71,17 +73,20 @@ type RoutingTable struct {
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)
// is peer replacement enabled?
isReplaceEnabled bool
// peerReplaceCh is the channel to write a peer replacement request to
peerReplaceCh chan peer.ID
// function to determine the validity of a peer for RT membership
peerValidationFnc PeerValidationFunc
// timeout for a single call to the peer validation function
peerValidationTimeout time.Duration
// interval between two runs of the table cleanup routine
tableCleanupInterval time.Duration
// function to select peers that need to be validated
// function to select peers that need to be validated during cleanup
peersForValidationFnc PeerSelectionFunc
proc goprocess.Process
}
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
......@@ -95,7 +100,6 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
}
rt := &RoutingTable{
ctx: context.Background(),
buckets: []*bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
......@@ -108,18 +112,24 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},
peerReplaceCh: make(chan peer.ID, bucketsize*2),
peerValidationFnc: cfg.tableCleanup.peerValidationFnc,
peersForValidationFnc: cfg.tableCleanup.peersForValidationFnc,
peerValidationTimeout: cfg.tableCleanup.peerValidationTimeout,
tableCleanupInterval: cfg.tableCleanup.interval,
}
rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize)
rt.proc = goprocessctx.WithContext(rt.ctx)
// create the replacement cache
rt.cplReplacementCache = newCplReplacementCache(rt.local, rt.bucketsize*2)
rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
// schedule periodic RT cleanup if peer validation function has been passed
if rt.peerValidationFnc != nil {
rt.proc.Go(rt.cleanup)
rt.isReplaceEnabled = (rt.peerValidationFnc != nil)
if rt.isReplaceEnabled {
go rt.cleanup()
go rt.startPeerReplacement()
}
return rt, nil
......@@ -128,10 +138,11 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
// Close shuts down the Routing Table & all associated processes.
// It is safe to call this multiple times.
func (rt *RoutingTable) Close() error {
return rt.proc.Close()
rt.ctxCancel()
return nil
}
func (rt *RoutingTable) cleanup(proc goprocess.Process) {
func (rt *RoutingTable) cleanup() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
......@@ -153,32 +164,45 @@ func (rt *RoutingTable) cleanup(proc goprocess.Process) {
continue
}
// peer does not seem to be alive, let's try candidates now
log.Infof("failed to validate missing peer=%s, will try candidates now...", pinfo.Id)
// evict missing peer
// peer does not seem to be alive, let's try to replace it
log.Infof("failed to validate missing peer=%s, evicting it from the RT & requesting a replace", pinfo.Id)
// evict missing peer & request replacement
rt.HandlePeerDead(pinfo.Id)
}
case <-rt.ctx.Done():
return
}
}
}
// keep trying replacement candidates for the missing peer till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(pinfo.Id), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for missing peer=%s", c, pinfo.Id)
break
}
log.Infof("failed to validated candidate=%s", c)
// remove candidate
rt.HandlePeerDead(c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
// replaces a peer using a valid peer from the replacement cache
func (rt *RoutingTable) startPeerReplacement() {
validatePeerF := func(p peer.ID) bool {
queryCtx, cancel := context.WithTimeout(rt.ctx, rt.peerValidationTimeout)
defer cancel()
return rt.peerValidationFnc(queryCtx, p)
}
if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", pinfo.Id)
for {
select {
case p := <-rt.peerReplaceCh:
// keep trying replacement candidates till we get a successful validation or
// we run out of candidates
cpl := uint(CommonPrefixLen(ConvertPeerID(p), rt.local))
c, notEmpty := rt.cplReplacementCache.pop(cpl)
for notEmpty {
if validatePeerF(c) {
log.Infof("successfully validated candidate=%s for peer=%s", c, p)
break
}
log.Infof("failed to validated candidate=%s", c)
c, notEmpty = rt.cplReplacementCache.pop(cpl)
}
case <-proc.Closing():
if !notEmpty {
log.Infof("failed to replace missing peer=%s as all candidates were invalid", p)
}
case <-rt.ctx.Done():
return
}
}
......@@ -341,17 +365,25 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)
}
// HandlePeerDead should be called when the caller is sure that a peer is dead/not dialable.
// It evicts the peer from the Routing Table and also removes it as a replacement candidate if it is one.
// It evicts the peer from the Routing Table and tries to replace it with a valid & eligible
// candidate from the replacement cache.
func (rt *RoutingTable) HandlePeerDead(p peer.ID) {
// remove it as a candidate
rt.cplReplacementCache.remove(p)
// remove it from the RT
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if bucket.remove(p) {
// request a replacement
if rt.isReplaceEnabled {
select {
case rt.peerReplaceCh <- p:
default:
log.Errorf("unable to request replacement for peer=%s as queue for replace requests is full", p)
}
}
// peer removed callback
rt.PeerRemoved(p)
}
}
......
......@@ -167,37 +167,57 @@ func TestHandlePeerDead(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
var candidate peer.ID
for {
candidate = test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(candidate), ConvertPeerID(local)) == 0 {
break
}
}
var lk sync.Mutex
var added peer.ID
f := func(ctx context.Context, p peer.ID) bool {
if p == candidate {
lk.Lock()
added = p
lk.Unlock()
}
return true
}
m := pstore.NewMetrics()
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc))
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(f))
require.NoError(t, err)
// push 3 peers -> 2 for the first bucket, and 1 as candidates
var peers []peer.ID
for i := 0; i < 3; i++ {
p, err := rt.GenRandPeerID(uint(0))
require.NoError(t, err)
require.NotEmpty(t, p)
rt.HandlePeerAlive(p)
peers = append(peers, p)
}
p1, _ := rt.GenRandPeerID(0)
p2, _ := rt.GenRandPeerID(0)
rt.HandlePeerAlive(p1)
rt.HandlePeerAlive(p2)
rt.HandlePeerAlive(candidate)
// ensure p1 & p2 are in the RT
require.Len(t, rt.ListPeers(), 2)
require.Contains(t, rt.ListPeers(), p1)
require.Contains(t, rt.ListPeers(), p2)
// ensure we have 1 candidate
rt.cplReplacementCache.Lock()
require.NotNil(t, rt.cplReplacementCache.candidates[uint(0)])
require.True(t, len(rt.cplReplacementCache.candidates[uint(0)]) == 1)
require.Len(t, rt.cplReplacementCache.candidates[uint(0)], 1)
require.Contains(t, rt.cplReplacementCache.candidates[uint(0)], candidate)
rt.cplReplacementCache.Unlock()
// mark a peer as dead and ensure it's not in the RT
require.NotEmpty(t, rt.Find(peers[0]))
rt.HandlePeerDead(peers[0])
require.Empty(t, rt.Find(peers[0]))
// mark the peer as dead & verify we don't get it as a candidate
rt.HandlePeerDead(peers[2])
// mark a peer as dead and ensure it's not in the RT & it gets replaced
require.NotEmpty(t, rt.Find(p1))
rt.HandlePeerDead(p1)
require.Empty(t, rt.Find(p1))
time.Sleep(1 * time.Second)
rt.cplReplacementCache.Lock()
require.Nil(t, rt.cplReplacementCache.candidates[uint(0)])
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()
lk.Lock()
require.Equal(t, candidate, added)
lk.Unlock()
}
func TestTableCallbacks(t *testing.T) {
......@@ -593,7 +613,6 @@ func TestTableCleanup(t *testing.T) {
p := test.RandPeerIDFatal(t)
if CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == cpl {
cplPeerMap[cpl] = append(cplPeerMap[cpl], p)
i++
if i == 6 {
break
......@@ -634,7 +653,7 @@ func TestTableCleanup(t *testing.T) {
}
}
// validate current state
// validate current TABLE state
rt.tabLock.RLock()
require.Len(t, rt.ListPeers(), 6)
ps0 := rt.buckets[0].peerIds()
......@@ -649,7 +668,14 @@ func TestTableCleanup(t *testing.T) {
require.Contains(t, ps1, cplPeerMap[1][2])
rt.tabLock.RUnlock()
// now disconnect peers 0 1 & 2 from both buckets so it has only 0 left after it gets validated
// validate current state of replacement cache
rt.cplReplacementCache.Lock()
require.Len(t, rt.cplReplacementCache.candidates, 2)
require.Len(t, rt.cplReplacementCache.candidates[0], 3)
require.Len(t, rt.cplReplacementCache.candidates[1], 3)
rt.cplReplacementCache.Unlock()
// now disconnect peers 0 1 & 2 from both buckets so it has only peer 1 left after it gets validated
for _, peers := range cplPeerMap {
rt.HandlePeerDisconnect(peers[0])
rt.HandlePeerDisconnect(peers[1])
......@@ -657,7 +683,7 @@ func TestTableCleanup(t *testing.T) {
}
// let RT cleanup complete
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
// verify RT state
rt.tabLock.RLock()
......@@ -670,7 +696,7 @@ func TestTableCleanup(t *testing.T) {
require.Contains(t, ps1, cplPeerMap[1][1])
rt.tabLock.RUnlock()
// verify candidate state
// verify peers were replaced with candidates
addedCandidatesLk.Lock()
require.Len(t, addedCandidates, 4)
require.Contains(t, addedCandidates, cplPeerMap[0][3])
......@@ -679,6 +705,11 @@ func TestTableCleanup(t *testing.T) {
require.Contains(t, addedCandidates, cplPeerMap[1][5])
addedCandidatesLk.Unlock()
// verify candidates were removed from the replacement cache
rt.cplReplacementCache.Lock()
require.Empty(t, rt.cplReplacementCache.candidates)
rt.cplReplacementCache.Unlock()
// close RT
require.NoError(t, rt.Close())
}
......
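
For context, here is a minimal usage sketch based on the constructor and option names that appear in this diff and its tests (NewRoutingTable, PeerValidationFnc, HandlePeerAlive, HandlePeerDead). Import paths mirror the test file and may differ slightly in your tree; the validation function is a trivial stand-in for a real dial check.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
	kb "github.com/libp2p/go-libp2p-kbucket"
	pstore "github.com/libp2p/go-libp2p-peerstore"
)

func main() {
	// In a real node this would try to connect to the peer within the
	// validation timeout; here every peer is considered alive.
	alwaysValid := func(ctx context.Context, p peer.ID) bool { return true }

	local := peer.ID("local-peer")
	rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour,
		pstore.NewMetrics(), kb.PeerValidationFnc(alwaysValid))
	if err != nil {
		panic(err)
	}
	defer rt.Close()

	remote := peer.ID("remote-peer")
	rt.HandlePeerAlive(remote)

	// The caller is sure remote is dead: it is evicted from its bucket
	// right away and a replacement request is queued for the
	// startPeerReplacement goroutine to serve from the replacement cache.
	rt.HandlePeerDead(remote)
	fmt.Println("peers in table:", rt.ListPeers())
}
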