Commit 9dac5bb7 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

p2p/net: dial log -> events

This commit turns all dial logs into log.Events.

Everything's great except for one problem:
The LoggableMap I'm using does not print out things
correctly. I gave it peer.IDs, and Multiaddrs
and both got logged as nothing `{}` (didn't even call
their String() methods!) So, for now, this function
encodes it when called... This is wrong and should be
fixed before being merged in. Otherwise we  will be
constantly encoding peer.IDs and Multiaddrs without
needing to.

@briantigerchow how do you suggest doing this?
I don't know my way around your Loggable.
parent 92c7f967
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
reuseport "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" reuseport "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
...@@ -26,9 +27,13 @@ func (d *Dialer) String() string { ...@@ -26,9 +27,13 @@ func (d *Dialer) String() string {
// Ensures raddr is part of peer.Addresses() // Ensures raddr is part of peer.Addresses()
// Example: d.DialAddr(ctx, peer.Addresses()[0], peer) // Example: d.DialAddr(ctx, peer.Addresses()[0], peer)
func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) { func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) {
logdial := lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)
defer log.EventBegin(ctx, "connDial", logdial).Done()
maconn, err := d.rawConnDial(ctx, raddr, remote) maconn, err := d.rawConnDial(ctx, raddr, remote)
if err != nil { if err != nil {
logdial["dial"] = "failure"
logdial["error"] = err
return nil, err return nil, err
} }
...@@ -48,11 +53,15 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( ...@@ -48,11 +53,15 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
if d.PrivateKey == nil { if d.PrivateKey == nil {
log.Warning("dialer %s dialing INSECURELY %s at %s!", d, remote, raddr) log.Warning("dialer %s dialing INSECURELY %s at %s!", d, remote, raddr)
log.Event(ctx, "connDialInsecure", logdial)
connOut = c connOut = c
return return
} }
defer log.EventBegin(ctx, "connDialEncrypt", logdial).Done()
c2, err := newSecureConn(ctx, d.PrivateKey, c) c2, err := newSecureConn(ctx, d.PrivateKey, c)
if err != nil { if err != nil {
logdial["error"] = err
errOut = err errOut = err
c.Close() c.Close()
return return
...@@ -64,12 +73,20 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( ...@@ -64,12 +73,20 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
select { select {
case <-ctx.Done(): case <-ctx.Done():
maconn.Close() maconn.Close()
logdial["error"] = ctx.Err()
return nil, ctx.Err() return nil, ctx.Err()
case <-done: case <-done:
// whew, finished. // whew, finished.
} }
return connOut, errOut if errOut != nil {
logdial["error"] = errOut
logdial["dial"] = "failure"
return nil, errOut
}
logdial["dial"] = "success"
return connOut, nil
} }
// rawConnDial dials the underlying net.Conn + manet.Conns // rawConnDial dials the underlying net.Conn + manet.Conns
...@@ -82,12 +99,14 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee ...@@ -82,12 +99,14 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee
} }
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr) return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr)
} }
// get local addr to use. // get local addr to use.
laddr := pickLocalAddr(d.LocalAddrs, raddr) laddr := pickLocalAddr(d.LocalAddrs, raddr)
log.Debugf("%s dialing %s -- %s --> %s", d.LocalPeer, remote, laddr, raddr) logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()
// make a copy of the manet.Dialer, we may need to change its timeout. // make a copy of the manet.Dialer, we may need to change its timeout.
madialer := d.Dialer madialer := d.Dialer
...@@ -99,19 +118,27 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee ...@@ -99,19 +118,27 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee
// dial using reuseport.Dialer, because we're probably reusing addrs. // dial using reuseport.Dialer, because we're probably reusing addrs.
// this is optimistic, as the reuseDial may fail to bind the port. // this is optimistic, as the reuseDial may fail to bind the port.
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil { if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
// if it worked, wrap the raw net.Conn with our manet.Conn // if it worked, wrap the raw net.Conn with our manet.Conn
log.Debugf("%s reuse worked! %s %s %s", d.LocalPeer, laddr, nconn.RemoteAddr(), nconn) logdial["reuseport"] = "success"
rpev.Done()
return manet.WrapNetConn(nconn) return manet.WrapNetConn(nconn)
} else if !retry { } else if !retry {
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure. // reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
logdial["reuseport"] = "failure"
logdial["error"] = reuseErr
rpev.Done()
return nil, reuseErr return nil, reuseErr
} else { } else {
// this is a failure to reuse port. log it. // this is a failure to reuse port. log it.
log.Debugf("%s port reuse failed: %s --> %s -- %s", d.LocalPeer, laddr, raddr, reuseErr) logdial["reuseport"] = "retry"
logdial["error"] = reuseErr
rpev.Done()
} }
} }
defer log.EventBegin(ctx, "connDialManet", logdial).Done()
return madialer.Dial(raddr) return madialer.Dial(raddr)
} }
......
...@@ -28,8 +28,16 @@ import ( ...@@ -28,8 +28,16 @@ import (
// any may fail if no addr at end // any may fail if no addr at end
// retry dialAttempt x // retry dialAttempt x
var (
ErrDialBackoff = errors.New("dial backoff")
ErrDialFailed = errors.New("dial attempt failed")
ErrDialToSelf = errors.New("dial to self attempted")
)
// dialAttempts governs how many times a goroutine will try to dial a given peer. // dialAttempts governs how many times a goroutine will try to dial a given peer.
const dialAttempts = 3 // Note: this is down to one, as we have _too many dials_ atm. To add back in,
// add loop back in Dial(.)
const dialAttempts = 1
// DialTimeout is the amount of time each dial attempt has. We can think about making // DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each // this larger down the road, or putting more granular timeouts (i.e. within each
...@@ -179,75 +187,96 @@ func (db *dialbackoff) Clear(p peer.ID) { ...@@ -179,75 +187,96 @@ func (db *dialbackoff) Clear(p peer.ID) {
// This allows us to use various transport protocols, do NAT traversal/relay, // This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection. // etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) { func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local { if p == s.local {
return nil, errors.New("Attempted connection to self!") log.Event(ctx, "swarmDialSelf", logdial)
} return nil, ErrDialToSelf
}
// this loop is here because dials take time, and we should not be dialing
// the same peer concurrently (silly waste). Additonally, it's structured return s.gatedDialAttempt(ctx, p)
// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we }
// may have received an incoming connection! if so, we no longer must dial.
// func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
// During the dial attempts, we may be doing the dialing. if not, we wait.
var err error
var conn *Conn
for i := 0; i < dialAttempts; i++ {
// check if we already have an open connection first
cs := s.ConnectionsToPeer(p) cs := s.ConnectionsToPeer(p)
for _, conn = range cs { for _, conn := range cs {
if conn != nil { // dump out the first one we find. (TODO pick better) if conn != nil { // dump out the first one we find. (TODO pick better)
return conn, nil return conn
} }
} }
return nil
}
// check if there's an ongoing dial to this peer // gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
if ok, wait := s.dsync.Lock(p); !ok { // dial synchronization systems: dialsync and dialbackoff.
func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
if s.backf.Backoff(p) { var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
log.Debugf("backoff") defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done()
return nil, fmt.Errorf("%s failed to dial %s, backing off.", s.local, p)
}
log.Debugf("waiting for ongoing dial") // check if we already have an open connection first
select { conn := s.bestConnectionToPeer(p)
case <-wait: // wait for that dial to finish. if conn != nil {
continue // and see if it worked (loop), OR we got an incoming dial. return conn, nil
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
}
} }
// check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); ok {
// ok, we have been charged to dial! let's do it. // ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself. // if it succeeds, dial will add the conn to the swarm itself.
log.Debugf("dial start")
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, _ := context.WithTimeout(ctx, s.dialT) ctxT, _ := context.WithTimeout(ctx, s.dialT)
conn, err = s.dial(ctxT, p) conn, err := s.dial(ctxT, p)
s.dsync.Unlock(p) s.dsync.Unlock(p)
log.Debugf("dial end %s", conn) log.Debugf("dial end %s", conn)
if err != nil { if err != nil {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff s.backf.AddBackoff(p) // let others know to backoff
continue // ok, we failed. try again. (if loop is done, our error is output) return nil, ErrDialFailed // ok, we failed. try again. (if loop is done, our error is output)
} }
log.Event(ctx, "swarmDialBackoffClear", logdial)
s.backf.Clear(p) // okay, no longer need to backoff s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil return conn, nil
} else {
// we did not dial. we must wait for someone else to dial.
// check whether we should backoff first...
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", logdial)
return nil, ErrDialBackoff
}
defer log.EventBegin(ctx, "swarmDialWait", logdial).Done()
select {
case <-wait: // wait for that other dial to finish.
// see if it worked, OR we got an incoming dial in the meantime...
conn := s.bestConnectionToPeer(p)
if conn != nil {
return conn, nil
}
return nil, ErrDialFailed
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
} }
if err == nil {
err = fmt.Errorf("%s failed to dial %s after %d attempts", s.local, p, dialAttempts)
} }
return nil, err
} }
// dial is the actual swarm's dial logic, gated by Dial. // dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
if p == s.local { if p == s.local {
return nil, errors.New("Attempted connection to self!") log.Event(ctx, "swarmDialDoDialSelf", logdial)
return nil, ErrDialToSelf
} }
sk := s.peers.PrivKey(s.local) sk := s.peers.PrivKey(s.local)
if sk == nil { if sk == nil {
// may be fine for sk to be nil, just log a warning. // may be fine for sk to be nil, just log.
log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.") log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
log.Event(ctx, "swarmDialDoInsecure", logdial)
} }
// get our own addrs. try dialing out from our listener addresses (reusing ports) // get our own addrs. try dialing out from our listener addresses (reusing ports)
...@@ -291,15 +320,16 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -291,15 +320,16 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
} }
// ok try to setup the new connection. // ok try to setup the new connection.
defer log.EventBegin(ctx, "swarmDialDoSetup", logdial, lgbl.NetConn(connC)).Done()
swarmC, err := dialConnSetup(ctx, s, connC) swarmC, err := dialConnSetup(ctx, s, connC)
if err != nil { if err != nil {
log.Debug("Dial newConnSetup failed. disconnecting.") log.Debug("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err)) log.Event(ctx, "swarmDialDoSetupFailed", logdial, lgbl.NetConn(connC), lgbl.Error(err))
connC.Close() // close the connection. didn't work out :( connC.Close() // close the connection. didn't work out :(
return nil, err return nil, err
} }
log.Event(ctx, "dial", p) log.Event(ctx, "swarmDialDoSuccess", logdial, lgbl.NetConn(connC))
return swarmC, nil return swarmC, nil
} }
......
...@@ -9,7 +9,11 @@ package loggables ...@@ -9,7 +9,11 @@ package loggables
import ( import (
"net" "net"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
log "github.com/jbenet/go-ipfs/thirdparty/eventlog" log "github.com/jbenet/go-ipfs/thirdparty/eventlog"
peer "github.com/jbenet/go-ipfs/p2p/peer"
) )
// NetConn returns an eventlog.Metadata with the conn addresses // NetConn returns an eventlog.Metadata with the conn addresses
...@@ -26,3 +30,21 @@ func Error(e error) log.Loggable { ...@@ -26,3 +30,21 @@ func Error(e error) log.Loggable {
"error": e.Error(), "error": e.Error(),
} }
} }
// Dial metadata is metadata for dial events
func Dial(sys string, lid, rid peer.ID, laddr, raddr ma.Multiaddr) log.LoggableMap {
m := log.Metadata{"subsystem": sys}
if lid != "" {
m["localPeer"] = lid.Pretty()
}
if laddr != nil {
m["localAddr"] = laddr.String()
}
if rid != "" {
m["remotePeer"] = rid.Pretty()
}
if raddr != nil {
m["remoteAddr"] = raddr.String()
}
return log.LoggableMap(m)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment