notifications.go 2.65 KB
Newer Older
1 2 3
package notifications

import (
4
	"context"
5
	"sync"
6

7
	pubsub "github.com/cskr/pubsub"
8 9
	blocks "gitlab.dms3.io/dms3/go-block-format"
	cid "gitlab.dms3.io/dms3/go-cid"
10 11
)

12 13
// bufferSize is the capacity argument handed to pubsub.New when the wrapped
// pubsub instance is created in New.
const bufferSize = 16

14 15 16
// PubSub is a simple interface for publishing blocks and being able to subscribe
// for cids. It's used internally by bitswap to decouple receiving blocks
// and actually providing them back to the GetBlocks caller.
17
type PubSub interface {
18
	Publish(block blocks.Block)
19
	Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block
20 21 22
	Shutdown()
}

23
// New generates a new PubSub interface.
24
func New() PubSub {
25 26
	return &impl{
		wrapped: *pubsub.New(bufferSize),
Steven Allen's avatar
Steven Allen committed
27
		closed:  make(chan struct{}),
28
	}
29 30 31
}

type impl struct {
Steven Allen's avatar
Steven Allen committed
32
	lk      sync.RWMutex
33
	wrapped pubsub.PubSub
34

Steven Allen's avatar
Steven Allen committed
35
	closed chan struct{}
36 37
}

38
func (ps *impl) Publish(block blocks.Block) {
Steven Allen's avatar
Steven Allen committed
39 40
	ps.lk.RLock()
	defer ps.lk.RUnlock()
Steven Allen's avatar
Steven Allen committed
41 42
	select {
	case <-ps.closed:
43
		return
Steven Allen's avatar
Steven Allen committed
44
	default:
45 46
	}

47
	ps.wrapped.Pub(block, block.Cid().KeyString())
48 49 50
}

func (ps *impl) Shutdown() {
Steven Allen's avatar
Steven Allen committed
51 52
	ps.lk.Lock()
	defer ps.lk.Unlock()
Steven Allen's avatar
Steven Allen committed
53 54
	select {
	case <-ps.closed:
Steven Allen's avatar
Steven Allen committed
55
		return
Steven Allen's avatar
Steven Allen committed
56
	default:
Steven Allen's avatar
Steven Allen committed
57
	}
Steven Allen's avatar
Steven Allen committed
58
	close(ps.closed)
59 60
	ps.wrapped.Shutdown()
}
61 62

// Subscribe returns a channel of blocks for the given |keys|. |blockChannel|
63 64
// is closed if the |ctx| times out or is cancelled, or after receiving the blocks
// corresponding to |keys|.
65
func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {
66

67
	blocksCh := make(chan blocks.Block, len(keys))
68
	valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
69 70 71 72
	if len(keys) == 0 {
		close(blocksCh)
		return blocksCh
	}
73 74

	// prevent shutdown
Steven Allen's avatar
Steven Allen committed
75 76
	ps.lk.RLock()
	defer ps.lk.RUnlock()
77

Steven Allen's avatar
Steven Allen committed
78 79
	select {
	case <-ps.closed:
80 81
		close(blocksCh)
		return blocksCh
Steven Allen's avatar
Steven Allen committed
82
	default:
83 84
	}

85 86
	// AddSubOnceEach listens for each key in the list, and closes the channel
	// once all keys have been received
87
	ps.wrapped.AddSubOnceEach(valuesCh, toStrings(keys)...)
88
	go func() {
89 90 91
		defer func() {
			close(blocksCh)

Steven Allen's avatar
Steven Allen committed
92 93
			ps.lk.RLock()
			defer ps.lk.RUnlock()
Steven Allen's avatar
Steven Allen committed
94 95 96 97
			// Don't touch the pubsub instance if we're
			// already closed.
			select {
			case <-ps.closed:
Steven Allen's avatar
Steven Allen committed
98
				return
Steven Allen's avatar
Steven Allen committed
99
			default:
Steven Allen's avatar
Steven Allen committed
100 101 102
			}

			ps.wrapped.Unsub(valuesCh)
103 104
		}()

105
		for {
106 107 108
			select {
			case <-ctx.Done():
				return
Steven Allen's avatar
Steven Allen committed
109
			case <-ps.closed:
110 111 112 113
			case val, ok := <-valuesCh:
				if !ok {
					return
				}
114
				block, ok := val.(blocks.Block)
115 116 117 118 119 120 121
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case blocksCh <- block: // continue
Steven Allen's avatar
Steven Allen committed
122
				case <-ps.closed:
123 124 125 126 127 128 129 130
				}
			}
		}
	}()

	return blocksCh
}

131
func toStrings(keys []cid.Cid) []string {
132
	strs := make([]string, 0, len(keys))
133
	for _, key := range keys {
134
		strs = append(strs, key.KeyString())
135 136 137
	}
	return strs
}