dht_bootstrap.go 6.15 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"fmt"
Aarsh Shah's avatar
Aarsh Shah committed
6 7
	"strings"
	"sync"
8 9
	"time"

Steven Allen's avatar
Steven Allen committed
10 11
	process "github.com/jbenet/goprocess"
	processctx "github.com/jbenet/goprocess/context"
12 13
	"github.com/libp2p/go-libp2p-core/routing"
	"github.com/multiformats/go-multiaddr"
Hector Sanjuan's avatar
Hector Sanjuan committed
14
	_ "github.com/multiformats/go-multiaddr-dns"
Aarsh Shah's avatar
Aarsh Shah committed
15
	"github.com/pkg/errors"
16 17
)

18 19
// DefaultBootstrapPeers is the list of bootstrap peer multiaddrs. It is
// populated by this file's init function from a hard-coded set of /dnsaddr,
// /ip4 and /ip6 addresses.
var DefaultBootstrapPeers []multiaddr.Multiaddr

Aarsh Shah's avatar
Aarsh Shah committed
20 21
// minRTBootstrapThreshold is not referenced by the code visible in this file.
// NOTE(review): presumably the minimum routing-table size below which a
// bootstrap gets triggered elsewhere in the package — confirm against its users.
var minRTBootstrapThreshold = 4

22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
// init parses the hard-coded bootstrap addresses below and appends each one to
// DefaultBootstrapPeers. An address that fails to parse aborts start-up with a
// panic.
func init() {
	addrs := []string{
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
		"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",            // mars.i.ipfs.io
		"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",           // pluto.i.ipfs.io
		"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",           // saturn.i.ipfs.io
		"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",             // venus.i.ipfs.io
		"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",            // earth.i.ipfs.io
		"/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",  // pluto.i.ipfs.io
		"/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",  // saturn.i.ipfs.io
		"/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
		"/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
	}

	for _, addr := range addrs {
		parsed, parseErr := multiaddr.NewMultiaddr(addr)
		if parseErr != nil {
			// The list is static, so a parse failure is a programming error.
			panic(parseErr)
		}
		DefaultBootstrapPeers = append(DefaultBootstrapPeers, parsed)
	}
}

46 47 48 49 50 51 52 53 54
// bootstrapReq is a request for a bootstrap round. The outcome of the round is
// delivered on errChan.
type bootstrapReq struct {
	errChan chan error
}

// makeBootstrapReq returns a new request whose error channel has room for
// exactly one buffered result.
func makeBootstrapReq() *bootstrapReq {
	return &bootstrapReq{
		errChan: make(chan error, 1),
	}
}

Steven Allen's avatar
Steven Allen committed
55 56 57 58 59
// Bootstrap  i
func (dht *IpfsDHT) startBootstrapping() error {
	// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
	dht.proc.Go(func(proc process.Process) {
		ctx := processctx.OnClosingContext(proc)
60 61

		scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
Steven Allen's avatar
Steven Allen committed
62
		defer scanInterval.Stop()
Aarsh Shah's avatar
Aarsh Shah committed
63

64 65
		// run bootstrap if option is set
		if dht.triggerAutoBootstrap {
66
			if err := dht.doBootstrap(ctx, true); err != nil {
67
				logger.Warningf("bootstrap error: %s", err)
Matt Joiner's avatar
Matt Joiner committed
68
			}
69
		}
Steven Allen's avatar
Steven Allen committed
70

71
		for {
72
			select {
Steven Allen's avatar
Steven Allen committed
73
			case now := <-scanInterval.C:
74 75
				walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
				if err := dht.doBootstrap(ctx, walkSelf); err != nil {
76 77 78
					logger.Warning("bootstrap error: %s", err)
				}
			case req := <-dht.triggerBootstrap:
Steven Allen's avatar
Steven Allen committed
79
				logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
80
				err := dht.doBootstrap(ctx, true)
81 82 83 84 85
				select {
				case req.errChan <- err:
					close(req.errChan)
				default:
				}
Matt Joiner's avatar
Matt Joiner committed
86
			case <-ctx.Done():
87 88 89
				return
			}
		}
Steven Allen's avatar
Steven Allen committed
90
	})
Aarsh Shah's avatar
Aarsh Shah committed
91

Matt Joiner's avatar
Matt Joiner committed
92
	return nil
Matt Joiner's avatar
Matt Joiner committed
93
}
vyzo's avatar
vyzo committed
94

