Commit 5f3ae91f authored by Brian Tiger Chow

Merge pull request #422 from jbenet/feat/supervise-connections

feat(core) supervise bootstrap connections
parents 422d3429 db700839
package core

import (
	"math/rand"
	"sync"
	"time"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"

	config "github.com/jbenet/go-ipfs/config"
	inet "github.com/jbenet/go-ipfs/net"
	peer "github.com/jbenet/go-ipfs/peer"
	dht "github.com/jbenet/go-ipfs/routing/dht"
)

const (
	period                          = 30 * time.Second // how often to check connection status
	connectiontimeout time.Duration = period / 3        // duration to wait when attempting to connect
	recoveryThreshold               = 4                 // attempt to bootstrap if connection count falls below this value
)
func superviseConnections(parent context.Context,
	n inet.Network,
	route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes
	store peer.Peerstore,
	peers []*config.BootstrapPeer) error {

	for {
		ctx, _ := context.WithTimeout(parent, connectiontimeout)
		// TODO get config from disk so |peers| always reflects the latest
		// information
		if err := bootstrap(ctx, n, route, store, peers); err != nil {
			log.Error(err)
		}

		select {
		case <-parent.Done():
			return parent.Err()
		case <-time.After(period): // wait for the next supervision cycle
		}
	}
	return nil
}
func bootstrap(ctx context.Context,
	n inet.Network,
	r *dht.IpfsDHT,
	ps peer.Peerstore,
	boots []*config.BootstrapPeer) error {

	if len(n.GetConnections()) >= recoveryThreshold {
		return nil
	}
	numCxnsToCreate := recoveryThreshold - len(n.GetConnections())

	var bootstrapPeers []peer.Peer
	for _, bootstrap := range boots {
		p, err := toPeer(ps, bootstrap)
		if err != nil {
			return err
		}
		bootstrapPeers = append(bootstrapPeers, p)
	}

	var notConnected []peer.Peer
	for _, p := range bootstrapPeers {
		if !n.IsConnected(p) {
			notConnected = append(notConnected, p)
		}
	}

	// pick a random subset of the not-yet-connected bootstrap peers, capped
	// at the number of peers actually available
	if numCxnsToCreate > len(notConnected) {
		numCxnsToCreate = len(notConnected)
	}
	var randomSubset []peer.Peer
	for _, i := range rand.Perm(len(notConnected))[:numCxnsToCreate] {
		randomSubset = append(randomSubset, notConnected[i])
	}

	if err := connect(ctx, r, randomSubset); err != nil {
		return err
	}
	return nil
}
func connect(ctx context.Context, r *dht.IpfsDHT, peers []peer.Peer) error {
	var wg sync.WaitGroup
	for _, p := range peers {
		// performed asynchronously because when performed synchronously, if
		// one `Connect` call hangs, subsequent calls are more likely to
		// fail/abort due to an expiring context.
		wg.Add(1)
		go func(p peer.Peer) {
			defer wg.Done()
			err := r.Connect(ctx, p)
			if err != nil {
				log.Event(ctx, "bootstrapFailed", p)
				log.Criticalf("failed to bootstrap with %v", p)
				return
			}
			log.Event(ctx, "bootstrapSuccess", p)
			log.Infof("bootstrapped with %v", p)
		}(p)
	}
	wg.Wait()
	return nil
}
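Note that connect returns nil even when every dial fails, so the caller cannot tell whether bootstrapping achieved anything. If that signal were wanted, the goroutines could report into a shared counter; a minimal sketch under that assumption (connectAtLeastOne is hypothetical, not part of this commit, and additionally needs the errors and sync/atomic imports):

// connectAtLeastOne behaves like connect above but reports an error when
// none of the bootstrap peers could be reached.
func connectAtLeastOne(ctx context.Context, r *dht.IpfsDHT, peers []peer.Peer) error {
	var wg sync.WaitGroup
	var successes int32
	for _, p := range peers {
		wg.Add(1)
		go func(p peer.Peer) {
			defer wg.Done()
			if err := r.Connect(ctx, p); err != nil {
				log.Errorf("failed to bootstrap with %v: %s", p, err)
				return
			}
			atomic.AddInt32(&successes, 1) // count successful dials
		}(p)
	}
	wg.Wait()
	if len(peers) > 0 && atomic.LoadInt32(&successes) == 0 {
		return errors.New("failed to connect to any bootstrap peer")
	}
	return nil
}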
func toPeer(ps peer.Peerstore, bootstrap *config.BootstrapPeer) (peer.Peer, error) {
	id, err := peer.DecodePrettyID(bootstrap.PeerID)
	if err != nil {
		return nil, err
	}
	p, err := ps.FindOrCreate(id)
	if err != nil {
		return nil, err
	}
	maddr, err := ma.NewMultiaddr(bootstrap.Address)
	if err != nil {
		return nil, err
	}
	p.AddAddress(maddr)
	return p, nil
}
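The TODO on the route parameter above suggests depending on an abstract interface so the supervisor can be exercised without a live DHT. A minimal sketch of what that could look like in a test file in package core (connector and fakeConnector are hypothetical names; the point is that *dht.IpfsDHT already satisfies the one-method interface now that Connect returns only an error, and the sketch uses only imports already present above):

// connector is the narrow surface the supervisor needs from the DHT.
type connector interface {
	Connect(ctx context.Context, p peer.Peer) error
}

// fakeConnector records connection attempts so a test can assert that the
// supervisor tries to re-bootstrap when the connection count drops.
type fakeConnector struct {
	mu       sync.Mutex
	attempts []peer.Peer
}

func (f *fakeConnector) Connect(ctx context.Context, p peer.Peer) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.attempts = append(f.attempts, p)
	return nil
}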
@@ -179,7 +179,13 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
n.Exchange = bitswap.New(ctx, n.Identity, bitswapNetwork, n.Routing, blockstore, alwaysSendToPeer)
go initConnections(ctx, n.Config, n.Peerstore, dhtRouting)
// TODO consider moving connection supervision into the Network. We've
// discussed improvements to this Node constructor. One improvement
// would be to make the node configurable, allowing clients to inject
// an Exchange, Network, or Routing component and have the constructor
// manage the wiring. In that scenario, this dangling function is a bit
// awkward.
go superviseConnections(ctx, n.Network, dhtRouting, n.Peerstore, n.Config.Bootstrap)
}
// TODO(brian): when offline instantiate the BlockService with a bitswap
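The TODO above sketches a more configurable constructor. One common shape for that in Go is functional options applied during construction; a rough sketch under that assumption (Option and NewConfigurableIpfsNode are hypothetical names, and a real version would apply injected components before wiring the exchange, block service, and connection supervisor rather than after):

// Option mutates an IpfsNode during construction.
type Option func(*IpfsNode)

// NewConfigurableIpfsNode builds the node with defaults, then lets callers
// swap in their own Exchange, Network, or Routing component.
func NewConfigurableIpfsNode(cfg *config.Config, online bool, opts ...Option) (*IpfsNode, error) {
	n, err := NewIpfsNode(cfg, online)
	if err != nil {
		return nil, err
	}
	for _, opt := range opts {
		opt(n) // e.g. an option could replace n.Routing with a mock for tests
	}
	return n, nil
}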
@@ -250,41 +256,6 @@ func initIdentity(cfg *config.Identity, peers peer.Peerstore, online bool) (peer
return self, nil
}
func initConnections(ctx context.Context, cfg *config.Config, pstore peer.Peerstore, route *dht.IpfsDHT) {
// TODO consider stricter error handling
// TODO consider Criticalf error logging
for _, p := range cfg.Bootstrap {
if p.PeerID == "" {
log.Criticalf("error: peer does not include PeerID. %v", p)
}
maddr, err := ma.NewMultiaddr(p.Address)
if err != nil {
log.Error(err)
continue
}
// setup peer
id, err := peer.DecodePrettyID(p.PeerID)
if err != nil {
log.Criticalf("Bootstrapping error: %v", err)
continue
}
npeer, err := pstore.FindOrCreate(id)
if err != nil {
log.Criticalf("Bootstrapping error: %v", err)
continue
}
npeer.AddAddress(maddr)
if _, err = route.Connect(ctx, npeer); err != nil {
log.Criticalf("Bootstrapping error: %v", err)
continue
}
log.Event(ctx, "bootstrap", npeer)
}
}
func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
var err error
@@ -31,7 +31,7 @@ const (
// ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size
func ReleaseBuffer(b []byte) {
log.Warningf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
log.Debugf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
mpool.ByteSlicePool.Put(uint32(cap(b)), b)
}
@@ -71,7 +71,7 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
conn.ContextCloser = ctxc.NewContextCloser(ctx, conn.close)
log.Info("newSingleConn: %v to %v", local, remote)
log.Debugf("newSingleConn: %v to %v", local, remote)
// setup the various io goroutines
conn.Children().Add(1)
@@ -61,7 +61,7 @@ func (d *Dialer) DialAddr(ctx context.Context, raddr ma.Multiaddr, remote peer.P
// madialer := manet.Dialer{LocalAddr: laddr}
madialer := manet.Dialer{}
log.Infof("%s dialing %s %s", d.LocalPeer, remote, raddr)
log.Debugf("%s dialing %s %s", d.LocalPeer, remote, raddr)
maconn, err := madialer.Dial(raddr)
if err != nil {
return nil, err
@@ -69,7 +69,7 @@ func (c *MultiConn) Add(conns ...Conn) {
defer c.Unlock()
for _, c2 := range conns {
log.Infof("MultiConn: adding %s", c2)
log.Debugf("MultiConn: adding %s", c2)
if c.LocalPeer() != c2.LocalPeer() || c.RemotePeer() != c2.RemotePeer() {
log.Error(c2)
c.Unlock() // ok to unlock (to log). panicing.
@@ -86,7 +86,7 @@ func (c *MultiConn) Add(conns ...Conn) {
c.Children().Add(1)
c2.Children().Add(1) // yep, on the child too.
go c.fanInSingle(c2)
log.Infof("MultiConn: added %s", c2)
log.Debugf("MultiConn: added %s", c2)
}
}
@@ -149,7 +149,7 @@ func (c *MultiConn) fanOut() {
// send data out through our "best connection"
case m, more := <-c.duplex.Out:
if !more {
log.Infof("%s out channel closed", c)
log.Debugf("%s out channel closed", c)
return
}
sc := c.BestConn()
@@ -159,7 +159,7 @@ func (c *MultiConn) fanOut() {
}
i++
log.Infof("%s sending (%d)", sc, i)
log.Debugf("%s sending (%d)", sc, i)
sc.Out() <- m
}
}
@@ -170,7 +170,7 @@ func (c *MultiConn) fanOut() {
func (c *MultiConn) fanInSingle(child Conn) {
// cleanup all data associated with this child Connection.
defer func() {
log.Infof("closing: %s", child)
log.Debugf("closing: %s", child)
// in case it still is in the map, remove it.
c.Lock()
@@ -197,7 +197,7 @@ func (c *MultiConn) fanInSingle(child Conn) {
case m, more := <-child.In(): // receiving data
if !more {
log.Infof("%s in channel closed", child)
log.Debugf("%s in channel closed", child)
err := c.GetError()
if err != nil {
log.Errorf("Found error on connection: %s", err)
@@ -205,7 +205,7 @@ func (c *MultiConn) fanInSingle(child Conn) {
return // closed
}
i++
log.Infof("%s received (%d)", child, i)
log.Debugf("%s received (%d)", child, i)
c.duplex.In <- m
}
}
@@ -49,7 +49,7 @@ func Handshake3Update(lpeer, rpeer peer.Peer, msg *pb.Handshake3) (*Handshake3Re
return res, err
}
if lpeer.AddAddress(observedAddr) {
log.Infof("(nat) added new local, remote-observed address: %s", observedAddr)
log.Debugf("(nat) added new local, remote-observed address: %s", observedAddr)
}
res.LocalObservedAddress = observedAddr
@@ -27,7 +27,7 @@ type Network interface {
ClosePeer(peer.Peer) error
// IsConnected returns whether a connection to given peer exists.
IsConnected(peer.Peer) (bool, error)
IsConnected(peer.Peer) bool
// GetProtocols returns the protocols registered in the network.
GetProtocols() *mux.ProtocolMap
@@ -75,8 +75,8 @@ func (n *IpfsNetwork) ClosePeer(p peer.Peer) error {
}
// IsConnected returns whether a connection to given peer exists.
func (n *IpfsNetwork) IsConnected(p peer.Peer) (bool, error) {
return n.swarm.GetConnection(p.ID()) != nil, nil
func (n *IpfsNetwork) IsConnected(p peer.Peer) bool {
return n.swarm.GetConnection(p.ID()) != nil
}
// GetProtocols returns the protocols registered in the network.
@@ -61,7 +61,7 @@ func resolveUnspecifiedAddresses(unspecifiedAddrs []ma.Multiaddr) ([]ma.Multiadd
}
return eventlog.Metadata{"addresses": addrs}
}())
log.Info("InterfaceListenAddresses:", outputAddrs)
log.Debug("InterfaceListenAddresses:", outputAddrs)
return outputAddrs, nil
}
@@ -132,6 +132,9 @@ func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
Peerstore: s.peers,
}
if len(peer.Addresses()) == 0 {
return nil, errors.New("peer has no addresses")
}
// try to connect to one of the peer's known addresses.
// for simplicity, we do this sequentially.
// A future commit will do this asynchronously.
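The comment above flags sequential dialing as a simplification to be replaced with an asynchronous version later. The usual first-success pattern would look roughly like this (illustrative only; dialFirst is a hypothetical helper, and it assumes madialer.Dial returns a manet.Conn, as the surrounding code suggests):

// dialFirst dials every address concurrently and returns the first
// connection that succeeds, or the last error if none do.
func dialFirst(d manet.Dialer, addrs []ma.Multiaddr) (manet.Conn, error) {
	type result struct {
		c   manet.Conn
		err error
	}
	results := make(chan result, len(addrs))
	for _, a := range addrs {
		go func(a ma.Multiaddr) {
			c, err := d.Dial(a)
			results <- result{c, err}
		}(a)
	}
	var lastErr error
	for range addrs {
		r := <-results
		if r.err == nil {
			return r.c, nil // sketch only: later successful dials are not closed here
		}
		lastErr = r.err
	}
	return nil, lastErr
}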
@@ -145,7 +148,7 @@ func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
return nil, err
}
c, err = s.connSetup(c)
c2, err := s.connSetup(c)
if err != nil {
c.Close()
return nil, err
@@ -153,14 +156,14 @@ func (s *Swarm) Dial(peer peer.Peer) (conn.Conn, error) {
// TODO replace the TODO ctx with a context passed in from caller
log.Event(context.TODO(), "dial", peer)
return c, nil
return c2, nil
}
// GetConnection returns the connection in the swarm to given peer.ID
func (s *Swarm) GetConnection(pid peer.ID) conn.Conn {
s.connsLock.RLock()
defer s.connsLock.RUnlock()
c, found := s.conns[u.Key(pid)]
s.connsLock.RUnlock()
if !found {
return nil
@@ -161,11 +161,11 @@ type peer struct {
// codebase is known to be correct.
func (p *peer) String() string {
pid := p.id.String()
maxRunes := 12
maxRunes := 6
if len(pid) < maxRunes {
maxRunes = len(pid)
}
return "[Peer " + pid[:maxRunes] + "]"
return fmt.Sprintf("peer %s", pid[:maxRunes])
}
func (p *peer) Loggable() map[string]interface{} {
@@ -97,32 +97,23 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
}
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) {
// TODO(jbenet,whyrusleeping)
//
// Connect should take in a Peer (with ID). In a sense, we shouldn't be
// allowing connections to random multiaddrs without knowing who we're
// speaking to (i.e. peer.ID). In terms of moving around simple addresses
// -- instead of an (ID, Addr) pair -- we can use:
//
// /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
//
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
err := dht.dialer.DialPeer(ctx, npeer)
if err != nil {
return nil, err
return err
}
// Ping new peer to register in their routing table
// NOTE: this should be done better...
err = dht.Ping(ctx, npeer)
if err != nil {
return nil, fmt.Errorf("failed to ping newly connected peer: %s\n", err)
return fmt.Errorf("failed to ping newly connected peer: %s\n", err)
}
log.Event(ctx, "connect", dht.self, npeer)
dht.Update(ctx, npeer)
return npeer, nil
return nil
}
// HandleMessage implements the inet.Handler interface.
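The TODO above proposes passing around a single combined address of the form /ip4/10.20.30.40/tcp/1234/ipfs/&lt;peer-id&gt; instead of an (ID, Addr) pair. A rough sketch of splitting such a string with APIs already used in this change (splitBootstrapAddr and the manual string split are assumptions; native /ipfs/ support in go-multiaddr is not assumed):

// splitBootstrapAddr separates /ip4/.../tcp/.../ipfs/<peer-id> into the
// transport multiaddr and the peer ID.
func splitBootstrapAddr(s string) (ma.Multiaddr, peer.ID, error) {
	var id peer.ID
	parts := strings.Split(s, "/ipfs/")
	if len(parts) != 2 {
		return nil, id, fmt.Errorf("not a combined transport+peer address: %s", s)
	}
	addr, err := ma.NewMultiaddr(parts[0]) // e.g. /ip4/10.20.30.40/tcp/1234
	if err != nil {
		return nil, id, err
	}
	id, err = peer.DecodePrettyID(parts[1]) // base58-encoded peer ID
	if err != nil {
		return nil, id, err
	}
	return addr, id, nil
}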
@@ -101,7 +101,7 @@ func TestPing(t *testing.T) {
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()
_, err = dhtA.Connect(ctx, peerB)
err = dhtA.Connect(ctx, peerB)
if err != nil {
t.Fatal(err)
}
@@ -151,7 +151,7 @@ func TestValueGetSet(t *testing.T) {
defer dhtA.dialer.(inet.Network).Close()
defer dhtB.dialer.(inet.Network).Close()
_, err = dhtA.Connect(ctx, peerB)
err = dhtA.Connect(ctx, peerB)
if err != nil {
t.Fatal(err)
}
@@ -194,17 +194,17 @@ func TestProvides(t *testing.T) {
}
}()
_, err := dhts[0].Connect(ctx, peers[1])
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(ctx, peers[2])
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(ctx, peers[3])
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
@@ -256,17 +256,17 @@ func TestProvidesAsync(t *testing.T) {
}
}()
_, err := dhts[0].Connect(ctx, peers[1])
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(ctx, peers[2])
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(ctx, peers[3])
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
@@ -321,17 +321,17 @@ func TestLayeredGet(t *testing.T) {
}
}()
_, err := dhts[0].Connect(ctx, peers[1])
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatalf("Failed to connect: %s", err)
}
_, err = dhts[1].Connect(ctx, peers[2])
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(ctx, peers[3])
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
@@ -376,17 +376,17 @@ func TestFindPeer(t *testing.T) {
}
}()
_, err := dhts[0].Connect(ctx, peers[1])
err := dhts[0].Connect(ctx, peers[1])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(ctx, peers[2])
err = dhts[1].Connect(ctx, peers[2])
if err != nil {
t.Fatal(err)
}
_, err = dhts[1].Connect(ctx, peers[3])
err = dhts[1].Connect(ctx, peers[3])
if err != nil {
t.Fatal(err)
}
@@ -435,14 +435,14 @@ func TestConnectCollision(t *testing.T) {
done := make(chan struct{})
go func() {
_, err := dhtA.Connect(ctx, peerB)
err := dhtA.Connect(ctx, peerB)
if err != nil {
t.Fatal(err)
}
done <- struct{}{}
}()
go func() {
_, err := dhtB.Connect(ctx, peerA)
err := dhtB.Connect(ctx, peerA)
if err != nil {
t.Fatal(err)
}
@@ -274,10 +274,10 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
log.Infof("ping %s start", p)
log.Debugf("ping %s start", p)
pmes := pb.NewMessage(pb.Message_PING, "", 0)
_, err := dht.sendRequest(ctx, p, pmes)
log.Infof("ping %s end (err = %s)", p, err)
log.Debugf("ping %s end (err = %s)", p, err)
return err
}
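The "Thoughts" comment above asks whether Ping should accept a peer ID and do the lookup itself. Given FindPeer's signature shown earlier in this diff, such a wrapper could be sketched like this (PingID is a hypothetical helper, not part of this change):

// PingID resolves a peer from its ID via the DHT, then pings it.
func (dht *IpfsDHT) PingID(ctx context.Context, id peer.ID) error {
	p, err := dht.FindPeer(ctx, id)
	if err != nil {
		return err
	}
	return dht.Ping(ctx, p)
}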