notifications.go 1.43 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package notifications

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub"

	blocks "github.com/jbenet/go-ipfs/blocks"
	u "github.com/jbenet/go-ipfs/util"
)

11 12
const bufferSize = 16

13
type PubSub interface {
Jeromy's avatar
Jeromy committed
14 15
	Publish(block *blocks.Block)
	Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block
16 17 18 19 20 21 22 23 24 25 26
	Shutdown()
}

func New() PubSub {
	return &impl{*pubsub.New(bufferSize)}
}

type impl struct {
	wrapped pubsub.PubSub
}

Jeromy's avatar
Jeromy committed
27
func (ps *impl) Publish(block *blocks.Block) {
28 29 30 31
	topic := string(block.Key())
	ps.wrapped.Pub(block, topic)
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
32 33 34
// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
Jeromy's avatar
Jeromy committed
35
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
36 37 38 39 40
	topics := make([]string, 0)
	for _, key := range keys {
		topics = append(topics, string(key))
	}
	subChan := ps.wrapped.SubOnce(topics...)
Jeromy's avatar
Jeromy committed
41
	blockChannel := make(chan *blocks.Block, 1) // buffered so the sender doesn't wait on receiver
42 43 44 45
	go func() {
		defer close(blockChannel)
		select {
		case val := <-subChan:
Jeromy's avatar
Jeromy committed
46
			block, ok := val.(*blocks.Block)
47 48 49 50
			if ok {
				blockChannel <- block
			}
		case <-ctx.Done():
51
			ps.wrapped.Unsub(subChan, topics...)
52 53 54 55 56 57 58 59
		}
	}()
	return blockChannel
}

func (ps *impl) Shutdown() {
	ps.wrapped.Shutdown()
}