Commit 645ba5b6 authored by Steven Allen's avatar Steven Allen

fix(bootstrap): bootstrap sequentially

The default timeout is 10s so this won't take that long anyways. On the
other hand, if we do this all at once, we max the swarms dial queue.
parent ed244cd4
......@@ -2,9 +2,6 @@ package dht
import (
"context"
"fmt"
"strings"
"sync"
"time"
process "github.com/jbenet/goprocess"
......@@ -12,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
"github.com/pkg/errors"
)
var DefaultBootstrapPeers []multiaddr.Multiaddr
......@@ -76,16 +72,12 @@ func (dht *IpfsDHT) startBootstrapping() error {
}
func (dht *IpfsDHT) doBootstrap(ctx context.Context) {
if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("error while bootstrapping self: %s", err)
}
if err := dht.bootstrapBuckets(ctx); err != nil {
logger.Warningf("error while bootstrapping buckets: %s", err)
}
dht.selfWalk(ctx)
dht.bootstrapBuckets(ctx)
}
// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
......@@ -103,60 +95,36 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
}
buckets := dht.routingTable.GetAllBuckets()
var wg sync.WaitGroup
errChan := make(chan error)
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) > dht.bootstrapPeriod {
wg.Add(1)
go func(bucketID int, errChan chan<- error) {
defer wg.Done()
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(ctx, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
return err
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(ctx, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
return err
}
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
}
}(bucketID, errChan)
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
logger.Warningf("failed to do a random walk on bucket %d", bucketID)
}
}
}
// wait for all walks to finish & close the error channel
go func() {
wg.Wait()
close(errChan)
}()
// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
var errStrings []string
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
} else {
return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
}
}
// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
func (dht *IpfsDHT) selfWalk(ctx context.Context) {
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return nil
return
}
return err
logger.Warningf("failed to bootstrap self: %s", err)
}
// Bootstrap tells the DHT to get into a bootstrapped state.
......
......@@ -395,8 +395,6 @@ golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment