Commit 56b14d8e authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #676 from jbenet/dial-events

p2p/net: dial log -> events
parents 03e5a3eb eb797706
......@@ -11,6 +11,7 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
reuseport "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
peer "github.com/jbenet/go-ipfs/p2p/peer"
......@@ -26,9 +27,14 @@ func (d *Dialer) String() string {
// Ensures raddr is part of peer.Addresses()
// Example: d.DialAddr(ctx, peer.Addresses()[0], peer)
func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) {
logdial := lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)
logdial["encrypted"] = (d.PrivateKey != nil) // log wether this will be an encrypted dial or not.
defer log.EventBegin(ctx, "connDial", logdial).Done()
maconn, err := d.rawConnDial(ctx, raddr, remote)
if err != nil {
logdial["dial"] = "failure"
logdial["error"] = err
return nil, err
}
......@@ -51,6 +57,7 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
connOut = c
return
}
c2, err := newSecureConn(ctx, d.PrivateKey, c)
if err != nil {
errOut = err
......@@ -64,12 +71,20 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
select {
case <-ctx.Done():
maconn.Close()
logdial["error"] = ctx.Err()
return nil, ctx.Err()
case <-done:
// whew, finished.
}
return connOut, errOut
if errOut != nil {
logdial["error"] = errOut
logdial["dial"] = "failure"
return nil, errOut
}
logdial["dial"] = "success"
return connOut, nil
}
// rawConnDial dials the underlying net.Conn + manet.Conns
......@@ -82,12 +97,14 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee
}
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr)
}
// get local addr to use.
laddr := pickLocalAddr(d.LocalAddrs, raddr)
log.Debugf("%s dialing %s -- %s --> %s", d.LocalPeer, remote, laddr, raddr)
logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()
// make a copy of the manet.Dialer, we may need to change its timeout.
madialer := d.Dialer
......@@ -99,19 +116,27 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee
// dial using reuseport.Dialer, because we're probably reusing addrs.
// this is optimistic, as the reuseDial may fail to bind the port.
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
// if it worked, wrap the raw net.Conn with our manet.Conn
log.Debugf("%s reuse worked! %s %s %s", d.LocalPeer, laddr, nconn.RemoteAddr(), nconn)
logdial["reuseport"] = "success"
rpev.Done()
return manet.WrapNetConn(nconn)
} else if !retry {
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
logdial["reuseport"] = "failure"
logdial["error"] = reuseErr
rpev.Done()
return nil, reuseErr
} else {
// this is a failure to reuse port. log it.
log.Debugf("%s port reuse failed: %s --> %s -- %s", d.LocalPeer, laddr, raddr, reuseErr)
logdial["reuseport"] = "retry"
logdial["error"] = reuseErr
rpev.Done()
}
}
defer log.EventBegin(ctx, "connDialManet", logdial).Done()
return madialer.Dial(raddr)
}
......
......@@ -28,8 +28,16 @@ import (
// any may fail if no addr at end
// retry dialAttempt x
var (
ErrDialBackoff = errors.New("dial backoff")
ErrDialFailed = errors.New("dial attempt failed")
ErrDialToSelf = errors.New("dial to self attempted")
)
// dialAttempts governs how many times a goroutine will try to dial a given peer.
const dialAttempts = 3
// Note: this is down to one, as we have _too many dials_ atm. To add back in,
// add loop back in Dial(.)
const dialAttempts = 1
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
......@@ -179,75 +187,97 @@ func (db *dialbackoff) Clear(p peer.ID) {
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local {
return nil, errors.New("Attempted connection to self!")
log.Event(ctx, "swarmDialSelf", logdial)
return nil, ErrDialToSelf
}
// this loop is here because dials take time, and we should not be dialing
// the same peer concurrently (silly waste). Additonally, it's structured
// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we
// may have received an incoming connection! if so, we no longer must dial.
//
// During the dial attempts, we may be doing the dialing. if not, we wait.
var err error
var conn *Conn
for i := 0; i < dialAttempts; i++ {
// check if we already have an open connection first
cs := s.ConnectionsToPeer(p)
for _, conn = range cs {
if conn != nil { // dump out the first one we find. (TODO pick better)
return conn, nil
}
}
return s.gatedDialAttempt(ctx, p)
}
// check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); !ok {
func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
cs := s.ConnectionsToPeer(p)
for _, conn := range cs {
if conn != nil { // dump out the first one we find. (TODO pick better)
return conn
}
}
return nil
}
if s.backf.Backoff(p) {
log.Debugf("backoff")
return nil, fmt.Errorf("%s failed to dial %s, backing off.", s.local, p)
}
// gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
// dial synchronization systems: dialsync and dialbackoff.
func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done()
log.Debugf("waiting for ongoing dial")
select {
case <-wait: // wait for that dial to finish.
continue // and see if it worked (loop), OR we got an incoming dial.
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
}
}
// check if we already have an open connection first
conn := s.bestConnectionToPeer(p)
if conn != nil {
return conn, nil
}
// check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); ok {
// ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself.
log.Debugf("dial start")
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, _ := context.WithTimeout(ctx, s.dialT)
conn, err = s.dial(ctxT, p)
conn, err := s.dial(ctxT, p)
s.dsync.Unlock(p)
log.Debugf("dial end %s", conn)
if err != nil {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff
continue // ok, we failed. try again. (if loop is done, our error is output)
return nil, ErrDialFailed // ok, we failed. try again. (if loop is done, our error is output)
}
log.Event(ctx, "swarmDialBackoffClear", logdial)
s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil
} else {
// we did not dial. we must wait for someone else to dial.
// check whether we should backoff first...
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", logdial)
return nil, ErrDialBackoff
}
defer log.EventBegin(ctx, "swarmDialWait", logdial).Done()
select {
case <-wait: // wait for that other dial to finish.
// see if it worked, OR we got an incoming dial in the meantime...
conn := s.bestConnectionToPeer(p)
if conn != nil {
return conn, nil
}
return nil, ErrDialFailed
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
}
}
if err == nil {
err = fmt.Errorf("%s failed to dial %s after %d attempts", s.local, p, dialAttempts)
}
return nil, err
}
// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local {
return nil, errors.New("Attempted connection to self!")
log.Event(ctx, "swarmDialDoDialSelf", logdial)
return nil, ErrDialToSelf
}
defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
logdial["dial"] = "failure" // start off with failure. set to "success" at the end.
sk := s.peers.PrivKey(s.local)
logdial["encrypted"] = (sk != nil) // log wether this will be an encrypted dial or not.
if sk == nil {
// may be fine for sk to be nil, just log a warning.
log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
// fine for sk to be nil, just log.
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
}
// get our own addrs. try dialing out from our listener addresses (reusing ports)
......@@ -269,7 +299,9 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local))
log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses())
if len(remoteAddrs) == 0 {
return nil, errors.New("peer has no addresses")
err := errors.New("peer has no addresses")
logdial["error"] = err
return nil, err
}
// open connection to peer
......@@ -287,19 +319,21 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
// try to get a connection to any addr
connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
if err != nil {
logdial["error"] = err
return nil, err
}
logdial["netconn"] = lgbl.NetConn(connC)
// ok try to setup the new connection.
defer log.EventBegin(ctx, "swarmDialDoSetup", logdial, lgbl.NetConn(connC)).Done()
swarmC, err := dialConnSetup(ctx, s, connC)
if err != nil {
log.Debug("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
logdial["error"] = err
connC.Close() // close the connection. didn't work out :(
return nil, err
}
log.Event(ctx, "dial", p)
logdial["dial"] = "success"
return swarmC, nil
}
......@@ -398,8 +432,6 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error
// ok try to setup the new connection. (newConnSetup will add to group)
swarmC, err := s.newConnSetup(ctx, psC)
if err != nil {
log.Debug("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
psC.Close() // we need to make sure psC is Closed.
return nil, err
}
......
......@@ -9,7 +9,11 @@ package loggables
import (
"net"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
log "github.com/jbenet/go-ipfs/thirdparty/eventlog"
peer "github.com/jbenet/go-ipfs/p2p/peer"
)
// NetConn returns an eventlog.Metadata with the conn addresses
......@@ -26,3 +30,42 @@ func Error(e error) log.Loggable {
"error": e.Error(),
}
}
// Dial metadata is metadata for dial events
func Dial(sys string, lid, rid peer.ID, laddr, raddr ma.Multiaddr) DeferredMap {
m := DeferredMap{}
m["subsystem"] = sys
if lid != "" {
m["localPeer"] = func() interface{} { return lid.Pretty() }
}
if laddr != nil {
m["localAddr"] = func() interface{} { return laddr.String() }
}
if rid != "" {
m["remotePeer"] = func() interface{} { return rid.Pretty() }
}
if raddr != nil {
m["remoteAddr"] = func() interface{} { return raddr.String() }
}
return m
}
// DeferredMap is a Loggable which may contained deffered values.
type DeferredMap map[string]interface{}
// Loggable describes objects that can be marshalled into Metadata for logging
func (m DeferredMap) Loggable() map[string]interface{} {
m2 := map[string]interface{}{}
for k, v := range m {
if vf, ok := v.(func() interface{}); ok {
// if it's a DeferredVal, call it.
m2[k] = vf()
} else {
// else use the value as is.
m2[k] = v
}
}
return m2
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment