From e25f98c40268d11ec74a059b06099affbd0d4875 Mon Sep 17 00:00:00 2001 From: gammazero Date: Mon, 14 Jun 2021 12:11:15 -0700 Subject: [PATCH] Fix close of closed channel This happens when there are multiple subscriptions to a topic and a single subscription is canceled twice. --- go.sum | 2 ++ subscription.go | 6 +++++- topic_test.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/go.sum b/go.sum index 8d5da87..f566102 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -600,6 +601,7 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/subscription.go b/subscription.go index 3d773e8..cf13622 100644 --- a/subscription.go +++ b/subscription.go @@ -10,6 +10,7 @@ type Subscription struct { topic string ch chan *Message cancelCh chan<- *Subscription + closed bool ctx context.Context err error } @@ -43,5 +44,8 @@ func (sub *Subscription) Cancel() { } func (sub *Subscription) close() { - close(sub.ch) + if !sub.closed { + close(sub.ch) + sub.closed = true + } } diff --git a/topic_test.go b/topic_test.go index 407998c..aa4990b 100644 --- a/topic_test.go +++ b/topic_test.go @@ -679,6 +679,40 @@ func TestTopicRelayOnClosedTopic(t *testing.T) { } } +func TestProducePanic(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numHosts = 5 + topicID := "foobar" + hosts := getNetHosts(t, ctx, numHosts) + ps := getPubsub(ctx, hosts[0]) + + // Create topic + topic, err := ps.Join(topicID) + if err != nil { + t.Fatal(err) + } + + // Create subscription we're going to cancel + s, err := topic.Subscribe() + if err != nil { + t.Fatal(err) + } + // Create second subscription to keep us alive on the subscription map + // after the first one is canceled + s2, err := topic.Subscribe() + if err != nil { + t.Fatal(err) + } + _ = s2 + + s.Cancel() + time.Sleep(time.Second) + s.Cancel() + time.Sleep(time.Second) +} + func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) { primaryTopic := topics[0] msgs := make([]*Subscription, len(topics)) -- GitLab