Commit 58ad863d authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Jeromy

fix(notifications) prevent deadlock when context cancelled early

+ test(notifications)

cc @whyrusleeping @jbenet

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent cba713cd
...@@ -39,15 +39,15 @@ func (ps *impl) Shutdown() { ...@@ -39,15 +39,15 @@ func (ps *impl) Shutdown() {
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block { func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
blocksCh := make(chan *blocks.Block, len(keys)) blocksCh := make(chan *blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 { if len(keys) == 0 {
close(blocksCh) close(blocksCh)
return blocksCh return blocksCh
} }
valuesCh := ps.wrapped.SubOnceEach(toStrings(keys)...) ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
go func() { go func() {
defer func() { defer close(blocksCh)
close(blocksCh) defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
}()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
......
...@@ -7,6 +7,8 @@ import ( ...@@ -7,6 +7,8 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
"github.com/jbenet/go-ipfs/util"
) )
func TestDuplicates(t *testing.T) { func TestDuplicates(t *testing.T) {
...@@ -96,6 +98,34 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) { ...@@ -96,6 +98,34 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
assertBlockChannelNil(t, blockChannel) assertBlockChannelNil(t, blockChannel)
} }
func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
g := blocksutil.NewBlockGenerator()
ctx, cancel := context.WithCancel(context.Background())
n := New()
defer n.Shutdown()
t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000)
ks := func() []util.Key {
var keys []util.Key
for _, b := range bs {
keys = append(keys, b.Key())
}
return keys
}()
_ = n.Subscribe(ctx, ks...) // ignore received channel
t.Log("cancel context before any blocks published")
cancel()
for _, b := range bs {
n.Publish(b)
}
t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
}
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) { func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
_, ok := <-blockChannel _, ok := <-blockChannel
if ok { if ok {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment