Commit bb0b5f7c authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Jeromy

fix(notifications) prevent deadlock when context cancelled early

+ test(notifications)

cc @whyrusleeping @jbenet

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent 7a3819a5
......@@ -39,15 +39,15 @@ func (ps *impl) Shutdown() {
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
blocksCh := make(chan *blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 {
close(blocksCh)
return blocksCh
}
valuesCh := ps.wrapped.SubOnceEach(toStrings(keys)...)
ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
go func() {
defer func() {
close(blocksCh)
}()
defer close(blocksCh)
defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
for {
select {
case <-ctx.Done():
......
......@@ -7,6 +7,8 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
"github.com/jbenet/go-ipfs/util"
)
func TestDuplicates(t *testing.T) {
......@@ -96,6 +98,34 @@ func TestCarryOnWhenDeadlineExpires(t *testing.T) {
assertBlockChannelNil(t, blockChannel)
}
func TestDoesNotDeadLockIfContextCancelledBeforePublish(t *testing.T) {
g := blocksutil.NewBlockGenerator()
ctx, cancel := context.WithCancel(context.Background())
n := New()
defer n.Shutdown()
t.Log("generate a large number of blocks. exceed default buffer")
bs := g.Blocks(1000)
ks := func() []util.Key {
var keys []util.Key
for _, b := range bs {
keys = append(keys, b.Key())
}
return keys
}()
_ = n.Subscribe(ctx, ks...) // ignore received channel
t.Log("cancel context before any blocks published")
cancel()
for _, b := range bs {
n.Publish(b)
}
t.Log("publishing the large number of blocks to the ignored channel must not deadlock")
}
func assertBlockChannelNil(t *testing.T, blockChannel <-chan *blocks.Block) {
_, ok := <-blockChannel
if ok {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment