notifications.go 1.67 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package notifications

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub"

	blocks "github.com/jbenet/go-ipfs/blocks"
	u "github.com/jbenet/go-ipfs/util"
)

11 12
const bufferSize = 16

13
type PubSub interface {
Jeromy's avatar
Jeromy committed
14 15
	Publish(block *blocks.Block)
	Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block
16 17 18 19 20 21 22 23 24 25 26
	Shutdown()
}

func New() PubSub {
	return &impl{*pubsub.New(bufferSize)}
}

type impl struct {
	wrapped pubsub.PubSub
}

Jeromy's avatar
Jeromy committed
27
func (ps *impl) Publish(block *blocks.Block) {
28 29 30 31 32 33 34
	topic := string(block.Key())
	ps.wrapped.Pub(block, topic)
}

func (ps *impl) Shutdown() {
	ps.wrapped.Shutdown()
}
35 36 37 38 39

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
40

41 42
	blocksCh := make(chan *blocks.Block, len(keys))
	valuesCh := make(chan interface{}, len(keys))
43
	ps.wrapped.AddSub(valuesCh, toStrings(keys)...)
44 45 46

	go func() {
		defer func() {
47
			ps.wrapped.Unsub(valuesCh, toStrings(keys)...)
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
			close(blocksCh)
		}()
		for _, _ = range keys {
			select {
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
				block, ok := val.(*blocks.Block)
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
				}
			}
		}
	}()

	return blocksCh
}

func toStrings(keys []u.Key) []string {
	strs := make([]string, 0)
	for _, key := range keys {
		strs = append(strs, string(key))
	}
	return strs
}