diff --git a/net/conn/conn.go b/net/conn/conn.go index 4acafff471b9149dc914ec40476e9e1998e3415d..92bf2259b76f5ed7f49a8eee6f39650f32c3a154 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -89,13 +89,13 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer, log.Info("newSingleConn: %v to %v", local, remote) // setup the various io goroutines + conn.Children().Add(1) go func() { - conn.Children().Add(1) conn.msgio.outgoing.WriteTo(maconn) conn.Children().Done() }() + conn.Children().Add(1) go func() { - conn.Children().Add(1) conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize) conn.Children().Done() }() diff --git a/net/conn/listen.go b/net/conn/listen.go index 41335ee88168f191f556847bf0b4ac81f4d55f67..4cbe9a766b2b2fcc5144e92cd86fa11ac19490e3 100644 --- a/net/conn/listen.go +++ b/net/conn/listen.go @@ -47,7 +47,6 @@ func (l *listener) close() error { } func (l *listener) listen() { - l.Children().Add(1) defer l.Children().Done() // handle at most chansize concurrent handshakes @@ -143,6 +142,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer. ctx2, _ := context.WithCancel(ctx) l.ContextCloser = ctxc.NewContextCloser(ctx2, l.close) + l.Children().Add(1) go l.listen() return l, nil diff --git a/net/conn/multiconn.go b/net/conn/multiconn.go index 885ad60084a26b51aaafe4bfa242c291c4d40aa6..5834217a77af6792ed67e32159bf6066400054fb 100644 --- a/net/conn/multiconn.go +++ b/net/conn/multiconn.go @@ -57,6 +57,8 @@ func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (* if conns != nil && len(conns) > 0 { c.Add(conns...) } + + c.Children().Add(1) go c.fanOut() return c, nil } @@ -81,6 +83,8 @@ func (c *MultiConn) Add(conns ...Conn) { } c.conns[c2.ID()] = c2 + c.Children().Add(1) + c2.Children().Add(1) // yep, on the child too. go c.fanInSingle(c2) log.Infof("MultiConn: added %s", c2) } @@ -134,7 +138,6 @@ func CloseConns(conns ...Conn) { // fanOut is the multiplexor out -- it sends outgoing messages over the // underlying single connections. func (c *MultiConn) fanOut() { - c.Children().Add(1) defer c.Children().Done() i := 0 @@ -165,9 +168,6 @@ func (c *MultiConn) fanOut() { // fanInSingle is a multiplexor in -- it receives incoming messages over the // underlying single connections. func (c *MultiConn) fanInSingle(child Conn) { - c.Children().Add(1) - child.Children().Add(1) // yep, on the child too. - // cleanup all data associated with this child Connection. defer func() { log.Infof("closing: %s", child) diff --git a/net/swarm/conn.go b/net/swarm/conn.go index 63f6910a48946fdd02f9b51904f5c142f6069587..aa160b924d184c2559ceb21ab1072f93e14d72d9 100644 --- a/net/swarm/conn.go +++ b/net/swarm/conn.go @@ -139,6 +139,8 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { s.connsLock.Unlock() // kick off reader goroutine + s.Children().Add(1) + mc.Children().Add(1) // child of Conn as well. go s.fanInSingle(mc) log.Debugf("added new multiconn: %s", mc) } else { @@ -154,7 +156,6 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { // Handles the unwrapping + sending of messages to the right connection. func (s *Swarm) fanOut() { - s.Children().Add(1) defer s.Children().Done() i := 0 @@ -194,9 +195,6 @@ func (s *Swarm) fanOut() { // Handles the receiving + wrapping of messages, per conn. // Consider using reflect.Select with one goroutine instead of n. func (s *Swarm) fanInSingle(c conn.Conn) { - s.Children().Add(1) - c.Children().Add(1) // child of Conn as well. - // cleanup all data associated with this child Connection. defer func() { // remove it from the map. diff --git a/net/swarm/swarm.go b/net/swarm/swarm.go index 59279ed320fe43120adf7a6f6de34298e3e7d10f..2f022db006c4f1208334b016823a6a539b26508c 100644 --- a/net/swarm/swarm.go +++ b/net/swarm/swarm.go @@ -83,6 +83,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, // ContextCloser for proper child management. s.ContextCloser = ctxc.NewContextCloser(ctx, s.close) + s.Children().Add(1) go s.fanOut() return s, s.listen(listenAddrs) } diff --git a/util/ctxcloser/closer.go b/util/ctxcloser/closer.go index ca80368eae26983d5608a504906b4c60b82b1be0..5baf6591d46fa2ecf0d05d8fc112853be87de896 100644 --- a/util/ctxcloser/closer.go +++ b/util/ctxcloser/closer.go @@ -120,6 +120,7 @@ func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser { closed: make(chan struct{}), } + c.Children().Add(1) // we're a child goroutine, to be waited upon. go c.closeOnContextDone() return c } @@ -176,7 +177,6 @@ func (c *contextCloser) closeLogic() { // we need to go through the Close motions anyway. Hence all the sync // stuff all over the place... func (c *contextCloser) closeOnContextDone() { - c.Children().Add(1) // we're a child goroutine, to be waited upon. <-c.Context().Done() // wait until parent (context) is done. c.internalClose() c.Children().Done()