Commit 8a87b709 authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Jeromy

feat(bitswap/notifications) Subscribe to multiple keys

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent 140a141c
......@@ -12,7 +12,7 @@ const bufferSize = 16
type PubSub interface {
Publish(block blocks.Block)
Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block
Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block
Shutdown()
}
......@@ -31,10 +31,13 @@ func (ps *impl) Publish(block blocks.Block) {
// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
// if the |ctx| times out or is cancelled. Then channel is closed after the
// block given by |k| is sent.
func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block {
topic := string(k)
subChan := ps.wrapped.SubOnce(topic)
// blocks given by |keys| are sent.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block {
topics := make([]string, 0)
for _, key := range keys {
topics = append(topics, string(key))
}
subChan := ps.wrapped.SubOnce(topics...)
blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver
go func() {
defer close(blockChannel)
......@@ -45,7 +48,7 @@ func (ps *impl) Subscribe(ctx context.Context, k u.Key) <-chan blocks.Block {
blockChannel <- block
}
case <-ctx.Done():
ps.wrapped.Unsub(subChan, topic)
ps.wrapped.Unsub(subChan, topics...)
}
}()
return blockChannel
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment