package dht

import (
	"context"
	"crypto/rand"
	"fmt"
	"strings"
	"sync"
	"time"

	u "github.com/ipfs/go-ipfs-util"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/routing"
	"github.com/multiformats/go-multiaddr"
	_ "github.com/multiformats/go-multiaddr-dns"
	"github.com/pkg/errors"
)

// DefaultBootstrapPeers holds the bootstrap multiaddrs that init parses from
// the hard-coded address list.
var DefaultBootstrapPeers []multiaddr.Multiaddr

// init fills DefaultBootstrapPeers by parsing each hard-coded bootstrap
// address. It panics if any of the addresses fails to parse.
func init() {
	bootstrapAddrs := []string{
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
		"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",            // mars.i.ipfs.io
		"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",           // pluto.i.ipfs.io
		"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",           // saturn.i.ipfs.io
		"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",             // venus.i.ipfs.io
		"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",            // earth.i.ipfs.io
		"/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",  // pluto.i.ipfs.io
		"/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",  // saturn.i.ipfs.io
		"/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
		"/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
	}

	for i := range bootstrapAddrs {
		addr, err := multiaddr.NewMultiaddr(bootstrapAddrs[i])
		if err != nil {
			// A malformed built-in address is a programming error.
			panic(err)
		}
		DefaultBootstrapPeers = append(DefaultBootstrapPeers, addr)
	}
}

// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
	// BucketPeriod is how long to wait for a k-bucket to be queried before
	// doing a random walk on it.
	BucketPeriod time.Duration

	// Timeout is how long to wait for a bootstrap query to run.
	Timeout time.Duration

	// RoutingTableScanInterval is how often to scan the RT for k-buckets that
	// haven't been queried since the given period.
	RoutingTableScanInterval time.Duration

	// SelfQueryInterval is how often to query for self.
	SelfQueryInterval time.Duration
}
var DefaultBootstrapConfig = BootstrapConfig{
Aarsh Shah's avatar
Aarsh Shah committed
54 55 56 57 58 59 60 61 62
	// same as that mentioned in the kad dht paper
	BucketPeriod: 1 * time.Hour,

	// since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable
	RoutingTableScanInterval: 30 * time.Minute,

	Timeout: 10 * time.Second,

	SelfQueryInterval: 1 * time.Hour,
63 64
}

// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
// the default bootstrap config.
67
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
Matt Joiner's avatar
Matt Joiner committed
68
	return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
Matt Joiner's avatar
Matt Joiner committed
69
}
// Runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
Matt Joiner's avatar
Matt Joiner committed
72
func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
Aarsh Shah's avatar
Aarsh Shah committed
73 74 75 76 77
	// we should query for self periodically so we can discover closer peers
	go func() {
		for {
			err := dht.BootstrapSelf(ctx)
			if err != nil {
Aarsh Shah's avatar
Aarsh Shah committed
78
				logger.Warningf("error bootstrapping while searching for my self (I'm Too Shallow ?): %s", err)
Aarsh Shah's avatar
Aarsh Shah committed
79 80 81 82 83 84 85 86 87 88
			}
			select {
			case <-time.After(cfg.SelfQueryInterval):
			case <-ctx.Done():
				return
			}
		}
	}()

	// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
Matt Joiner's avatar
Matt Joiner committed
89
	go func() {
90
		for {
Matt Joiner's avatar
Matt Joiner committed
91 92
			err := dht.runBootstrap(ctx, cfg)
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
93
				logger.Warningf("error bootstrapping: %s", err)
Matt Joiner's avatar
Matt Joiner committed
94
			}
95
			select {
Aarsh Shah's avatar
Aarsh Shah committed
96
			case <-time.After(cfg.RoutingTableScanInterval):
Matt Joiner's avatar
Matt Joiner committed
97
			case <-ctx.Done():
98 99 100
				return
			}
		}
Matt Joiner's avatar
Matt Joiner committed
101 102
	}()
	return nil
Matt Joiner's avatar
Matt Joiner committed
103
}
func newRandomPeerId() peer.ID {
	id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
	rand.Read(id)
	id = u.Hash(id) // TODO: Feed this directly into the multihash instead of hashing it.
	return peer.ID(id)
Matt Joiner's avatar
Matt Joiner committed
110
}
// Traverse the DHT toward the given ID.
113
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (peer.AddrInfo, error) {
Matt Joiner's avatar
Matt Joiner committed
114 115 116 117 118
	// TODO: Extract the query action (traversal logic?) inside FindPeer,
	// don't actually call through the FindPeer machinery, which can return
	// things out of the peer store etc.
	return dht.FindPeer(ctx, target)
}
// Traverse the DHT toward a random ID.
func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
	id := newRandomPeerId()
	p, err := dht.walk(ctx, id)
	switch err {
	case routing.ErrNotFound:
		return nil
	case nil:
		// We found a peer from a randomly generated ID. This should be very
		// unlikely.
Matt Joiner's avatar
Matt Joiner committed
130
		logger.Warningf("random walk toward %s actually found peer: %s", id, p)
Matt Joiner's avatar
Matt Joiner committed
131 132 133
		return nil
	default:
		return err
134
	}
135 136
}

// selfWalk walks the DHT toward this node's own ID (dht.self). A
// routing.ErrNotFound result counts as success; every other outcome of the
// walk, including nil, is returned unchanged.
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
	switch _, walkErr := dht.walk(ctx, dht.self); walkErr {
	case routing.ErrNotFound:
		return nil
	default:
		return walkErr
	}
}

//scan the RT,& do a random walk on k-buckets that haven't been queried since the given bucket period
147
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
Matt Joiner's avatar
Matt Joiner committed
148
	doQuery := func(n int, target string, f func(context.Context) error) error {
Aarsh Shah's avatar
Aarsh Shah committed
149 150
		logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
			n, target, dht.routingTable.Size())
151
		defer func() {
Aarsh Shah's avatar
Aarsh Shah committed
152 153
			logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
				n, target, dht.routingTable.Size())
154
		}()
155
		queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout)
156
		defer cancel()
157 158 159 160 161
		err := f(queryCtx)
		if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
			return nil
		}
		return err
162 163
	}

Aarsh Shah's avatar
Aarsh Shah committed
164 165 166 167 168
	buckets := dht.routingTable.GetAllBuckets()
	var wg sync.WaitGroup
	errChan := make(chan error)

	for bucketID, bucket := range buckets {
Steven Allen's avatar
Steven Allen committed
169
		if time.Since(bucket.RefreshedAt()) > cfg.BucketPeriod {
Aarsh Shah's avatar
Aarsh Shah committed
170 171 172 173
			wg.Add(1)
			go func(bucketID int, errChan chan<- error) {
				defer wg.Done()
				// gen rand peer in the bucket
Steven Allen's avatar
Steven Allen committed
174
				randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
Aarsh Shah's avatar
Aarsh Shah committed
175 176 177 178 179 180 181 182 183 184 185 186 187 188

				// walk to the generated peer
				walkFnc := func(c context.Context) error {
					_, err := dht.walk(ctx, randPeerInBucket)
					if err == routing.ErrNotFound {
						return nil
					}
					return err
				}

				if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
					errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
				}
			}(bucketID, errChan)
Matt Joiner's avatar
Matt Joiner committed
189
		}
Steven Allen's avatar
Steven Allen committed
190 191
	}

Aarsh Shah's avatar
Aarsh Shah committed
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
	// wait for all walks to finish & close the error channel
	go func() {
		wg.Wait()
		close(errChan)
	}()

	// accumulate errors from all go-routines
	var errStrings []string
	for err := range errChan {
		errStrings = append(errStrings, err.Error())
	}
	if len(errStrings) == 0 {
		return nil
	} else {
		return fmt.Errorf("errors encountered while running bootstrap on RT: %s", strings.Join(errStrings, "\n"))
	}
}

// BootstrapOnce is a synchronous bootstrap. It first runs BootstrapSelf; if
// that fails, the error is returned wrapped and the routing table scan is
// skipped. Otherwise it returns the result of runBootstrap with cfg.
func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
	if err := dht.BootstrapSelf(ctx); err != nil {
		return errors.Wrap(err, "failed bootstrap while searching for self")
	}
	return dht.runBootstrap(ctx, cfg)
}

// BootstrapRandom performs a single walk toward a random ID by delegating to
// randomWalk and returns its result.
func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error {
	return dht.randomWalk(ctx)
}

// BootstrapSelf performs a single walk toward this node's own ID by
// delegating to selfWalk and returns its result.
func (dht *IpfsDHT) BootstrapSelf(ctx context.Context) error {
	return dht.selfWalk(ctx)
}