dht_bootstrap.go 5.13 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5 6
	"time"

Steven Allen's avatar
Steven Allen committed
7 8
	process "github.com/jbenet/goprocess"
	processctx "github.com/jbenet/goprocess/context"
9 10
	"github.com/libp2p/go-libp2p-core/routing"
	"github.com/multiformats/go-multiaddr"
Hector Sanjuan's avatar
Hector Sanjuan committed
11
	_ "github.com/multiformats/go-multiaddr-dns"
12 13
)

14 15
var DefaultBootstrapPeers []multiaddr.Multiaddr

16
var minRTRefreshThreshold = 4
Aarsh Shah's avatar
Aarsh Shah committed
17

18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
// init parses the hard-coded bootstrap addresses below and appends each one to
// DefaultBootstrapPeers, in list order. An address that fails to parse is a
// programming error in this list, so it panics at package load.
func init() {
	bootstrapAddrs := []string{
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
		"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",            // mars.i.ipfs.io
		"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",           // pluto.i.ipfs.io
		"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",           // saturn.i.ipfs.io
		"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",             // venus.i.ipfs.io
		"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",            // earth.i.ipfs.io
		"/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",  // pluto.i.ipfs.io
		"/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",  // saturn.i.ipfs.io
		"/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
		"/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
	}

	for _, addr := range bootstrapAddrs {
		parsed, err := multiaddr.NewMultiaddr(addr)
		if err != nil {
			panic(err)
		}
		DefaultBootstrapPeers = append(DefaultBootstrapPeers, parsed)
	}
}

42 43
// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
Steven Allen's avatar
Steven Allen committed
44 45 46
	// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
	dht.proc.Go(func(proc process.Process) {
		ctx := processctx.OnClosingContext(proc)
47

48 49
		refreshTicker := time.NewTicker(dht.rtRefreshPeriod)
		defer refreshTicker.Stop()
Aarsh Shah's avatar
Aarsh Shah committed
50

51 52 53
		// refresh if option is set
		if dht.autoRefresh {
			dht.doRefresh(ctx)
54
		} else {
55
			// disable the "auto-refresh" ticker so that no more ticks are sent to this channel
56
			refreshTicker.Stop()
57
		}
Steven Allen's avatar
Steven Allen committed
58

59
		for {
60
			select {
61
			case <-refreshTicker.C:
62 63
			case <-dht.triggerRtRefresh:
				logger.Infof("triggering a refresh: RT has %d peers", dht.routingTable.Size())
Matt Joiner's avatar
Matt Joiner committed
64
			case <-ctx.Done():
65 66
				return
			}
67
			dht.doRefresh(ctx)
68
		}
Steven Allen's avatar
Steven Allen committed
69
	})
Aarsh Shah's avatar
Aarsh Shah committed
70

Matt Joiner's avatar
Matt Joiner committed
71
	return nil
Matt Joiner's avatar
Matt Joiner committed
72
}
vyzo's avatar
vyzo committed
73

74
func (dht *IpfsDHT) doRefresh(ctx context.Context) {
75
	dht.selfWalk(ctx)
76
	dht.refreshBuckets(ctx)
77 78
}

79 80
// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
Aarsh Shah's avatar
Aarsh Shah committed
81
	doQuery := func(bucketId int, target string, f func(context.Context) error) error {
82
		logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
Aarsh Shah's avatar
Aarsh Shah committed
83
			bucketId, target, dht.routingTable.Size())
84
		defer func() {
85
			logger.Infof("finished refreshing bucket %d to %s (routing table size is now %d)",
Aarsh Shah's avatar
Aarsh Shah committed
86
				bucketId, target, dht.routingTable.Size())
87
		}()
88
		queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
89
		defer cancel()
90 91 92 93 94
		err := f(queryCtx)
		if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
			return nil
		}
		return err
95 96
	}

Aarsh Shah's avatar
Aarsh Shah committed
97
	buckets := dht.routingTable.GetAllBuckets()
98 99 100 101 102 103
	if len(buckets) > 16 {
		// Don't bother bootstrapping more than 16 buckets.
		// GenRandPeerID can't generate target peer IDs with more than
		// 16 bits specified anyways.
		buckets = buckets[:16]
	}
Aarsh Shah's avatar
Aarsh Shah committed
104
	for bucketID, bucket := range buckets {
105 106 107 108 109 110 111 112 113 114 115
		if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
			continue
		}
		// gen rand peer in the bucket
		randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)

		// walk to the generated peer
		walkFnc := func(c context.Context) error {
			_, err := dht.FindPeer(c, randPeerInBucket)
			if err == routing.ErrNotFound {
				return nil
116
			}
117 118
			return err
		}
Aarsh Shah's avatar
Aarsh Shah committed
119

120 121
		if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
			logger.Warningf("failed to do a random walk on bucket %d", bucketID)
Matt Joiner's avatar
Matt Joiner committed
122
		}
Steven Allen's avatar
Steven Allen committed
123
	}
Aarsh Shah's avatar
Aarsh Shah committed
124 125 126
}

// Traverse the DHT toward the self ID
127
func (dht *IpfsDHT) selfWalk(ctx context.Context) {
128
	queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
Aarsh Shah's avatar
Aarsh Shah committed
129 130 131
	defer cancel()
	_, err := dht.FindPeer(queryCtx, dht.self)
	if err == routing.ErrNotFound {
132
		return
Aarsh Shah's avatar
Aarsh Shah committed
133
	}
134
	logger.Warningf("failed to query self during routing table refresh: %s", err)
Aarsh Shah's avatar
Aarsh Shah committed
135 136
}

137 138
// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
Steven Allen's avatar
Steven Allen committed
139
//
140
// This just calls `RefreshRoutingTable`.
Steven Allen's avatar
Steven Allen committed
141
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
142 143 144 145 146 147
	dht.RefreshRoutingTable()
	return nil
}

// RefreshRoutingTable tells the DHT to refresh it's routing tables.
func (dht *IpfsDHT) RefreshRoutingTable() {
Steven Allen's avatar
Steven Allen committed
148
	select {
149
	case dht.triggerRtRefresh <- struct{}{}:
Steven Allen's avatar
Steven Allen committed
150
	default:
Aarsh Shah's avatar
Aarsh Shah committed
151
	}
152
}