Unverified Commit 2851c88a authored by Aarsh Shah, committed by GitHub

Routing Table Refresh manager (#601)

* rt refresh refactor
parent 08ab423f
......@@ -16,13 +16,11 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
"go.uber.org/zap"
"go.opencensus.io/tag"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kad-dht/rtrefresh"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
......@@ -35,6 +33,8 @@ import (
"github.com/multiformats/go-base32"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"go.opencensus.io/tag"
"go.uber.org/zap"
)
var (
......@@ -71,6 +71,9 @@ type IpfsDHT struct {
// ProviderManager stores & manages the provider records for this Dht peer.
ProviderManager *providers.ProviderManager
// manages Routing Table refresh
rtRefreshManager *rtrefresh.RtRefreshManager
birth time.Time // When this peer started up
Validator record.Validator
......@@ -104,11 +107,7 @@ type IpfsDHT struct {
queryPeerFilter QueryFilterFunc
routingTablePeerFilter RouteTableFilterFunc
autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshInterval time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error
autoRefresh bool
// A set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
......@@ -122,11 +121,6 @@ type IpfsDHT struct {
// networks).
enableProviders, enableValues bool
// successfulOutboundQueryGracePeriod is the maximum grace period we will give to a peer
// between two successful query responses from it, failing which,
// we will ping it to see if it's alive.
successfulOutboundQueryGracePeriod time.Duration
fixLowPeersChan chan struct{}
}
......@@ -156,14 +150,13 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
if err := cfg.validate(); err != nil {
return nil, err
}
dht, err := makeDHT(ctx, h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}
dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshInterval = cfg.routingTable.refreshInterval
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
......@@ -196,8 +189,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())
dht.startSelfLookup()
dht.startRefreshing()
if err := dht.rtRefreshManager.Start(); err != nil {
return nil, err
}
// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
......@@ -266,23 +260,44 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}, 1),
}
var maxLastSuccessfulOutboundThreshold time.Duration
// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardry that produced these exact equations, please be patient as a document explaining it will
// be published soon.
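// As a rough worked example (illustrative numbers only, not necessarily the configured defaults): with K=20,
// alpha=3 and a 1 hour refresh interval, the threshold is Log(1/20)/Log(1 - 3/20) * 1h ~ (2.996/0.163) * 1h ~ 18.4h.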
if cfg.concurrency < cfg.bucketSize { // (alpha < K)
l1 := math.Log(float64(1) / float64(cfg.bucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
} else {
maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
}
// construct routing table
rt, err := makeRoutingTable(dht, cfg)
rt, err := makeRoutingTable(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
dht.bootstrapPeers = cfg.bootstrapPeers
// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
}
dht.rtRefreshManager = rtRefresh
// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
return rtRefresh.Close()
})
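// note: the teardown above ensures the refresh manager is closed when the DHT's process shuts down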
// create a tagged context derived from the original context
ctxTags := dht.newContextWithLocalTags(ctx)
......@@ -298,19 +313,32 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
// The threshold is calculated based on the expected amount of time that should pass before we
// query a peer as part of our refresh cycle.
// To grok the Math Wizardry that produced these exact equations, please be patient as a document explaining it will
// be published soon.
l1 := math.Log(float64(1) / float64(defaultBucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(defaultBucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold := time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
return string(p), err
}
queryFnc := func(ctx context.Context, key string) error {
_, err := dht.GetClosestPeers(ctx, key)
return err
}
r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
keyGenFnc,
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
maxLastSuccessfulOutboundThreshold)
return r, err
}
func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())
rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold)
dht.successfulOutboundQueryGracePeriod = maxLastSuccessfulOutboundThreshold
cmgr := dht.host.ConnManager()
rt.PeerAdded = func(p peer.ID) {
......@@ -397,10 +425,7 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
}
if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
default:
}
dht.rtRefreshManager.RefreshNoWait()
}
}
......@@ -520,8 +545,8 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
// might support the DHT protocol.
// If we have a connection to a peer but no exchange of a query RPC ->
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
// LastUsefulAt=0
// If we connect to a peer and then exchange a query RPC ->
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
......@@ -542,12 +567,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
// peer not added.
return
}
// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
// value that must have been set in the Routing Table for this peer when it was first added during a connection.
if newlyAdded && queryPeer {
dht.routingTable.UpdateLastUsefulAt(p, time.Now())
} else if queryPeer {
if !newlyAdded && queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
......
......@@ -2,14 +2,8 @@ package dht
import (
"context"
"fmt"
"time"
multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/peer"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/multiformats/go-multiaddr"
)
......@@ -20,8 +14,6 @@ var DefaultBootstrapPeers []multiaddr.Multiaddr
// see a new peer, we trigger a bootstrap round.
var minRTRefreshThreshold = 10
// timeout for pinging one peer
const peerPingTimeout = 10 * time.Second
const (
periodicBootstrapInterval = 2 * time.Minute
maxNBoostrappers = 2
......@@ -43,207 +35,10 @@ func init() {
}
}
// startSelfLookup starts a go-routine that listens for requests to trigger a self walk on a dedicated channel
// and then sends the error status back on the error channel sent along with the request.
// if multiple callers "simultaneously" ask for a self walk, it performs ONLY one self walk and sends the same error status to all of them.
func (dht *IpfsDHT) startSelfLookup() {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.WithProcessClosing(dht.ctx, proc)
for {
var waiting []chan<- error
select {
case res := <-dht.triggerSelfLookup:
if res != nil {
waiting = append(waiting, res)
}
case <-ctx.Done():
return
}
// batch multiple refresh requests if they're all waiting at the same time.
waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)
// Do a self walk
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
_, err := dht.GetClosestPeers(queryCtx, string(dht.self))
if err == kbucket.ErrLookupFailure {
err = nil
} else if err != nil {
err = fmt.Errorf("failed to query self during routing table refresh: %s", err)
}
cancel()
// send back the error status
for _, w := range waiting {
w <- err
close(w)
}
if err != nil {
logger.Warnw("self lookup failed", "error", err)
}
}
})
}
// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() {
// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.WithProcessClosing(dht.ctx, proc)
refreshTicker := time.NewTicker(dht.rtRefreshInterval)
defer refreshTicker.Stop()
// refresh if option is set
if dht.autoRefresh {
err := dht.doRefresh(ctx)
if err != nil {
logger.Warn("failed when refreshing routing table", err)
}
} else {
// disable the "auto-refresh" ticker so that no more ticks are sent to this channel
refreshTicker.Stop()
}
for {
var waiting []chan<- error
select {
case <-refreshTicker.C:
case res := <-dht.triggerRtRefresh:
if res != nil {
waiting = append(waiting, res)
}
case <-ctx.Done():
return
}
// Batch multiple refresh requests if they're all waiting at the same time.
waiting = append(waiting, collectWaitingChannels(dht.triggerRtRefresh)...)
err := dht.doRefresh(ctx)
for _, w := range waiting {
w <- err
close(w)
}
if err != nil {
logger.Warnw("failed when refreshing routing table", "error", err)
}
// ping Routing Table peers that haven't been heard of/from in the interval they should have been.
for _, ps := range dht.routingTable.GetPeerInfos() {
// ping the peer if it's due for a ping and evict it if the ping fails
if time.Since(ps.LastSuccessfulOutboundQueryAt) > dht.successfulOutboundQueryGracePeriod {
livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout)
if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
dht.routingTable.RemovePeer(ps.Id)
}
cancel()
}
}
}
})
}
func collectWaitingChannels(source chan chan<- error) []chan<- error {
var waiting []chan<- error
for {
select {
case res := <-source:
if res != nil {
waiting = append(waiting, res)
}
default:
return waiting
}
}
}
func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
var merr error
// wait for the self walk result
selfWalkres := make(chan error, 1)
select {
case dht.triggerSelfLookup <- selfWalkres:
case <-ctx.Done():
return ctx.Err()
}
select {
case err := <-selfWalkres:
if err != nil {
merr = multierror.Append(merr, err)
}
case <-ctx.Done():
return ctx.Err()
}
if err := dht.refreshCpls(ctx); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}
// refreshCpls scans the routing table, and does a random walk for cpl's that haven't been queried since the given period
func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
doQuery := func(cpl uint, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing cpl %d to %s (routing table size was %d)",
cpl, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished refreshing cpl %d to %s (routing table size is now %d)",
cpl, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
err := f(queryCtx)
if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
return nil
}
return err
}
trackedCpls := dht.routingTable.GetTrackedCplsForRefresh()
var merr error
for cpl, lastRefreshedAt := range trackedCpls {
if time.Since(lastRefreshedAt) <= dht.rtRefreshInterval {
continue
}
// gen rand peer with the cpl
randPeer, err := dht.routingTable.GenRandPeerID(uint(cpl))
if err != nil {
logger.Errorw("failed to generate peer ID", "cpl", cpl, "error", err)
continue
}
// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.GetClosestPeers(c, string(randPeer))
return err
}
if err := doQuery(uint(cpl), randPeer.String(), walkFnc); err != nil {
merr = multierror.Append(
merr,
fmt.Errorf("failed to do a random walk for cpl %d: %w", cpl, err),
)
}
}
return merr
}
// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
// Important: don't block!
select {
case dht.triggerRtRefresh <- nil:
default:
}
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
dht.rtRefreshManager.RefreshNoWait()
return nil
}
......@@ -252,14 +47,14 @@ func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
res := make(chan error, 1)
select {
// FIXME: this can block. Ideally, we'd return a channel without blocking.
// https://github.com/libp2p/go-libp2p-kad-dht/issues/609
case dht.triggerRtRefresh <- res:
case <-dht.ctx.Done():
res <- dht.ctx.Err()
close(res)
}
return res
return dht.rtRefreshManager.Refresh(false)
}
// ForceRefresh acts like RefreshRoutingTable but forces the DHT to refresh all
// buckets in the Routing Table irrespective of when they were last refreshed.
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) ForceRefresh() <-chan error {
return dht.rtRefreshManager.Refresh(true)
}
package dht
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/event"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/stretchr/testify/require"
)
func TestSelfWalkOnAddressChange(t *testing.T) {
ctx := context.Background()
// create three DHT instances with auto refresh disabled
d1 := setupDHT(ctx, t, false, DisableAutoRefresh())
d2 := setupDHT(ctx, t, false, DisableAutoRefresh())
d3 := setupDHT(ctx, t, false, DisableAutoRefresh())
var connectedTo *IpfsDHT
// connect d1 to whoever is "further"
if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <=
kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) {
connect(t, ctx, d1, d3)
connectedTo = d3
} else {
connect(t, ctx, d1, d2)
connectedTo = d2
}
// then connect d2 AND d3
connect(t, ctx, d2, d3)
// d1 should have ONLY 1 peer in its RT
waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second)
require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0])
// now emit the address change event
em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{})
require.NoError(t, err)
require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{}))
waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second)
// it should now have both peers in the RT
ps := d1.routingTable.ListPeers()
require.Contains(t, ps, d2.self)
require.Contains(t, ps, d3.self)
}
......@@ -18,24 +18,23 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multistream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
"github.com/ipfs/go-cid"
detectrace "github.com/ipfs/go-detect-race"
u "github.com/ipfs/go-ipfs-util"
ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
"github.com/multiformats/go-multistream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var testCaseCids []cid.Cid
......@@ -831,6 +830,35 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}
func TestQueryWithEmptyRTShouldNotPanic(t *testing.T) {
ctx := context.Background()
d := setupDHT(ctx, t, false)
// TODO This swallows the error for now, should we change it ?
// FindProviders
ps, _ := d.FindProviders(ctx, testCaseCids[0])
require.Empty(t, ps)
// GetClosestPeers
pc, err := d.GetClosestPeers(ctx, "key")
require.Nil(t, pc)
require.Equal(t, kb.ErrLookupFailure, err)
// GetValue
best, err := d.GetValue(ctx, "key")
require.Empty(t, best)
require.Error(t, err)
// SearchValue
bchan, err := d.SearchValue(ctx, "key")
require.Empty(t, bchan)
require.NoError(t, err)
// Provide
err = d.Provide(ctx, testCaseCids[0], true)
require.Equal(t, kb.ErrLookupFailure, err)
}
func TestPeriodicRefresh(t *testing.T) {
if testing.Short() {
t.SkipNow()
......
package rtrefresh
import (
"context"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/hashicorp/go-multierror"
logging "github.com/ipfs/go-log"
)
var logger = logging.Logger("dht/RtRefreshManager")
const (
peerPingTimeout = 10 * time.Second
)
type triggerRefreshReq struct {
respCh chan error
forceCplRefresh bool
}
type RtRefreshManager struct {
ctx context.Context
cancel context.CancelFunc
refcount sync.WaitGroup
closeOnce sync.Once
// peerId of this DHT peer i.e. self peerId.
h host.Host
dhtPeerId peer.ID
rt *kbucket.RoutingTable
enableAutoRefresh bool // should run periodic refreshes ?
refreshKeyGenFnc func(cpl uint) (string, error) // generate the key for the query to refresh this cpl
refreshQueryFnc func(ctx context.Context, key string) error // query to run for a refresh.
refreshQueryTimeout time.Duration // timeout for one refresh query
// interval between two periodic refreshes.
// also, a cpl won't be refreshed if the time since it was last refreshed
// is below the interval, unless a "forced" refresh is done.
refreshInterval time.Duration
successfulOutboundQueryGracePeriod time.Duration
triggerRefresh chan *triggerRefreshReq // channel to write refresh requests to.
}
func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
refreshKeyGenFnc func(cpl uint) (string, error),
refreshQueryFnc func(ctx context.Context, key string) error,
refreshQueryTimeout time.Duration,
refreshInterval time.Duration,
successfulOutboundQueryGracePeriod time.Duration) (*RtRefreshManager, error) {
ctx, cancel := context.WithCancel(context.Background())
return &RtRefreshManager{
ctx: ctx,
cancel: cancel,
h: h,
dhtPeerId: h.ID(),
rt: rt,
enableAutoRefresh: autoRefresh,
refreshKeyGenFnc: refreshKeyGenFnc,
refreshQueryFnc: refreshQueryFnc,
refreshQueryTimeout: refreshQueryTimeout,
refreshInterval: refreshInterval,
successfulOutboundQueryGracePeriod: successfulOutboundQueryGracePeriod,
triggerRefresh: make(chan *triggerRefreshReq),
}, nil
}
func (r *RtRefreshManager) Start() error {
r.refcount.Add(1)
go r.loop()
return nil
}
func (r *RtRefreshManager) Close() error {
r.closeOnce.Do(func() {
r.cancel()
r.refcount.Wait()
})
return nil
}
// RefreshRoutingTable requests the refresh manager to refresh the Routing Table.
// If the force parameter is set to true, all buckets will be refreshed irrespective of when they were last refreshed.
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
// FIXME: this can block. Ideally, we'd return a channel without blocking.
// https://github.com/libp2p/go-libp2p-kad-dht/issues/609
func (r *RtRefreshManager) Refresh(force bool) <-chan error {
resp := make(chan error, 1)
select {
case r.triggerRefresh <- &triggerRefreshReq{respCh: resp, forceCplRefresh: force}:
case <-r.ctx.Done():
resp <- r.ctx.Err()
}
return resp
}
// RefreshNoWait requests the refresh manager to refresh the Routing Table.
// However, it moves on without blocking if its request can't get through.
func (r *RtRefreshManager) RefreshNoWait() {
select {
case r.triggerRefresh <- &triggerRefreshReq{}:
default:
}
}
func (r *RtRefreshManager) loop() {
defer r.refcount.Done()
var refreshTickrCh <-chan time.Time
if r.enableAutoRefresh {
err := r.doRefresh(true)
if err != nil {
logger.Warn("failed when refreshing routing table", err)
}
t := time.NewTicker(r.refreshInterval)
defer t.Stop()
refreshTickrCh = t.C
}
for {
var waiting []chan<- error
var forced bool
select {
case <-refreshTickrCh:
case triggerRefreshReq := <-r.triggerRefresh:
if triggerRefreshReq.respCh != nil {
waiting = append(waiting, triggerRefreshReq.respCh)
}
forced = forced || triggerRefreshReq.forceCplRefresh
case <-r.ctx.Done():
return
}
// Batch multiple refresh requests if they're all waiting at the same time.
OuterLoop:
for {
select {
case triggerRefreshReq := <-r.triggerRefresh:
if triggerRefreshReq.respCh != nil {
waiting = append(waiting, triggerRefreshReq.respCh)
}
forced = forced || triggerRefreshReq.forceCplRefresh
default:
break OuterLoop
}
}
// EXECUTE the refresh
// ping Routing Table peers that haven't been heard of/from in the interval they should have been.
// and evict them if they don't reply.
var wg sync.WaitGroup
for _, ps := range r.rt.GetPeerInfos() {
if time.Since(ps.LastSuccessfulOutboundQueryAt) > r.successfulOutboundQueryGracePeriod {
wg.Add(1)
go func(ps kbucket.PeerInfo) {
defer wg.Done()
livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
}
cancel()
}(ps)
}
}
wg.Wait()
// Query for self and refresh the required buckets
err := r.doRefresh(forced)
for _, w := range waiting {
w <- err
close(w)
}
if err != nil {
logger.Warnw("failed when refreshing routing table", "error", err)
}
}
}
func (r *RtRefreshManager) doRefresh(forceRefresh bool) error {
var merr error
if err := r.queryForSelf(); err != nil {
merr = multierror.Append(merr, err)
}
refreshCpls := r.rt.GetTrackedCplsForRefresh()
rfnc := func(cpl uint) (err error) {
if forceRefresh {
err = r.refreshCpl(cpl)
} else {
err = r.refreshCplIfEligible(cpl, refreshCpls[cpl])
}
return
}
for c := range refreshCpls {
cpl := uint(c)
if err := rfnc(cpl); err != nil {
merr = multierror.Append(merr, err)
} else {
// If we see a gap at a Cpl in the Routing table, we ONLY refresh up until the maximum cpl we
// have in the Routing Table OR (2 * (Cpl+ 1) with the gap), whichever is smaller.
// This is to prevent refreshes for Cpls that have no peers in the network but happen to be before a very high max Cpl
// for which we do have peers in the network.
// The 2 * (Cpl + 1) bound can be proven, and a proof would have been written here if the programmer
// had paid more attention in the Math classes at university.
// So, please be patient and a doc explaining it will be published soon.
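// As an assumed illustration: if cpl 2 has no peers and we track cpls 0-10, we refresh cpls 3 to
// min(2*(2+1), 10) = 6 and then stop, leaving cpls 7-10 unrefreshed in this round.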
if r.rt.NPeersForCpl(cpl) == 0 {
lastCpl := min(2*(c+1), len(refreshCpls)-1)
for i := c + 1; i < lastCpl+1; i++ {
if err := rfnc(uint(i)); err != nil {
merr = multierror.Append(merr, err)
}
}
return merr
}
}
}
return merr
}
func min(a int, b int) int {
if a <= b {
return a
}
return b
}
func (r *RtRefreshManager) refreshCplIfEligible(cpl uint, lastRefreshedAt time.Time) error {
if time.Since(lastRefreshedAt) <= r.refreshInterval {
logger.Debugf("not running refresh for cpl %d as time since last refresh not above interval", cpl)
return nil
}
return r.refreshCpl(cpl)
}
func (r *RtRefreshManager) refreshCpl(cpl uint) error {
// gen a key for the query to refresh the cpl
key, err := r.refreshKeyGenFnc(cpl)
if err != nil {
return fmt.Errorf("failed to generated query key for cpl=%d, err=%s", cpl, err)
}
logger.Infof("starting refreshing cpl %d with key %s (routing table size was %d)",
cpl, key, r.rt.Size())
if err := r.runRefreshDHTQuery(key); err != nil {
return fmt.Errorf("failed to refresh cpl=%d, err=%s", cpl, err)
}
logger.Infof("finished refreshing cpl %d, routing table size is now %d", cpl, r.rt.Size())
return nil
}
func (r *RtRefreshManager) queryForSelf() error {
if err := r.runRefreshDHTQuery(string(r.dhtPeerId)); err != nil {
return fmt.Errorf("failed to query for self, err=%s", err)
}
return nil
}
func (r *RtRefreshManager) runRefreshDHTQuery(key string) error {
queryCtx, cancel := context.WithTimeout(r.ctx, r.refreshQueryTimeout)
defer cancel()
err := r.refreshQueryFnc(queryCtx, key)
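// If the query ran for its full per-query timeout, treat it as a best-effort success;
// only propagate errors caused by something other than our own deadline.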
if err == nil || (err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded) {
return nil
}
return fmt.Errorf("failed to run refresh DHT query for key=%s, err=%s", key, err)
}
package rtrefresh
import (
"context"
"strconv"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/test"
kb "github.com/libp2p/go-libp2p-kbucket"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/stretchr/testify/require"
)
func TestSkipRefreshOnGapCpls(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
local := test.RandPeerIDFatal(t)
// adds a peer for a cpl.
// The "ignoreCpl" is the cpl for which we assume we have no peers in the network.
// So, if the query function gets a "key" which is basically the string version of the "ignoreCpl",
// we return without adding any peers for it to the Routing Table.
qFuncWithIgnore := func(rt *kb.RoutingTable, ignoreCpl uint) func(c context.Context, key string) error {
return func(c context.Context, key string) error {
if key == string(local) {
return nil
}
u, err := strconv.ParseInt(key, 10, 64)
require.NoError(t, err)
if uint(u) == ignoreCpl {
return nil
}
p, err := rt.GenRandPeerID(uint(u))
require.NoError(t, err)
b, err := rt.TryAddPeer(p, true)
require.True(t, b)
require.NoError(t, err)
return nil
}
}
// We use the cpl as the key for the query. So, the cpl -> key transformation function
// basically just converts the uint cpl to a string key using the strconv lib.
kfnc := func(cpl uint) (string, error) {
return strconv.FormatInt(int64(cpl), 10), nil
}
// when 2 * (gapcpl + 1) < maxCpl
// gap is 2 and max is 10
rt, err := kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour)
require.NoError(t, err)
r := &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local}
icpl := uint(2)
lastCpl := 2 * (icpl + 1)
p, err := rt.GenRandPeerID(10)
require.NoError(t, err)
b, _ := rt.TryAddPeer(p, true)
require.True(t, b)
r.refreshQueryFnc = qFuncWithIgnore(rt, icpl)
require.NoError(t, r.doRefresh(true))
for i := uint(0); i < lastCpl+1; i++ {
if i == icpl {
require.Equal(t, 0, rt.NPeersForCpl(i))
continue
}
require.Equal(t, 1, rt.NPeersForCpl(uint(i)))
}
for i := lastCpl + 1; i < 10; i++ {
require.Equal(t, 0, rt.NPeersForCpl(i))
}
// when 2 * (gapcpl + 1) > maxCpl
rt, err = kb.NewRoutingTable(2, kb.ConvertPeerID(local), time.Hour, pstore.NewMetrics(), 100*time.Hour)
require.NoError(t, err)
r = &RtRefreshManager{ctx: ctx, rt: rt, refreshKeyGenFnc: kfnc, dhtPeerId: local}
icpl = uint(6)
p, err = rt.GenRandPeerID(10)
require.NoError(t, err)
b, _ = rt.TryAddPeer(p, true)
require.True(t, b)
r.refreshQueryFnc = qFuncWithIgnore(rt, icpl)
require.NoError(t, r.doRefresh(true))
for i := uint(0); i < 10; i++ {
if i == icpl {
require.Equal(t, 0, rt.NPeersForCpl(i))
continue
}
require.Equal(t, 1, rt.NPeersForCpl(uint(i)))
}
require.Equal(t, 2, rt.NPeersForCpl(10))
}
......@@ -85,10 +85,7 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
// with our new address to all peers we are connected to. However, we might not necessarily be connected
// to our closest peers & so in the true spirit of Zen, searching for ourselves in the network really is the best way
// to forge connections with those that matter.
select {
case dht.triggerSelfLookup <- nil:
default:
}
dht.rtRefreshManager.RefreshNoWait()
case event.EvtPeerProtocolsUpdated:
handlePeerChangeEvent(dht, evt.Peer)
case event.EvtPeerIdentificationCompleted:
......