Commit e25f98c4 authored by gammazero's avatar gammazero Committed by vyzo

Fix close of closed channel

This happens when there are multiple subscriptions to a topic and a single subscription is canceled twice.
parent e6ad80cf
......@@ -34,6 +34,7 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
......@@ -600,6 +601,7 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8=
gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
......
......@@ -10,6 +10,7 @@ type Subscription struct {
topic string
ch chan *Message
cancelCh chan<- *Subscription
closed bool
ctx context.Context
err error
}
......@@ -43,5 +44,8 @@ func (sub *Subscription) Cancel() {
}
func (sub *Subscription) close() {
close(sub.ch)
if !sub.closed {
close(sub.ch)
sub.closed = true
}
}
......@@ -679,6 +679,40 @@ func TestTopicRelayOnClosedTopic(t *testing.T) {
}
}
func TestProducePanic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const numHosts = 5
topicID := "foobar"
hosts := getNetHosts(t, ctx, numHosts)
ps := getPubsub(ctx, hosts[0])
// Create topic
topic, err := ps.Join(topicID)
if err != nil {
t.Fatal(err)
}
// Create subscription we're going to cancel
s, err := topic.Subscribe()
if err != nil {
t.Fatal(err)
}
// Create second subscription to keep us alive on the subscription map
// after the first one is canceled
s2, err := topic.Subscribe()
if err != nil {
t.Fatal(err)
}
_ = s2
s.Cancel()
time.Sleep(time.Second)
s.Cancel()
time.Sleep(time.Second)
}
func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) {
primaryTopic := topics[0]
msgs := make([]*Subscription, len(topics))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment