bootstrap.go 3.05 KB
Newer Older
1 2 3
package core

import (
4
	"math/rand"
5 6 7 8 9
	"sync"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
10
	config "github.com/jbenet/go-ipfs/config"
11
	inet "github.com/jbenet/go-ipfs/net"
12 13
	peer "github.com/jbenet/go-ipfs/peer"
	dht "github.com/jbenet/go-ipfs/routing/dht"
14 15
)

16 17 18 19 20
const (
	period                          = 30 * time.Second // how often to check connection status
	connectiontimeout time.Duration = period / 3       // duration to wait when attempting to connect
	recoveryThreshold               = 4                // attempt to bootstrap if connection count falls below this value
)
21 22 23 24 25 26 27 28

func superviseConnections(parent context.Context,
	n *IpfsNode,
	route *dht.IpfsDHT,
	store peer.Peerstore,
	peers []*config.BootstrapPeer) error {

	for {
29
		ctx, _ := context.WithTimeout(parent, connectiontimeout)
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
		// TODO get config from disk so |peers| always reflects the latest
		// information
		if err := bootstrap(ctx, n.Network, route, store, peers); err != nil {
			log.Error(err)
		}
		select {
		case <-parent.Done():
			return parent.Err()
		case <-time.Tick(period):
		}
	}
	return nil
}

func bootstrap(ctx context.Context,
	n inet.Network,
	r *dht.IpfsDHT,
	ps peer.Peerstore,
	boots []*config.BootstrapPeer) error {

50 51 52 53 54 55
	if len(n.GetConnections()) >= recoveryThreshold {
		return nil
	}
	numCxnsToCreate := recoveryThreshold - len(n.GetConnections())

	var bootstrapPeers []peer.Peer
56 57 58 59 60
	for _, bootstrap := range boots {
		p, err := toPeer(ps, bootstrap)
		if err != nil {
			return err
		}
61
		bootstrapPeers = append(bootstrapPeers, p)
62 63 64
	}

	var notConnected []peer.Peer
65
	for _, p := range bootstrapPeers {
66 67 68 69 70 71 72
		if !n.IsConnected(p) {
			notConnected = append(notConnected, p)
		}
	}
	for _, p := range notConnected {
		log.Infof("not connected to %v", p)
	}
73 74 75 76 77 78

	var randomSubset []peer.Peer
	for _, val := range rand.Perm(numCxnsToCreate) {
		randomSubset = append(randomSubset, notConnected[val])
	}
	if err := connect(ctx, r, randomSubset); err != nil {
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
		return err
	}
	return nil
}

func connect(ctx context.Context, r *dht.IpfsDHT, peers []peer.Peer) error {
	var wg sync.WaitGroup
	for _, p := range peers {

		// performed asynchronously because when performed synchronously, if
		// one `Connect` call hangs, subsequent calls are more likely to
		// fail/abort due to an expiring context.

		wg.Add(1)
		go func(p peer.Peer) {
			defer wg.Done()
			err := r.Connect(ctx, p)
			if err != nil {
				log.Event(ctx, "bootstrapFailed", p)
				log.Criticalf("failed to bootstrap with %v", p)
				return
			}
			log.Event(ctx, "bootstrapSuccess", p)
			log.Infof("bootstrapped with %v", p)
		}(p)
	}
	wg.Wait()
	return nil
}

func toPeer(ps peer.Peerstore, bootstrap *config.BootstrapPeer) (peer.Peer, error) {
	id, err := peer.DecodePrettyID(bootstrap.PeerID)
	if err != nil {
		return nil, err
	}
	p, err := ps.FindOrCreate(id)
	if err != nil {
		return nil, err
	}
	maddr, err := ma.NewMultiaddr(bootstrap.Address)
	if err != nil {
		return nil, err
	}
	p.AddAddress(maddr)
	return p, nil
}