notifications.go 1.79 KB
Newer Older
1 2 3
package notifications

import (
4 5
	"context"

6
	blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
7 8

	pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
9
	cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
10 11
)

12 13
// bufferSize is the capacity argument handed to pubsub.New when New builds the
// wrapped pubsub instance.
// NOTE(review): presumably this sizes the pubsub's internal buffering; confirm
// against the pubsub package.
const bufferSize = 16

14
type PubSub interface {
15
	Publish(block blocks.Block)
16
	Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block
17 18 19 20 21 22 23 24 25 26 27
	Shutdown()
}

// New returns a PubSub backed by a freshly created pubsub instance that is
// constructed with bufferSize.
func New() PubSub {
	return &impl{
		wrapped: *pubsub.New(bufferSize),
	}
}

// impl is the PubSub implementation returned by New. It adapts a topic-based
// pubsub.PubSub to block-typed publishing and subscription.
type impl struct {
	// wrapped is the underlying pubsub; topics are the KeyString form of
	// block CIDs.
	wrapped pubsub.PubSub
}

28
func (ps *impl) Publish(block blocks.Block) {
29
	ps.wrapped.Pub(block, block.Cid().KeyString())
30 31 32 33 34
}

// Shutdown shuts down the wrapped pubsub instance by delegating to its
// Shutdown method.
func (ps *impl) Shutdown() {
	ps.wrapped.Shutdown()
}
35 36 37 38

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
39
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
40

41
	blocksCh := make(chan blocks.Block, len(keys))
42
	valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
43 44 45 46
	if len(keys) == 0 {
		close(blocksCh)
		return blocksCh
	}
47
	ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
48
	go func() {
49 50
		defer close(blocksCh)
		defer ps.wrapped.Unsub(valuesCh) // with a len(keys) buffer, this is an optimization
51
		for {
52 53 54 55 56 57 58
			select {
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
59
				block, ok := val.(blocks.Block)
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
				}
			}
		}
	}()

	return blocksCh
}

75
func toStrings(keys []*cid.Cid) []string {
76 77
	strs := make([]string, 0)
	for _, key := range keys {
78
		strs = append(strs, key.KeyString())
79 80 81
	}
	return strs
}