95
func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
96 97 98 99
	if walkSelf {
		if err := dht.selfWalk(ctx); err != nil {
			return fmt.Errorf("self walk: error: %s", err)
		} else {
100
			dht.latestSelfWalk = time.Now()
101 102 103 104 105 106 107 108 109 110
		}
	}

	if err := dht.bootstrapBuckets(ctx); err != nil {
		return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
	}

	return nil
}

Aarsh Shah's avatar
Aarsh Shah committed
111
// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
112
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
Aarsh Shah's avatar
Aarsh Shah committed
113
	doQuery := func(bucketId int, target string, f func(context.Context) error) error {
Aarsh Shah's avatar
Aarsh Shah committed
114
		logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
Aarsh Shah's avatar
Aarsh Shah committed
115
			bucketId, target, dht.routingTable.Size())
116
		defer func() {
Aarsh Shah's avatar
Aarsh Shah committed
117
			logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
Aarsh Shah's avatar
Aarsh Shah committed
118
				bucketId, target, dht.routingTable.Size())
119
		}()
120
		queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
121
		defer cancel()
122 123 124 125 126
		err := f(queryCtx)
		if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
			return nil
		}
		return err
127 128
	}

Aarsh Shah's avatar
Aarsh Shah committed
129 130 131 132 133
	buckets := dht.routingTable.GetAllBuckets()
	var wg sync.WaitGroup
	errChan := make(chan error)

	for bucketID, bucket := range buckets {
134
		if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
Aarsh Shah's avatar
Aarsh Shah committed
135 136 137 138
			wg.Add(1)
			go func(bucketID int, errChan chan<- error) {
				defer wg.Done()
				// gen rand peer in the bucket
Steven Allen's avatar
Steven Allen committed
139
				randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
Aarsh Shah's avatar
Aarsh Shah committed
140 141 142

				// walk to the generated peer
				walkFnc := func(c context.Context) error {
143
					_, err := dht.FindPeer(ctx, randPeerInBucket)
Aarsh Shah's avatar
Aarsh Shah committed
144 145 146 147 148 149 150 151 152 153
					if err == routing.ErrNotFound {
						return nil
					}
					return err
				}

				if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
					errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
				}
			}(bucketID, errChan)
Matt Joiner's avatar
Matt Joiner committed
154
		}
Steven Allen's avatar
Steven Allen committed
155 156
	}

Aarsh Shah's avatar
Aarsh Shah committed
157 158 159 160 161 162
	// wait for all walks to finish & close the error channel
	go func() {
		wg.Wait()
		close(errChan)
	}()

Aarsh Shah's avatar
Aarsh Shah committed
163
	// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
Aarsh Shah's avatar
Aarsh Shah committed
164 165 166 167 168 169 170
	var errStrings []string
	for err := range errChan {
		errStrings = append(errStrings, err.Error())
	}
	if len(errStrings) == 0 {
		return nil
	} else {
Aarsh Shah's avatar
Aarsh Shah committed
171 172 173 174 175 176 177 178 179 180 181
		return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
	}
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
	queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
	defer cancel()
	_, err := dht.FindPeer(queryCtx, dht.self)
	if err == routing.ErrNotFound {
		return nil
Aarsh Shah's avatar
Aarsh Shah committed
182
	}
Aarsh Shah's avatar
Aarsh Shah committed
183
	return err
Aarsh Shah's avatar
Aarsh Shah committed
184 185
}

Steven Allen's avatar
Steven Allen committed
186 187 188 189 190
// Bootstrap tells the DHT to get into a bootstrapped state.
//
// Note: the context is ignored.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
	select {
191
	case dht.triggerBootstrap <- makeBootstrapReq():
Steven Allen's avatar
Steven Allen committed
192
	default:
Aarsh Shah's avatar
Aarsh Shah committed
193
	}
Steven Allen's avatar
Steven Allen committed
194
	return nil
195
}