notifications.go 1.8 KB
Newer Older
1 2 3
package notifications

import (
4 5
	pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
	blocks "github.com/ipfs/go-ipfs/blocks"
6
	key "github.com/ipfs/go-ipfs/blocks/key"
Jeromy's avatar
Jeromy committed
7
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
8 9
)

10 11
// bufferSize is the capacity argument handed to pubsub.New when New builds
// the wrapped pubsub instance.
const bufferSize = 16

12
type PubSub interface {
Jeromy's avatar
Jeromy committed
13
	Publish(block *blocks.Block)
14
	Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block
15 16 17 18 19 20 21 22 23 24 25
	Shutdown()
}

// New constructs a PubSub backed by a pubsub instance created with
// pubsub.New(bufferSize).
func New() PubSub {
	inner := pubsub.New(bufferSize)
	return &impl{wrapped: *inner}
}

// impl is the PubSub implementation returned by New. It adapts the generic,
// string-topic pubsub.PubSub to block keys and *blocks.Block values.
type impl struct {
	// wrapped is the underlying pubsub; topics are block keys converted to strings.
	wrapped pubsub.PubSub
}

Jeromy's avatar
Jeromy committed
26
func (ps *impl) Publish(block *blocks.Block) {
27 28 29 30 31 32 33
	topic := string(block.Key())
	ps.wrapped.Pub(block, topic)
}

// Shutdown shuts down the wrapped pubsub by delegating to its Shutdown method.
func (ps *impl) Shutdown() {
	ps.wrapped.Shutdown()
}
34 35 36 37

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
38
func (ps *impl) Subscribe(ctx context.Context, keys ...key.Key) <-chan *blocks.Block {
39

40
	blocksCh := make(chan *blocks.Block, len(keys))
41
	valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
42 43 44 45
	if len(keys) == 0 {
		close(blocksCh)
		return blocksCh
	}
46
	ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
47
	go func() {
48 49
		defer close(blocksCh)
		defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
50
		for {
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
			select {
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
				block, ok := val.(*blocks.Block)
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
				}
			}
		}
	}()

	return blocksCh
}

74
func toStrings(keys []key.Key) []string {
75 76 77 78 79 80
	strs := make([]string, 0)
	for _, key := range keys {
		strs = append(strs, string(key))
	}
	return strs
}