notifications.go 2.24 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package notifications

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	pubsub "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/tuxychandru/pubsub"

	blocks "github.com/jbenet/go-ipfs/blocks"
	u "github.com/jbenet/go-ipfs/util"
)

11 12
// bufferSize is the capacity argument handed to pubsub.New when New builds
// the wrapped PubSub.
const bufferSize = 16

13
type PubSub interface {
Jeromy's avatar
Jeromy committed
14 15
	Publish(block *blocks.Block)
	Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block
16 17 18 19 20 21 22 23 24 25 26
	Shutdown()
}

// New constructs a PubSub backed by a freshly created pubsub instance that
// is built with bufferSize as its capacity argument.
func New() PubSub {
	ps := pubsub.New(bufferSize)
	return &impl{wrapped: *ps}
}

// impl is the PubSub implementation returned by New; it forwards to a
// wrapped pubsub.PubSub value.
type impl struct {
	// wrapped is the underlying topic-based pubsub that Publish, Subscribe
	// and Shutdown delegate to.
	wrapped pubsub.PubSub
}

Jeromy's avatar
Jeromy committed
27
func (ps *impl) Publish(block *blocks.Block) {
28 29 30 31 32 33 34
	topic := string(block.Key())
	ps.wrapped.Pub(block, topic)
}

// Shutdown delegates to the wrapped pubsub's Shutdown method.
func (ps *impl) Shutdown() {
	ps.wrapped.Shutdown()
}
35 36 37 38 39

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
func (ps *impl) Subscribe(ctx context.Context, keys ...u.Key) <-chan *blocks.Block {
40

41 42
	blocksCh := make(chan *blocks.Block, len(keys))
	valuesCh := make(chan interface{}, len(keys))
43
	ps.wrapped.AddSub(valuesCh, toStrings(keys)...)
44 45 46

	go func() {
		defer func() {
47
			ps.wrapped.Unsub(valuesCh, toStrings(keys)...)
48 49
			close(blocksCh)
		}()
50 51 52 53 54 55
		seen := make(map[u.Key]struct{})
		i := 0 // req'd because it only counts unique block sends
		for {
			if i >= len(keys) {
				return
			}
56 57 58 59 60 61 62 63 64 65 66
			select {
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
				block, ok := val.(*blocks.Block)
				if !ok {
					return
				}
67 68 69
				if _, ok := seen[block.Key()]; ok {
					continue
				}
70 71 72 73
				select {
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
74 75 76 77 78 79 80 81 82
					// Unsub alone is insufficient for keeping out duplicates.
					// It's a race to unsubscribe before pubsub handles the
					// next Publish call. Therefore, must also check for
					// duplicates manually. Unsub is a performance
					// consideration to avoid lots of unnecessary channel
					// chatter.
					ps.wrapped.Unsub(valuesCh, string(block.Key()))
					i++
					seen[block.Key()] = struct{}{}
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
				}
			}
		}
	}()

	return blocksCh
}

// toStrings converts each key to its string form, preserving order. The
// result is always non-nil, even for empty input. The slice is allocated
// once at its final capacity since the length is known up front.
func toStrings(keys []u.Key) []string {
	strs := make([]string, 0, len(keys))
	for _, key := range keys {
		strs = append(strs, string(key))
	}
	return strs
}