bootstrap.go 4.46 KB
Newer Older
1 2 3
package core

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4 5
	"errors"
	"fmt"
6
	"math/rand"
7 8 9
	"sync"
	"time"

10
	config "github.com/jbenet/go-ipfs/config"
11
	inet "github.com/jbenet/go-ipfs/net"
12 13
	peer "github.com/jbenet/go-ipfs/peer"
	dht "github.com/jbenet/go-ipfs/routing/dht"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
14
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
15
	math2 "github.com/jbenet/go-ipfs/util/math2"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17 18

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
19 20
)

21 22 23 24 25
const (
	period                          = 30 * time.Second // how often to check connection status
	connectiontimeout time.Duration = period / 3       // duration to wait when attempting to connect
	recoveryThreshold               = 4                // attempt to bootstrap if connection count falls below this value
)
26 27

func superviseConnections(parent context.Context,
28 29
	n inet.Network,
	route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes
30 31 32 33
	store peer.Peerstore,
	peers []*config.BootstrapPeer) error {

	for {
34
		ctx, _ := context.WithTimeout(parent, connectiontimeout)
35 36
		// TODO get config from disk so |peers| always reflects the latest
		// information
37
		if err := bootstrap(ctx, n, route, store, peers); err != nil {
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
			log.Error(err)
		}
		select {
		case <-parent.Done():
			return parent.Err()
		case <-time.Tick(period):
		}
	}
	return nil
}

func bootstrap(ctx context.Context,
	n inet.Network,
	r *dht.IpfsDHT,
	ps peer.Peerstore,
	boots []*config.BootstrapPeer) error {

55 56
	connectedPeers := n.Peers()
	if len(connectedPeers) >= recoveryThreshold {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57 58 59 60
		log.Event(ctx, "bootstrapSkip", n.LocalPeer())
		log.Debugf("%s bootstrap skipped -- connected to %d (> %d) nodes",
			n.LocalPeer(), len(connectedPeers), recoveryThreshold)

61 62
		return nil
	}
63
	numCxnsToCreate := recoveryThreshold - len(connectedPeers)
64

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65 66 67
	log.Event(ctx, "bootstrapStart", n.LocalPeer())
	log.Debugf("%s bootstrapping to %d more nodes", n.LocalPeer(), numCxnsToCreate)

68
	var bootstrapPeers []peer.PeerInfo
69
	for _, bootstrap := range boots {
70
		p, err := toPeer(bootstrap)
71
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72 73
			log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err))
			log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err)
74 75
			return err
		}
76
		bootstrapPeers = append(bootstrapPeers, p)
77 78
	}

79
	var notConnected []peer.PeerInfo
80
	for _, p := range bootstrapPeers {
81
		if n.Connectedness(p.ID) != inet.Connected {
82 83 84
			notConnected = append(notConnected, p)
		}
	}
85

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86 87 88 89 90 91 92 93
	if len(notConnected) < 1 {
		s := "must bootstrap to %d more nodes, but already connected to all candidates"
		err := fmt.Errorf(s, numCxnsToCreate)
		log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err))
		log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err)
		return err
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
94
	var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95 96

	log.Debugf("%s bootstrapping to %d nodes: %s", n.LocalPeer(), numCxnsToCreate, randomSubset)
97
	if err := connect(ctx, ps, r, randomSubset); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98 99
		log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err))
		log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err)
100 101 102 103 104
		return err
	}
	return nil
}

105
func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
106 107 108 109
	if len(peers) < 1 {
		return errors.New("bootstrap set empty")
	}

110 111 112 113 114 115 116 117
	var wg sync.WaitGroup
	for _, p := range peers {

		// performed asynchronously because when performed synchronously, if
		// one `Connect` call hangs, subsequent calls are more likely to
		// fail/abort due to an expiring context.

		wg.Add(1)
118
		go func(p peer.PeerInfo) {
119
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121 122
			log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID)
			log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID)

123 124
			ps.AddAddresses(p.ID, p.Addrs)
			err := r.Connect(ctx, p.ID)
125
			if err != nil {
126 127
				log.Event(ctx, "bootstrapFailed", p.ID)
				log.Criticalf("failed to bootstrap with %v", p.ID)
128 129
				return
			}
130 131
			log.Event(ctx, "bootstrapSuccess", p.ID)
			log.Infof("bootstrapped with %v", p.ID)
132 133 134 135 136 137
		}(p)
	}
	wg.Wait()
	return nil
}

138 139
func toPeer(bootstrap *config.BootstrapPeer) (p peer.PeerInfo, err error) {
	id, err := peer.IDB58Decode(bootstrap.PeerID)
140
	if err != nil {
141
		return
142 143 144
	}
	maddr, err := ma.NewMultiaddr(bootstrap.Address)
	if err != nil {
145 146 147 148 149
		return
	}
	p = peer.PeerInfo{
		ID:    id,
		Addrs: []ma.Multiaddr{maddr},
150
	}
151
	return
152
}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
153

154
func randomSubsetOfPeers(in []peer.PeerInfo, max int) []peer.PeerInfo {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
155
	n := math2.IntMin(max, len(in))
156
	var out []peer.PeerInfo
Brian Tiger Chow's avatar
Brian Tiger Chow committed
157 158 159 160 161
	for _, val := range rand.Perm(n) {
		out = append(out, in[val])
	}
	return out
}