diff --git a/core/bootstrap.go b/core/bootstrap.go index 6381e9ba6330dd6cf65a322b98b931c71b4551fc..1539a761fdaa68f696b747d7f77657e27a6a97ee 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -1,17 +1,21 @@ package core import ( + "errors" + "fmt" "math/rand" "sync" "time" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" config "github.com/jbenet/go-ipfs/config" inet "github.com/jbenet/go-ipfs/net" peer "github.com/jbenet/go-ipfs/peer" dht "github.com/jbenet/go-ipfs/routing/dht" + lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" math2 "github.com/jbenet/go-ipfs/util/math2" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) const ( @@ -50,14 +54,23 @@ func bootstrap(ctx context.Context, connectedPeers := n.Peers() if len(connectedPeers) >= recoveryThreshold { + log.Event(ctx, "bootstrapSkip", n.LocalPeer()) + log.Debugf("%s bootstrap skipped -- connected to %d (> %d) nodes", + n.LocalPeer(), len(connectedPeers), recoveryThreshold) + return nil } numCxnsToCreate := recoveryThreshold - len(connectedPeers) + log.Event(ctx, "bootstrapStart", n.LocalPeer()) + log.Debugf("%s bootstrapping to %d more nodes", n.LocalPeer(), numCxnsToCreate) + var bootstrapPeers []peer.PeerInfo for _, bootstrap := range boots { p, err := toPeer(bootstrap) if err != nil { + log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err) return err } bootstrapPeers = append(bootstrapPeers, p) @@ -70,14 +83,30 @@ func bootstrap(ctx context.Context, } } + if len(notConnected) < 1 { + s := "must bootstrap to %d more nodes, but already connected to all candidates" + err := fmt.Errorf(s, numCxnsToCreate) + log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err) + return err + } + var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate) + + log.Debugf("%s bootstrapping to %d nodes: %s", n.LocalPeer(), numCxnsToCreate, randomSubset) if err := connect(ctx, ps, r, randomSubset); err != nil { + log.Event(ctx, "bootstrapError", n.LocalPeer(), lgbl.Error(err)) + log.Errorf("%s bootstrap error: %s", n.LocalPeer(), err) return err } return nil } func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error { + if len(peers) < 1 { + return errors.New("bootstrap set empty") + } + var wg sync.WaitGroup for _, p := range peers { @@ -88,6 +117,9 @@ func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []pee wg.Add(1) go func(p peer.PeerInfo) { defer wg.Done() + log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID) + log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID) + ps.AddAddresses(p.ID, p.Addrs) err := r.Connect(ctx, p.ID) if err != nil { diff --git a/net/net.go b/net/net.go index 9fda9944e9e05a92c545df41420fa3872edb77a9..5c27a6f492d11ceb88e52a1755738556426ab1f5 100644 --- a/net/net.go +++ b/net/net.go @@ -135,6 +135,7 @@ func (n *network) newConnHandler(c *swarm.Conn) { // DialPeer attempts to establish a connection to a given peer. // Respects the context. func (n *network) DialPeer(ctx context.Context, p peer.ID) error { + log.Debugf("[%s] network dialing peer [%s]", n.local, p) sc, err := n.swarm.Dial(ctx, p) if err != nil { return err @@ -242,8 +243,9 @@ func (n *network) Connectedness(p peer.ID) Connectedness { // NewStream returns a new stream to given peer p. // If there is no connection to p, attempts to create one. // If ProtocolID is "", writes no header. -func (c *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) { - s, err := c.swarm.NewStreamWithPeer(p) +func (n *network) NewStream(pr ProtocolID, p peer.ID) (Stream, error) { + log.Debugf("[%s] network opening stream to peer [%s]: %s", n.local, p, pr) + s, err := n.swarm.NewStreamWithPeer(p) if err != nil { return nil, err } diff --git a/routing/dht/dht.go b/routing/dht/dht.go index fcc0f3bf0899e87358791a45f14690a6c4350291..a893e62b8f25793a6f340ef0f1f7e8bcc8745e46 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -77,6 +77,11 @@ func NewDHT(ctx context.Context, p peer.ID, n inet.Network, dstore ds.ThreadSafe return dht } +// LocalPeer returns the peer.Peer of the dht. +func (dht *IpfsDHT) LocalPeer() peer.ID { + return dht.self +} + // Connect to a new peer at the given address, ping and add to the routing table func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { if err := dht.network.DialPeer(ctx, npeer); err != nil {