notifications.go 1.45 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package notifications

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub"

	blocks "github.com/jbenet/go-ipfs/blocks"
	u "github.com/jbenet/go-ipfs/util"
)

11 12
// bufferSize is the capacity argument handed to pubsub.New when New builds the
// wrapped publish/subscribe instance.
const bufferSize = 16

13 14
type PubSub interface {
	Publish(block blocks.Block)
15
	Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
	Shutdown()
}

// New builds a PubSub whose implementation wraps a pubsub instance created
// with pubsub.New(bufferSize).
func New() PubSub {
	inner := pubsub.New(bufferSize)
	return &impl{wrapped: *inner}
}

// impl is the PubSub implementation returned by New; its methods delegate to
// the wrapped pubsub.PubSub.
type impl struct {
	wrapped pubsub.PubSub // publish/subscribe instance all methods delegate to
}

// Publish announces |block| on the wrapped pubsub, using the block's key
// (converted to a string) as the topic.
func (ps *impl) Publish(block blocks.Block) {
	ps.wrapped.Pub(block, string(block.Key()))
}

// Subscribe returns a one-time use |blockChannel|. |blockChannel| returns nil
// if the |ctx| times out or is cancelled. Then channel is closed after the
34 35 36 37 38 39 40
// blocks given by |keys| are sent.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan blocks.Block {
	topics := make([]string, 0)
	for _, key := range keys {
		topics = append(topics, string(key))
	}
	subChan := ps.wrapped.SubOnce(topics...)
41
	blockChannel := make(chan blocks.Block, 1) // buffered so the sender doesn't wait on receiver
42 43 44 45 46 47 48 49 50
	go func() {
		defer close(blockChannel)
		select {
		case val := <-subChan:
			block, ok := val.(blocks.Block)
			if ok {
				blockChannel <- block
			}
		case <-ctx.Done():
51
			ps.wrapped.Unsub(subChan, topics...)
52 53 54 55 56 57 58 59
		}
	}()
	return blockChannel
}

// Shutdown calls Shutdown on the wrapped pubsub instance.
func (ps *impl) Shutdown() {
	ps.wrapped.Shutdown()
}