Commit 6287b47f authored by Steven Allen's avatar Steven Allen

cleanup connection setup

We already setup the connection from within the new connection handler. No need
to do it *again* on dial.
parent f7c26f49
package swarm package swarm
import ( import (
"context"
"fmt" "fmt"
ic "github.com/libp2p/go-libp2p-crypto" ic "github.com/libp2p/go-libp2p-crypto"
...@@ -121,31 +120,3 @@ func wrapConns(conns1 []*ps.Conn) []*Conn { ...@@ -121,31 +120,3 @@ func wrapConns(conns1 []*ps.Conn) []*Conn {
} }
return conns2 return conns2
} }
// newConnSetup does the swarm's "setup" for a connection. returns the underlying
// conn.Conn this method is used by both swarm.Dial and ps.Swarm connHandler
func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error) {
// wrap with a Conn
sc, err := wrapConn(psConn)
if err != nil {
return nil, err
}
// if we have a public key, make sure we add it to our peerstore!
// This is an important detail. Otherwise we must fetch the public
// key from the DHT or some other system.
if pk := sc.RemotePublicKey(); pk != nil {
s.peers.AddPubKey(sc.RemotePeer(), pk)
}
// ok great! we can use it. add it to our group.
// set the RemotePeer as a group on the conn. this lets us group
// connections in the StreamSwarm by peer, and get a streams from
// any available connection in the group (better multiconn):
// swarm.StreamSwarm().NewStreamWithGroup(remotePeer)
psConn.AddGroup(sc.RemotePeer())
return sc, nil
}
...@@ -189,14 +189,16 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) ...@@ -189,14 +189,16 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)
// doDial is an ugly shim method to retain all the logging and backoff logic // doDial is an ugly shim method to retain all the logging and backoff logic
// of the old dialsync code // of the old dialsync code
func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) ctx, cancel := context.WithTimeout(ctx, s.dialT)
defer cancel()
logdial := lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
// ok, we have been charged to dial! let's do it. // ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself. // if it succeeds, dial will add the conn to the swarm itself.
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, cancel := context.WithTimeout(ctx, s.dialT)
conn, err := s.dial(ctxT, p) conn, err := s.dial(ctx, p)
cancel()
log.Debugf("dial end %s", conn)
if err != nil { if err != nil {
if err != context.Canceled { if err != context.Canceled {
log.Event(ctx, "swarmDialBackoffAdd", logdial) log.Event(ctx, "swarmDialBackoffAdd", logdial)
...@@ -206,8 +208,6 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -206,8 +208,6 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
// ok, we failed. try again. (if loop is done, our error is output) // ok, we failed. try again. (if loop is done, our error is output)
return nil, fmt.Errorf("dial attempt failed: %s", err) return nil, fmt.Errorf("dial attempt failed: %s", err)
} }
log.Event(ctx, "swarmDialBackoffClear", logdial)
s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil return conn, nil
} }
...@@ -373,10 +373,8 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (ico ...@@ -373,10 +373,8 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (ico
var ConnSetupTimeout = time.Minute * 5 var ConnSetupTimeout = time.Minute * 5
// dialConnSetup is the setup logic for a connection from the dial side. it // dialConnSetup is the setup logic for a connection from the dial side.
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC iconn.Conn) (*Conn, error) { func dialConnSetup(ctx context.Context, s *Swarm, connC iconn.Conn) (*Conn, error) {
deadline, ok := ctx.Deadline() deadline, ok := ctx.Deadline()
if !ok { if !ok {
deadline = time.Now().Add(ConnSetupTimeout) deadline = time.Now().Add(ConnSetupTimeout)
...@@ -386,14 +384,14 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC iconn.Conn) (*Conn, erro ...@@ -386,14 +384,14 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC iconn.Conn) (*Conn, erro
return nil, err return nil, err
} }
// Add conn to ps swarm. Setup will be done by the connection handler.
psC, err := s.swarm.AddConn(connC) psC, err := s.swarm.AddConn(connC)
if err != nil { if err != nil {
// connC is closed by caller if we fail. // connC is closed by caller if we fail.
return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err) return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
} }
// ok try to setup the new connection. (newConnSetup will add to group) swarmC, err := wrapConn(psC)
swarmC, err := s.newConnSetup(ctx, psC)
if err != nil { if err != nil {
psC.Close() // we need to make sure psC is Closed. psC.Close() // we need to make sure psC is Closed.
return nil, err return nil, err
......
...@@ -147,23 +147,24 @@ func (s *Swarm) addConnListener(list iconn.Listener) error { ...@@ -147,23 +147,24 @@ func (s *Swarm) addConnListener(list iconn.Listener) error {
// will take a while do it in a goroutine. // will take a while do it in a goroutine.
// See https://godoc.org/github.com/libp2p/go-peerstream for more information // See https://godoc.org/github.com/libp2p/go-peerstream for more information
func (s *Swarm) connHandler(c *ps.Conn) *Conn { func (s *Swarm) connHandler(c *ps.Conn) *Conn {
ctx := context.Background() sc, err := wrapConn(c)
// this context is for running the handshake, which -- when receiveing connections
// -- we have no bound on beyond what the transport protocol bounds it at.
// note that setup + the handshake are bounded by underlying io.
// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)
sc, err := s.newConnSetup(ctx, c)
if err != nil { if err != nil {
log.Debug(err) log.Event(s.Context(), "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
c.Close() // boom. close it. c.Close() // boom. close it.
return nil return nil
} }
// if a peer dials us, remove from dial backoff. // Add the public key.
if pk := sc.RemotePublicKey(); pk != nil {
s.peers.AddPubKey(sc.RemotePeer(), pk)
}
// Add the group
c.AddGroup(sc.RemotePeer())
// clear backoff on successful connection.
logdial := lgbl.Dial("swarm", sc.LocalPeer(), sc.RemotePeer(), nil, nil)
log.Event(s.Context(), "swarmDialBackoffClear", logdial)
s.backf.Clear(sc.RemotePeer()) s.backf.Clear(sc.RemotePeer())
return sc return sc
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment