Commit 7cce5bdd authored by Aarsh Shah's avatar Aarsh Shah

make bootstrap synchronous & get tests to work

parent e2842f03
......@@ -69,7 +69,8 @@ type IpfsDHT struct {
bootstrapCfg opts.BootstrapConfig
triggerBootstrap chan struct{}
triggerAutoBootstrap bool
triggerBootstrap chan *bootstrapReq
}
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
......@@ -103,6 +104,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap
if !cfg.Client {
for _, p := range cfg.Protocols {
......@@ -160,7 +162,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerBootstrap: make(chan struct{}),
triggerBootstrap: make(chan *bootstrapReq),
}
dht.ctx = dht.newContextWithLocalTags(ctx)
......@@ -172,10 +174,10 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
// come up with an alternative solution.
// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
writeResp := func(errorChan chan error, err error) {
writeResp := func(errorChan chan error, errChan error) {
select {
case <-proc.Closing():
case errorChan <- err:
case errorChan <- errChan:
}
close(errorChan)
}
......
......@@ -43,42 +43,50 @@ func init() {
}
}
type bootstrapReq struct {
errChan chan error
}
func makeBootstrapReq() *bootstrapReq {
errChan := make(chan error, 1)
return &bootstrapReq{errChan}
}
// Bootstrap i
func (dht *IpfsDHT) startBootstrapping() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)
scanInterval := time.NewTicker(dht.bootstrapCfg.RoutingTableScanInterval)
scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
defer scanInterval.Stop()
var (
lastSelfWalk time.Time
walkSelf = true
)
for {
if walkSelf {
walkSelf = false
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("self walk: error: %s", err)
} else {
lastSelfWalk = time.Now()
}
}
err := dht.bootstrapBuckets(ctx)
if err != nil {
logger.Warningf("bootstrap buckets: error bootstrapping: %s", err)
// run bootstrap if option is set
if dht.triggerAutoBootstrap {
if err := dht.doBootstrap(ctx, true, &lastSelfWalk); err != nil {
logger.Warningf("bootstrap error: %s", err)
}
}
for {
select {
case now := <-scanInterval.C:
// It doesn't make sense to query for self unless we're _also_ going to fill out the routing table.
walkSelf = now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
case <-dht.triggerBootstrap:
walkSelf = true
walkSelf := now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
if err := dht.doBootstrap(ctx, walkSelf, &lastSelfWalk); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case req := <-dht.triggerBootstrap:
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
err := dht.doBootstrap(ctx, true, &lastSelfWalk)
select {
case req.errChan <- err:
close(req.errChan)
default:
}
case <-ctx.Done():
return
}
......@@ -88,6 +96,22 @@ func (dht *IpfsDHT) startBootstrapping() error {
return nil
}
func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool, latestSelfWalk *time.Time) error {
if walkSelf {
if err := dht.selfWalk(ctx); err != nil {
return fmt.Errorf("self walk: error: %s", err)
} else {
*latestSelfWalk = time.Now()
}
}
if err := dht.bootstrapBuckets(ctx); err != nil {
return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
}
return nil
}
// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
......@@ -167,9 +191,10 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
//
// Note: the context is ignored.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
// Returns an error just in case we want to do that in the future.
req := makeBootstrapReq()
select {
case dht.triggerBootstrap <- struct{}{}:
case dht.triggerBootstrap <- req:
return <-req.errChan
default:
}
return nil
......
......@@ -113,6 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
)
if err != nil {
t.Fatal(err)
......@@ -200,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.bootstrapOnce(ctx)
dht.Bootstrap(ctx)
}
}
......@@ -690,7 +691,18 @@ func TestBootstrap(t *testing.T) {
func TestBootstrapBelowMinRTThreshold(t *testing.T) {
ctx := context.Background()
dhtA := setupDHT(ctx, t, false)
// enable auto bootstrap on A
dhtA, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
)
if err != nil {
t.Fatal(err)
}
dhtB := setupDHT(ctx, t, false)
dhtC := setupDHT(ctx, t, false)
......@@ -783,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) {
t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.bootstrapOnce(ctx)
go dht.Bootstrap(ctx)
}
// this is async, and we dont know when it's finished with one cycle, so keep checking
......@@ -1416,6 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}
dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
......@@ -1454,6 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
......@@ -1463,6 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/lsr/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
......
......@@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
......@@ -29,7 +30,8 @@ func TestGetFailures(t *testing.T) {
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
......@@ -149,7 +151,9 @@ func TestNotFound(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
......@@ -228,7 +232,8 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
......@@ -297,7 +302,8 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
......
......@@ -34,9 +34,9 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
case dht.triggerBootstrap <- makeBootstrapReq():
default:
}
}
......@@ -80,9 +80,9 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
case dht.triggerBootstrap <- makeBootstrapReq():
default:
}
}
......
......@@ -21,20 +21,20 @@ var (
// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period
SelfQueryInterval time.Duration // how often to query for self
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
SelfQueryInterval time.Duration // how often to query for self
}
// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
TriggerAutoBootstrap bool
}
// Apply applies the given options to this Option
......@@ -63,14 +63,13 @@ var Defaults = func(o *Options) error {
// same as that mentioned in the kad dht paper
BucketPeriod: 1 * time.Hour,
// since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable
RoutingTableScanInterval: 30 * time.Minute,
Timeout: 10 * time.Second,
SelfQueryInterval: 1 * time.Hour,
}
o.TriggerAutoBootstrap = true
return nil
}
......@@ -149,3 +148,11 @@ func BucketSize(bucketSize int) Option {
return nil
}
}
// DisableAutoBootstrap disables auto bootstrap on the dht
func DisableAutoBootstrap() Option {
return func(o *Options) error {
o.TriggerAutoBootstrap = false
return nil
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment