notifications.go 1.76 KB
Newer Older
1 2 3
package notifications

import (
4
	context "context"
5 6
	pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
	blocks "github.com/ipfs/go-ipfs/blocks"
7
	key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key"
8 9
)

10 11
// bufferSize is the capacity handed to pubsub.New when New constructs the
// wrapped PubSub. NOTE(review): presumably this sizes the library's internal
// buffering — confirm against the pubsub package.
const bufferSize = 16

12
type PubSub interface {
13 14
	Publish(block blocks.Block)
	Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block
15 16 17 18 19 20 21 22 23 24 25
	Shutdown()
}

// New builds a PubSub whose wrapped pubsub instance is created with
// bufferSize as its capacity argument.
func New() PubSub {
	inner := pubsub.New(bufferSize)
	return &impl{wrapped: *inner}
}

// impl is the PubSub implementation returned by New. Its methods delegate to
// the wrapped pubsub.PubSub value.
type impl struct {
	// wrapped is held by value; New fills it by dereferencing the pointer
	// returned from pubsub.New.
	wrapped pubsub.PubSub
}

26
func (ps *impl) Publish(block blocks.Block) {
27 28 29 30 31 32 33
	topic := string(block.Key())
	ps.wrapped.Pub(block, topic)
}

// Shutdown forwards to the wrapped pubsub's Shutdown.
// NOTE(review): behaviour of Publish/Subscribe after Shutdown depends on the
// pubsub library — confirm before relying on it.
func (ps *impl) Shutdown() {
	ps.wrapped.Shutdown()
}
34 35 36 37

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
38
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan blocks.Block {
39

40
	blocksCh := make(chan blocks.Block, len(keys))
41
	valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
42 43 44 45
	if len(keys) == 0 {
		close(blocksCh)
		return blocksCh
	}
46
	ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
47
	go func() {
48 49
		defer close(blocksCh)
		defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
50
		for {
51 52 53 54 55 56 57
			select {
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
58
				block, ok := val.(blocks.Block)
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
				}
			}
		}
	}()

	return blocksCh
}

74
func toStrings(keys []key.Key) []string {
75 76 77 78 79 80
	strs := make([]string, 0)
	for _, key := range keys {
		strs = append(strs, string(key))
	}
	return strs
}