dht_bootstrap.go 5.81 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5 6 7 8
	"crypto/rand"
	"fmt"
	"time"

9
	u "github.com/ipfs/go-ipfs-util"
Steven Allen's avatar
Steven Allen committed
10 11
	goprocess "github.com/jbenet/goprocess"
	periodicproc "github.com/jbenet/goprocess/periodic"
12
	peer "github.com/libp2p/go-libp2p-peer"
George Antoniadis's avatar
George Antoniadis committed
13
	routing "github.com/libp2p/go-libp2p-routing"
14 15
)

16
// BootstrapConfig specifies parameters used bootstrapping the DHT.
17
//
18 19 20 21 22
// Note there is a tradeoff between the bootstrap period and the
// number of queries. We could support a higher period with less
// queries.
type BootstrapConfig struct {
	Queries int           // how many queries to run per period
23 24
	Period  time.Duration // how often to run periodic bootstrap.
	Timeout time.Duration // how long to wait for a bootstrap query to run
25
}
26

27 28 29 30 31 32
var DefaultBootstrapConfig = BootstrapConfig{
	// For now, this is set to 1 query.
	// We are currently more interested in ensuring we have a properly formed
	// DHT than making sure our dht minimizes traffic. Once we are more certain
	// of our implementation's robustness, we should lower this down to 8 or 4.
	Queries: 1,
33

Cole Brown's avatar
Cole Brown committed
34
	// For now, this is set to 5 minutes, which is a medium period. We are
35
	// We are currently more interested in ensuring we have a properly formed
36 37
	// DHT than making sure our dht minimizes traffic.
	Period: time.Duration(5 * time.Minute),
38

39
	Timeout: time.Duration(10 * time.Second),
40 41
}

Steven Allen's avatar
Steven Allen committed
42 43 44 45 46 47
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
48
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
Steven Allen's avatar
Steven Allen committed
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
	proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
	if err != nil {
		return err
	}

	// wait till ctx or dht.Context exits.
	// we have to do it this way to satisfy the Routing interface (contexts)
	go func() {
		defer proc.Close()
		select {
		case <-ctx.Done():
		case <-dht.Context().Done():
		}
	}()

	return nil
Matt Joiner's avatar
Matt Joiner committed
65
}
66

Steven Allen's avatar
Steven Allen committed
67 68 69 70 71 72 73
// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// BootstrapWithConfig returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
Matt Joiner's avatar
Matt Joiner committed
74
	if cfg.Queries <= 0 {
Steven Allen's avatar
Steven Allen committed
75
		return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
Matt Joiner's avatar
Matt Joiner committed
76
	}
Steven Allen's avatar
Steven Allen committed
77 78 79

	proc := dht.Process().Go(func(p goprocess.Process) {
		<-p.Go(dht.bootstrapWorker(cfg)).Closed()
80 81 82
		for {
			select {
			case <-time.After(cfg.Period):
Steven Allen's avatar
Steven Allen committed
83 84
				<-p.Go(dht.bootstrapWorker(cfg)).Closed()
			case <-p.Closing():
85 86 87
				return
			}
		}
Steven Allen's avatar
Steven Allen committed
88 89 90
	})

	return proc, nil
Matt Joiner's avatar
Matt Joiner committed
91
}
vyzo's avatar
vyzo committed
92

Steven Allen's avatar
Steven Allen committed
93 94 95 96 97 98 99
// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// SignalBootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
Matt Joiner's avatar
Matt Joiner committed
100
	if cfg.Queries <= 0 {
Steven Allen's avatar
Steven Allen committed
101
		return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
Matt Joiner's avatar
Matt Joiner committed
102 103
	}

Steven Allen's avatar
Steven Allen committed
104 105 106
	if signal == nil {
		return nil, fmt.Errorf("invalid signal: %v", signal)
	}
107

Steven Allen's avatar
Steven Allen committed
108 109 110
	proc := periodicproc.Ticker(signal, dht.bootstrapWorker(cfg))

	return proc, nil
Matt Joiner's avatar
Matt Joiner committed
111
}
112

Steven Allen's avatar
Steven Allen committed
113 114 115 116 117 118 119 120 121 122
func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
	return func(worker goprocess.Process) {
		// it would be useful to be able to send out signals of when we bootstrap, too...
		// maybe this is a good case for whole module event pub/sub?

		ctx := dht.Context()
		if err := dht.runBootstrap(ctx, cfg); err != nil {
			log.Warning(err)
			// A bootstrapping error is important to notice but not fatal.
		}
123
	}
124 125 126
}

// runBootstrap builds up list of peers by requesting random peer IDs
127
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128 129 130 131 132 133
	bslog := func(msg string) {
		log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
	}
	bslog("start")
	defer bslog("end")
	defer log.EventBegin(ctx, "dhtRunBootstrap").Done()
134

Steven Allen's avatar
Steven Allen committed
135 136 137 138 139 140 141 142 143 144 145 146 147
	var merr u.MultiErr

	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine becuase
		// the dht will rehash to its own keyspace anyway.
		id := make([]byte, 16)
		rand.Read(id)
		id = u.Hash(id)
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
	runQuery := func(ctx context.Context, id peer.ID) {
148 149
		ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
		defer cancel()
150

Steven Allen's avatar
Steven Allen committed
151 152 153 154 155 156 157 158 159 160 161
		p, err := dht.FindPeer(ctx, id)
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
			merr = append(merr, err)
		} else {
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Warningf("%s", err)
			merr = append(merr, err)
Matt Joiner's avatar
Matt Joiner committed
162
		}
163 164
	}

Steven Allen's avatar
Steven Allen committed
165 166 167 168 169 170 171 172
	// these should be parallel normally. but can make them sequential for debugging.
	// note that the core/bootstrap context deadline should be extended too for that.
	for i := 0; i < cfg.Queries; i++ {
		id := randomID()
		log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
		runQuery(ctx, id)
	}

173
	// Find self to distribute peer info to our neighbors.
Steven Allen's avatar
Steven Allen committed
174 175 176 177 178 179 180 181
	// Do this after bootstrapping.
	log.Debugf("Bootstrapping query to self: %s", dht.self)
	runQuery(ctx, dht.self)

	if len(merr) > 0 {
		return merr
	}
	return nil
182
}