Commit d26fd581 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

ctx closer races #270

parent 7cfe2f1a
...@@ -89,13 +89,13 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer, ...@@ -89,13 +89,13 @@ func newSingleConn(ctx context.Context, local, remote peer.Peer,
log.Info("newSingleConn: %v to %v", local, remote) log.Info("newSingleConn: %v to %v", local, remote)
// setup the various io goroutines // setup the various io goroutines
conn.Children().Add(1)
go func() { go func() {
conn.Children().Add(1)
conn.msgio.outgoing.WriteTo(maconn) conn.msgio.outgoing.WriteTo(maconn)
conn.Children().Done() conn.Children().Done()
}() }()
conn.Children().Add(1)
go func() { go func() {
conn.Children().Add(1)
conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize) conn.msgio.incoming.ReadFrom(maconn, MaxMessageSize)
conn.Children().Done() conn.Children().Done()
}() }()
......
...@@ -47,7 +47,6 @@ func (l *listener) close() error { ...@@ -47,7 +47,6 @@ func (l *listener) close() error {
} }
func (l *listener) listen() { func (l *listener) listen() {
l.Children().Add(1)
defer l.Children().Done() defer l.Children().Done()
// handle at most chansize concurrent handshakes // handle at most chansize concurrent handshakes
...@@ -143,6 +142,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer. ...@@ -143,6 +142,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.Peer, peers peer.
ctx2, _ := context.WithCancel(ctx) ctx2, _ := context.WithCancel(ctx)
l.ContextCloser = ctxc.NewContextCloser(ctx2, l.close) l.ContextCloser = ctxc.NewContextCloser(ctx2, l.close)
l.Children().Add(1)
go l.listen() go l.listen()
return l, nil return l, nil
......
...@@ -57,6 +57,8 @@ func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (* ...@@ -57,6 +57,8 @@ func NewMultiConn(ctx context.Context, local, remote peer.Peer, conns []Conn) (*
if conns != nil && len(conns) > 0 { if conns != nil && len(conns) > 0 {
c.Add(conns...) c.Add(conns...)
} }
c.Children().Add(1)
go c.fanOut() go c.fanOut()
return c, nil return c, nil
} }
...@@ -81,6 +83,8 @@ func (c *MultiConn) Add(conns ...Conn) { ...@@ -81,6 +83,8 @@ func (c *MultiConn) Add(conns ...Conn) {
} }
c.conns[c2.ID()] = c2 c.conns[c2.ID()] = c2
c.Children().Add(1)
c2.Children().Add(1) // yep, on the child too.
go c.fanInSingle(c2) go c.fanInSingle(c2)
log.Infof("MultiConn: added %s", c2) log.Infof("MultiConn: added %s", c2)
} }
...@@ -134,7 +138,6 @@ func CloseConns(conns ...Conn) { ...@@ -134,7 +138,6 @@ func CloseConns(conns ...Conn) {
// fanOut is the multiplexor out -- it sends outgoing messages over the // fanOut is the multiplexor out -- it sends outgoing messages over the
// underlying single connections. // underlying single connections.
func (c *MultiConn) fanOut() { func (c *MultiConn) fanOut() {
c.Children().Add(1)
defer c.Children().Done() defer c.Children().Done()
i := 0 i := 0
...@@ -165,9 +168,6 @@ func (c *MultiConn) fanOut() { ...@@ -165,9 +168,6 @@ func (c *MultiConn) fanOut() {
// fanInSingle is a multiplexor in -- it receives incoming messages over the // fanInSingle is a multiplexor in -- it receives incoming messages over the
// underlying single connections. // underlying single connections.
func (c *MultiConn) fanInSingle(child Conn) { func (c *MultiConn) fanInSingle(child Conn) {
c.Children().Add(1)
child.Children().Add(1) // yep, on the child too.
// cleanup all data associated with this child Connection. // cleanup all data associated with this child Connection.
defer func() { defer func() {
log.Infof("closing: %s", child) log.Infof("closing: %s", child)
......
...@@ -139,6 +139,8 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { ...@@ -139,6 +139,8 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
s.connsLock.Unlock() s.connsLock.Unlock()
// kick off reader goroutine // kick off reader goroutine
s.Children().Add(1)
mc.Children().Add(1) // child of Conn as well.
go s.fanInSingle(mc) go s.fanInSingle(mc)
log.Debugf("added new multiconn: %s", mc) log.Debugf("added new multiconn: %s", mc)
} else { } else {
...@@ -154,7 +156,6 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) { ...@@ -154,7 +156,6 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
// Handles the unwrapping + sending of messages to the right connection. // Handles the unwrapping + sending of messages to the right connection.
func (s *Swarm) fanOut() { func (s *Swarm) fanOut() {
s.Children().Add(1)
defer s.Children().Done() defer s.Children().Done()
i := 0 i := 0
...@@ -194,9 +195,6 @@ func (s *Swarm) fanOut() { ...@@ -194,9 +195,6 @@ func (s *Swarm) fanOut() {
// Handles the receiving + wrapping of messages, per conn. // Handles the receiving + wrapping of messages, per conn.
// Consider using reflect.Select with one goroutine instead of n. // Consider using reflect.Select with one goroutine instead of n.
func (s *Swarm) fanInSingle(c conn.Conn) { func (s *Swarm) fanInSingle(c conn.Conn) {
s.Children().Add(1)
c.Children().Add(1) // child of Conn as well.
// cleanup all data associated with this child Connection. // cleanup all data associated with this child Connection.
defer func() { defer func() {
// remove it from the map. // remove it from the map.
......
...@@ -83,6 +83,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, ...@@ -83,6 +83,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer,
// ContextCloser for proper child management. // ContextCloser for proper child management.
s.ContextCloser = ctxc.NewContextCloser(ctx, s.close) s.ContextCloser = ctxc.NewContextCloser(ctx, s.close)
s.Children().Add(1)
go s.fanOut() go s.fanOut()
return s, s.listen(listenAddrs) return s, s.listen(listenAddrs)
} }
......
...@@ -120,6 +120,7 @@ func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser { ...@@ -120,6 +120,7 @@ func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser {
closed: make(chan struct{}), closed: make(chan struct{}),
} }
c.Children().Add(1) // we're a child goroutine, to be waited upon.
go c.closeOnContextDone() go c.closeOnContextDone()
return c return c
} }
...@@ -176,7 +177,6 @@ func (c *contextCloser) closeLogic() { ...@@ -176,7 +177,6 @@ func (c *contextCloser) closeLogic() {
// we need to go through the Close motions anyway. Hence all the sync // we need to go through the Close motions anyway. Hence all the sync
// stuff all over the place... // stuff all over the place...
func (c *contextCloser) closeOnContextDone() { func (c *contextCloser) closeOnContextDone() {
c.Children().Add(1) // we're a child goroutine, to be waited upon.
<-c.Context().Done() // wait until parent (context) is done. <-c.Context().Done() // wait until parent (context) is done.
c.internalClose() c.internalClose()
c.Children().Done() c.Children().Done()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment