notifications.go 2.62 KB
Newer Older
1 2 3
package notifications

import (
4
	"context"
5
	"sync"
6

Steven Allen's avatar
Steven Allen committed
7
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
8 9
	pubsub "gx/ipfs/QmdbxjQWogRCHRaxhhGnYdT1oQJzL9GdqSKzCdqWr85AP2/pubsub"
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
10 11
)

12 13
// bufferSize is the capacity argument handed to pubsub.New when New builds
// the wrapped pubsub instance.
const bufferSize = 16

14
type PubSub interface {
15
	Publish(block blocks.Block)
16
	Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block
17 18 19 20
	Shutdown()
}

func New() PubSub {
21 22 23 24
	return &impl{
		wrapped: *pubsub.New(bufferSize),
		cancel:  make(chan struct{}),
	}
25 26 27 28
}

type impl struct {
	wrapped pubsub.PubSub
29 30 31 32 33 34

	// These two fields make up a shutdown "lock".
	// We need them as calling, e.g., `Unsubscribe` after calling `Shutdown`
	// blocks forever and fixing this in pubsub would be rather invasive.
	cancel chan struct{}
	wg     sync.WaitGroup
35 36
}

37
func (ps *impl) Publish(block blocks.Block) {
38 39 40 41 42 43 44 45 46 47
	ps.wg.Add(1)
	defer ps.wg.Done()

	select {
	case <-ps.cancel:
		// Already shutdown, bail.
		return
	default:
	}

48
	ps.wrapped.Pub(block, block.Cid().KeyString())
49 50
}

51
// Not safe to call more than once.
52
func (ps *impl) Shutdown() {
53 54 55 56 57
	// Interrupt in-progress subscriptions.
	close(ps.cancel)
	// Wait for them to finish.
	ps.wg.Wait()
	// shutdown the pubsub.
58 59
	ps.wrapped.Shutdown()
}
60 61 62 63

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
64
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
65

66
	blocksCh := make(chan blocks.Block, len(keys))
67
	valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
68 69 70 71
	if len(keys) == 0 {
		close(blocksCh)
		return blocksCh
	}
72 73 74 75 76 77 78 79 80 81 82 83 84 85

	// prevent shutdown
	ps.wg.Add(1)

	// check if shutdown *after* preventing shutdowns.
	select {
	case <-ps.cancel:
		// abort, allow shutdown to continue.
		ps.wg.Done()
		close(blocksCh)
		return blocksCh
	default:
	}

86
	ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
87
	go func() {
88 89 90 91 92 93 94 95
		defer func() {
			ps.wrapped.Unsub(valuesCh)
			close(blocksCh)

			// Unblock shutdown.
			ps.wg.Done()
		}()

96
		for {
97
			select {
98 99
			case <-ps.cancel:
				return
100 101 102 103 104 105
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
106
				block, ok := val.(blocks.Block)
107 108 109 110
				if !ok {
					return
				}
				select {
111 112
				case <-ps.cancel:
					return
113 114 115 116 117 118 119 120 121 122 123
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
				}
			}
		}
	}()

	return blocksCh
}

124
func toStrings(keys []*cid.Cid) []string {
125
	strs := make([]string, 0, len(keys))
126
	for _, key := range keys {
127
		strs = append(strs, key.KeyString())
128 129 130
	}
	return strs
}