notifications.go 2.53 KB
Newer Older
1 2 3
package notifications

import (
4
	"context"
5
	"sync"
6

Steven Allen's avatar
Steven Allen committed
7
	blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
8 9

	pubsub "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/pubsub"
Steven Allen's avatar
Steven Allen committed
10
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
11 12
)

13 14
// bufferSize is the capacity argument handed to pubsub.New when the wrapped
// pubsub instance is constructed in New.
const bufferSize = 16

15
type PubSub interface {
16
	Publish(block blocks.Block)
17
	Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block
18 19 20 21
	Shutdown()
}

func New() PubSub {
22 23 24 25
	return &impl{
		wrapped: *pubsub.New(bufferSize),
		cancel:  make(chan struct{}),
	}
26 27 28 29
}

type impl struct {
	wrapped pubsub.PubSub
30 31 32 33 34 35

	// These two fields make up a shutdown "lock".
	// We need them as calling, e.g., `Unsubscribe` after calling `Shutdown`
	// blocks forever and fixing this in pubsub would be rather invasive.
	cancel chan struct{}
	wg     sync.WaitGroup
36 37
}

38
func (ps *impl) Publish(block blocks.Block) {
39
	ps.wrapped.Pub(block, block.Cid().KeyString())
40 41
}

42
// Not safe to call more than once.
43
func (ps *impl) Shutdown() {
44 45 46 47 48
	// Interrupt in-progress subscriptions.
	close(ps.cancel)
	// Wait for them to finish.
	ps.wg.Wait()
	// shutdown the pubsub.
49 50
	ps.wrapped.Shutdown()
}
51 52 53 54

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
// is closed if the |ctx| times out or is cancelled, or after sending len(keys)
// blocks.
55
func (ps *impl) Subscribe(ctx context.Context, keys ...*cid.Cid) <-chan blocks.Block {
56

57
	blocksCh := make(chan blocks.Block, len(keys))
58
	valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
59 60 61 62
	if len(keys) == 0 {
		close(blocksCh)
		return blocksCh
	}
63 64 65 66 67 68 69 70 71 72 73 74 75 76

	// prevent shutdown
	ps.wg.Add(1)

	// check if shutdown *after* preventing shutdowns.
	select {
	case <-ps.cancel:
		// abort, allow shutdown to continue.
		ps.wg.Done()
		close(blocksCh)
		return blocksCh
	default:
	}

77
	ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
78
	go func() {
79 80 81 82 83 84 85 86
		defer func() {
			ps.wrapped.Unsub(valuesCh)
			close(blocksCh)

			// Unblock shutdown.
			ps.wg.Done()
		}()

87
		for {
88
			select {
89 90
			case <-ps.cancel:
				return
91 92 93 94 95 96
			case <-ctx.Done():
				return
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
97
				block, ok := val.(blocks.Block)
98 99 100 101
				if !ok {
					return
				}
				select {
102 103
				case <-ps.cancel:
					return
104 105 106 107 108 109 110 111 112 113 114
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
				}
			}
		}
	}()

	return blocksCh
}

115
func toStrings(keys []*cid.Cid) []string {
116
	strs := make([]string, 0, len(keys))
117
	for _, key := range keys {
118
		strs = append(strs, key.KeyString())
119 120 121
	}
	return strs
}