notifications.go 2.64 KB
Newer Older
1 2 3
package notifications

import (
4
	"context"
5
	"sync"
6

Steven Allen's avatar
Steven Allen committed
7
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
8 9

	pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
Steven Allen's avatar
Steven Allen committed
10
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
11 12
)

13 14
const bufferSize = 16

15
type PubSub interface {
16
	Publish(block blocks.Block)
17
	Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block
18 19 20 21
	Shutdown()
}

func New() PubSub {
22 23 24 25
	return &impl{
		wrapped: *pubsub.New(bufferSize),
		cancel:  make(chan struct{}),
	}
26 27 28 29
}

type impl struct {
	wrapped pubsub.PubSub
30 31 32 33 34 35

	// These two fields make up a shutdown "lock".
	// We need them as calling, e.g., `Unsubscribe` after calling `Shutdown`
	// blocks forever and fixing this in pubsub would be rather invasive.
	cancel chan struct{}
	wg     sync.WaitGroup
36 37
}

38
func (ps *impl) Publish(block blocks.Block) {
39 40 41 42 43 44 45 46 47 48
	ps.wg.Add(1)
	defer ps.wg.Done()

	select {
	case <-ps.cancel:
		// Already shutdown, bail.
		return
	default:
	}

49
	ps.wrapped.Pub(block, block.Cid().KeyString())
50 51
}

52
// Not safe to call more than once.
53
func (ps *impl) Shutdown() {
54 55 56 57 58
	// Interrupt in-progress subscriptions.
	close(ps.cancel)
	// Wait for them to finish.
	ps.wg.Wait()
	// shutdown the pubsub.
59 60
	ps.wrapped.Shutdown()
}
61 62 63 64

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
65
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
66

67
	blocksCh := make(chan blocks.Block, len(keys))
68
	valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
69 70 71 72
	if len(keys) == 0 {
		close(blocksCh)
		return blocksCh
	}
73 74 75 76 77 78 79 80 81 82 83 84 85 86

	// prevent shutdown
	ps.wg.Add(1)

	// check if shutdown *after* preventing shutdowns.
	select {
	case <-ps.cancel:
		// abort, allow shutdown to continue.
		ps.wg.Done()
		close(blocksCh)
		return blocksCh
	default:
	}

87
	ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
88
	go func() {
89 90 91 92 93 94 95 96
		defer func() {
			ps.wrapped.Unsub(valuesCh)
			close(blocksCh)

			// Unblock shutdown.
			ps.wg.Done()
		}()

97
		for {
98
			select {
99 100
			case <-ps.cancel:
				return
101 102 103 104 105 106
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
107
				block, ok := val.(blocks.Block)
108 109 110 111
				if !ok {
					return
				}
				select {
112 113
				case <-ps.cancel:
					return
114 115 116 117 118 119 120 121 122 123 124
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
				}
			}
		}
	}()

	return blocksCh
}

125
func toStrings(keys []*cid.Cid) []string {
126
	strs := make([]string, 0, len(keys))
127
	for _, key := range keys {
128
		strs = append(strs, key.KeyString())
129 130 131
	}
	return strs
}