Unverified Commit 08ab423f authored by Aarsh Shah, committed by GitHub

Bootstrap empty RT and Optimize allocs when we discover new peers (#631)

* Bootstrap when RT is empty and optimize allocations.
Co-authored-by: Steven Allen <steven@stebalien.com>
parent 9be7169f
......@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"
......@@ -86,7 +87,8 @@ type IpfsDHT struct {
// DHT protocols we query with. We'll only add peers to our routing
// table if they speak these protocols.
protocols []protocol.ID
protocols []protocol.ID
protocolsStrs []string
// DHT protocols we can respond to.
serverProtocols []protocol.ID
......@@ -108,6 +110,11 @@ type IpfsDHT struct {
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error
// A set of bootstrap peers to fall back on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
bootstrapPeers []peer.AddrInfo
maxRecordAge time.Duration
// Allows disabling dht subsystems. These should _only_ be set on
......@@ -254,6 +261,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
protocols: protocols,
protocolsStrs: protocol.ConvertToStrings(protocols),
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
......@@ -262,7 +270,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}),
fixLowPeersChan: make(chan struct{}, 1),
}
// construct routing table
......@@ -271,6 +279,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
dht.bootstrapPeers = cfg.bootstrapPeers
// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)
......@@ -323,22 +332,70 @@ func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
}
// fixLowPeers tries to get more peers into the routing table if we're below the threshold
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
timer := time.NewTimer(periodicBootstrapInterval)
defer timer.Stop()
for {
select {
case <-dht.fixLowPeersChan:
case <-timer.C:
case <-proc.Closing():
return
}
if dht.routingTable.Size() > minRTRefreshThreshold {
continue
}
// we try to add to the Routing Table all the peers we are connected to,
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.Context(), p, false)
}
// TODO Active Bootstrapping
// We should first use non-bootstrap peers we knew of from previous
// snapshots of the Routing Table before we connect to the bootstrappers.
// See https://github.com/libp2p/go-libp2p-kad-dht/issues/387.
if dht.routingTable.Size() == 0 {
if len(dht.bootstrapPeers) == 0 {
// No point in continuing, we have no peers!
continue
}
found := 0
for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
ai := dht.bootstrapPeers[i]
err := dht.Host().Connect(dht.Context(), ai)
if err == nil {
found++
} else {
logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
}
// Wait for two bootstrap peers, or try them all.
//
// Why two? In theory, one should be enough
// normally. However, if the network were to
// restart and everyone connected to just one
// bootstrapper, we'd end up with a mostly
// partitioned network.
//
// So we always bootstrap with two random peers.
if found == maxNBoostrappers {
break
}
}
}
// if we still don't have peers in our routing table (probably because Identify hasn't completed),
// there is no point in triggering a Refresh.
if dht.routingTable.Size() == 0 {
continue
}
if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
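Viewed on its own, the fallback above is a small reusable pattern: walk the candidate list in random order via rand.Perm and stop after the first N successful dials. Below is a minimal, self-contained sketch of that pattern; the function and variable names are illustrative, not part of this patch.

```go
package main

import (
	"fmt"
	"math/rand"
)

// connectToSome dials candidates in a random order and stops after
// `want` successes, returning how many dials succeeded. The patch
// inlines the same logic in fixLowPeersRoutine.
func connectToSome(candidates []string, want int, dial func(string) error) int {
	found := 0
	for _, i := range rand.Perm(len(candidates)) {
		if err := dial(candidates[i]); err != nil {
			continue // log and try the next candidate
		}
		found++
		if found == want {
			break // enough peers; stop dialing
		}
	}
	return found
}

func main() {
	peers := []string{"boot-a", "boot-b", "boot-c"}
	n := connectToSome(peers, 2, func(string) error { return nil })
	fmt.Println("connected to", n, "bootstrappers")
}
```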
......@@ -504,9 +561,6 @@ func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore till it starts supporting the DHT protocol again.
dht.routingTable.RemovePeer(p)
// since we lost a peer from the RT, we should do this here
dht.fixRTIfNeeded()
}
func (dht *IpfsDHT) fixRTIfNeeded() {
......
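The body of fixRTIfNeeded is collapsed in this diff, but the switch to make(chan struct{}, 1) above strongly suggests the standard non-blocking trigger idiom: with a buffer of one, a send never blocks, and concurrent triggers coalesce into a single pending fix. A standalone sketch of that idiom, offered as an assumption rather than the verbatim body:

```go
package main

import "fmt"

// trigger requests work without blocking: on a channel with capacity 1,
// the send succeeds if no request is pending and is dropped otherwise,
// so many callers coalesce into one wake-up of the consumer.
func trigger(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default: // a trigger is already queued; coalesce
	}
}

func main() {
	fixLowPeersChan := make(chan struct{}, 1) // mirrors the patched capacity
	trigger(fixLowPeersChan)                  // queued
	trigger(fixLowPeersChan)                  // coalesced, does not block
	fmt.Println("pending triggers:", len(fixLowPeersChan)) // prints 1
}
```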
......@@ -22,6 +22,10 @@ var minRTRefreshThreshold = 10
// timeout for pinging one peer
const peerPingTimeout = 10 * time.Second
const (
periodicBootstrapInterval = 2 * time.Minute
maxNBoostrappers = 2
)
func init() {
for _, s := range []string{
......
......@@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record"
ma "github.com/multiformats/go-multiaddr"
)
// ModeOpt describes what mode the dht should operate in
......@@ -60,6 +61,7 @@ type config struct {
// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
}
func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
......@@ -393,3 +395,16 @@ func V1CompatibleMode(enable bool) Option {
return nil
}
}
// BootstrapPeers configures the bootstrapping nodes that we will connect to in order to seed
// and refresh our Routing Table if it becomes empty.
func BootstrapPeers(addrs ...ma.Multiaddr) Option {
return func(c *config) error {
bootstrappers, err := peer.AddrInfosFromP2pAddrs(addrs...)
if err != nil {
return fmt.Errorf("failed to parse bootstrap peer addresses: %w", err)
}
c.bootstrapPeers = bootstrappers
return nil
}
}
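On the caller side, the new option composes with the existing functional-options constructor. A hypothetical wiring sketch follows; the helper name is invented, and the dnsaddr is the public libp2p bootstrapper, used purely as an example.

```go
// Hypothetical helper showing how a caller would thread bootstrap
// multiaddrs through the new BootstrapPeers option.
func newDHTWithBootstrap(ctx context.Context, h host.Host) (*IpfsDHT, error) {
	addr, err := ma.NewMultiaddr(
		// public libp2p bootstrapper, for illustration only
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
	)
	if err != nil {
		return nil, err
	}
	return New(ctx, h, BootstrapPeers(addr))
}
```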
......@@ -1966,3 +1966,107 @@ func TestRoutingFilter(t *testing.T) {
case <-time.After(time.Millisecond * 200):
}
}
func TestBootStrapWhenRTIsEmpty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// create three bootstrap peers, each of which is connected to 1 other peer.
nBootStraps := 3
bootstrappers := setupDHTS(t, ctx, nBootStraps)
defer func() {
for i := 0; i < nBootStraps; i++ {
bootstrappers[i].Close()
defer bootstrappers[i].host.Close()
}
}()
bootstrapcons := setupDHTS(t, ctx, nBootStraps)
defer func() {
for i := 0; i < nBootStraps; i++ {
bootstrapcons[i].Close()
defer bootstrapcons[i].host.Close()
}
}()
for i := 0; i < nBootStraps; i++ {
connect(t, ctx, bootstrappers[i], bootstrapcons[i])
}
// convert the bootstrap addresses to p2p addresses
bootstrapAddrs := make([]ma.Multiaddr, nBootStraps)
for i := 0; i < nBootStraps; i++ {
b, err := peer.AddrInfoToP2pAddrs(&peer.AddrInfo{ID: bootstrappers[i].self,
Addrs: bootstrappers[i].host.Addrs()})
require.NoError(t, err)
bootstrapAddrs[i] = b[0]
}
//----------------
// We will initialize a DHT with 1 bootstrapper, connect it to another DHT,
// then remove the latter from the Routing Table.
// This should add the bootstrap peer, and the peer that the bootstrap peer is connected to,
// to its Routing Table.
// AutoRefresh needs to be enabled for this.
dht1, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
testPrefix,
NamespacedValidator("v", blankValidator{}),
Mode(ModeServer),
BootstrapPeers(bootstrapAddrs[0]),
)
require.NoError(t, err)
dht2 := setupDHT(ctx, t, false)
defer func() {
dht1.host.Close()
dht2.host.Close()
dht1.Close()
dht2.Close()
}()
connect(t, ctx, dht1, dht2)
require.NoError(t, dht2.Close())
require.NoError(t, dht2.host.Close())
require.NoError(t, dht1.host.Network().ClosePeer(dht2.self))
dht1.routingTable.RemovePeer(dht2.self)
require.NotContains(t, dht1.routingTable.ListPeers(), dht2.self)
require.Eventually(t, func() bool {
return dht1.routingTable.Size() == 2 && dht1.routingTable.Find(bootstrappers[0].self) != "" &&
dht1.routingTable.Find(bootstrapcons[0].self) != ""
}, 5*time.Second, 500*time.Millisecond)
//----------------
// We will initialize a DHT with 2 bootstrappers, connect it to another DHT,
// then remove the DHT handler from the other DHT, which should make the first DHT's
// routing table empty.
// This should add the bootstrap peers, and the peers that the bootstrap peers are connected to,
// to its Routing Table.
// AutoRefresh needs to be enabled for this.
dht1, err = New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
testPrefix,
NamespacedValidator("v", blankValidator{}),
Mode(ModeServer),
BootstrapPeers(bootstrapAddrs[1], bootstrapAddrs[2]),
)
require.NoError(t, err)
dht2 = setupDHT(ctx, t, false)
defer func() {
dht1.host.Close()
dht2.host.Close()
dht1.Close()
dht2.Close()
}()
connect(t, ctx, dht1, dht2)
require.NoError(t, dht2.setMode(modeClient))
require.Eventually(t, func() bool {
rt := dht1.routingTable
return rt.Size() == 4 && rt.Find(bootstrappers[1].self) != "" &&
rt.Find(bootstrappers[2].self) != "" && rt.Find(bootstrapcons[1].self) != "" && rt.Find(bootstrapcons[2].self) != ""
}, 5*time.Second, 500*time.Millisecond)
}
......@@ -6,13 +6,11 @@ import (
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-eventbus"
ma "github.com/multiformats/go-multiaddr"
"github.com/jbenet/goprocess"
ma "github.com/multiformats/go-multiaddr"
)
// subscriberNotifee implements network.Notifee and also manages the subscriber to the event bus. We consume peer
......@@ -156,8 +154,8 @@ func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabil
// supporting the primary protocols, we do not want to add peers that are speaking obsolete secondary protocols to our
// routing table
func (dht *IpfsDHT) validRTPeer(p peer.ID) (bool, error) {
protos, err := dht.peerstore.SupportsProtocols(p, protocol.ConvertToStrings(dht.protocols)...)
if len(protos) == 0 || err != nil {
b, err := dht.peerstore.FirstSupportedProtocol(p, dht.protocolsStrs...)
if len(b) == 0 || err != nil {
return false, err
}
......
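The last hunk is the allocation win the commit title refers to: SupportsProtocols builds and returns a []string of every matching protocol, while FirstSupportedProtocol returns as soon as one matches, and validRTPeer only needs a yes/no answer. Pairing it with the protocolsStrs field precomputed in makeDHT also drops the protocol.ConvertToStrings call, and its allocation, from every invocation. Below is a hedged micro-benchmark sketch of the difference; it is not part of the patch and assumes the string-typed peerstore API of this era.

```go
// Hypothetical benchmark; assumes go-libp2p-core's peerstore API circa
// this commit, where protocol lookups take and return strings, and
// an in-memory peerstore from go-libp2p-peerstore/pstoremem.
func BenchmarkProtocolLookup(b *testing.B) {
	ps := pstoremem.NewPeerstore()
	p := peer.ID("peer-under-test")
	if err := ps.AddProtocols(p, "/testprefix/kad/1.0.0"); err != nil {
		b.Fatal(err)
	}
	protos := []string{"/testprefix/kad/1.0.0"}

	b.Run("SupportsProtocols", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			out, _ := ps.SupportsProtocols(p, protos...) // allocates a result slice per call
			_ = out
		}
	})
	b.Run("FirstSupportedProtocol", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			s, _ := ps.FirstSupportedProtocol(p, protos...) // short-circuits at the first match
			_ = s
		}
	})
}
```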