From 5217668f642213bcc069116cdde6e0201f6b3efe Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 2 Sep 2020 14:52:56 -0700 Subject: [PATCH] fix: handle case where swarm closes before stream When we close a connection, we set the "stream" set to nil to avoid opening new stream. Unfortunately, this meant we wouldn't decrement the reference count on the swarm. --- swarm_conn.go | 4 +--- swarm_stream.go | 10 +++++----- swarm_test.go | 17 +++++++++++++++++ 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/swarm_conn.go b/swarm_conn.go index bd531f2..946936c 100644 --- a/swarm_conn.go +++ b/swarm_conn.go @@ -87,12 +87,10 @@ func (c *Conn) doClose() { }() } -func (c *Conn) removeStream(s *Stream) bool { +func (c *Conn) removeStream(s *Stream) { c.streams.Lock() - _, has := c.streams.m[s] delete(c.streams.m, s) c.streams.Unlock() - return has } // listens for new streams. diff --git a/swarm_stream.go b/swarm_stream.go index 74ac62c..c09b712 100644 --- a/swarm_stream.go +++ b/swarm_stream.go @@ -22,6 +22,8 @@ type Stream struct { stream mux.MuxedStream conn *Conn + closeOnce sync.Once + notifyLk sync.Mutex protocol atomic.Value @@ -76,7 +78,7 @@ func (s *Stream) Write(p []byte) (int, error) { // resources. func (s *Stream) Close() error { err := s.stream.Close() - s.remove() + s.closeOnce.Do(s.remove) return err } @@ -84,7 +86,7 @@ func (s *Stream) Close() error { // associated resources. func (s *Stream) Reset() error { err := s.stream.Reset() - s.remove() + s.closeOnce.Do(s.remove) return err } @@ -102,9 +104,7 @@ func (s *Stream) CloseRead() error { } func (s *Stream) remove() { - if !s.conn.removeStream(s) { - return - } + s.conn.removeStream(s) // We *must* do this in a goroutine. This can be called during a // an open notification and will block until that notification is done. diff --git a/swarm_test.go b/swarm_test.go index 81e1412..bd2c844 100644 --- a/swarm_test.go +++ b/swarm_test.go @@ -440,3 +440,20 @@ func TestNoDial(t *testing.T) { t.Fatal("should have failed with ErrNoConn") } } + +func TestCloseWithOpenStreams(t *testing.T) { + ctx := context.Background() + swarms := makeSwarms(ctx, t, 2) + connectSwarms(t, ctx, swarms) + + s, err := swarms[0].NewStream(ctx, swarms[1].LocalPeer()) + if err != nil { + t.Fatal(err) + } + defer s.Close() + // close swarm before stream. + err = swarms[0].Close() + if err != nil { + t.Fatal(err) + } +} -- GitLab