Commit 934b813b authored by Adin Schmahmann's avatar Adin Schmahmann

stronger notification deliveries backed by unbounded buffer

parent e4a65bcf
......@@ -1210,3 +1210,166 @@ func TestSubscriptionLeaveNotification(t *testing.T) {
t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event"))
}
}
func TestSubscriptionNotificationOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const topic = "foobar"
const numHosts = 35
hosts := getNetHosts(t, ctx, numHosts)
psubs := getPubsubs(ctx, hosts)
msgs := make([]*Subscription, numHosts)
subPeersFound := make([]map[peer.ID]struct{}, numHosts)
// Subscribe all peers except one and wait until they've all been found
for i := 1; i < numHosts; i++ {
subch, err := psubs[i].Subscribe(topic)
if err != nil {
t.Fatal(err)
}
msgs[i] = subch
}
connectAll(t, hosts)
time.Sleep(time.Millisecond * 100)
wg := sync.WaitGroup{}
for i := 1; i < numHosts; i++ {
peersFound := make(map[peer.ID]struct{})
subPeersFound[i] = peersFound
sub := msgs[i]
wg.Add(1)
go func(peersFound map[peer.ID]struct{}) {
defer wg.Done()
for len(peersFound) < numHosts-2 {
event, err := sub.NextPeerEvent(ctx)
if err != nil {
t.Fatal(err)
}
if event.Type == PEER_JOIN {
peersFound[event.Peer] = struct{}{}
}
}
}(peersFound)
}
wg.Wait()
for _, peersFound := range subPeersFound[1:] {
if len(peersFound) != numHosts-2 {
t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2)
}
}
// Wait for remaining peer to find other peers
for len(psubs[0].ListPeers(topic)) < numHosts-1 {
time.Sleep(time.Millisecond * 100)
}
// Subscribe the remaining peer and check that all the events came through
sub, err := psubs[0].Subscribe(topic)
if err != nil {
t.Fatal(err)
}
msgs[0] = sub
peerState := readAllQueuedEvents(ctx, t, sub)
if len(peerState) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
for _, e := range peerState {
if e != PEER_JOIN {
t.Fatal("non JOIN event occurred")
}
}
// Unsubscribe all peers except one and check that all the events came through
for i := 1; i < numHosts; i++ {
msgs[i].Cancel()
}
// Wait for remaining peer to find other peers
for len(psubs[0].ListPeers(topic)) != 0 {
time.Sleep(time.Millisecond * 100)
}
peerState = readAllQueuedEvents(ctx, t, sub)
if len(peerState) != numHosts-1 {
t.Fatal("incorrect number of peers found")
}
for _, e := range peerState {
if e != PEER_LEAVE {
t.Fatal("non LEAVE event occurred")
}
}
// Resubscribe and Unsubscribe a peers and check the state for consistency
notifSubThenUnSub(ctx, t, topic, psubs, msgs, 10)
notifSubThenUnSub(ctx, t, topic, psubs, msgs, numHosts-1)
}
func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string,
psubs []*PubSub, msgs []*Subscription, checkSize int) {
ps := psubs[0]
sub := msgs[0]
var err error
for i := 1; i < checkSize+1; i++ {
msgs[i], err = psubs[i].Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}
for len(ps.ListPeers(topic)) < checkSize {
time.Sleep(time.Millisecond * 100)
}
for i := 1; i < checkSize+1; i++ {
msgs[i].Cancel()
}
// Wait for subscriptions to register
for len(ps.ListPeers(topic)) < 0 {
time.Sleep(time.Millisecond * 100)
}
peerState := readAllQueuedEvents(ctx, t, sub)
if len(peerState) != 0 {
t.Fatal("Received incorrect events")
}
}
func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) map[peer.ID]EventType {
peerState := make(map[peer.ID]EventType)
for {
ctx, _ := context.WithTimeout(ctx, time.Millisecond*100)
event, err := sub.NextPeerEvent(ctx)
if err == context.DeadlineExceeded {
break
} else if err != nil {
t.Fatal(err)
}
e, ok := peerState[event.Peer]
if !ok {
peerState[event.Peer] = event.Type
} else if e != event.Type {
delete(peerState, event.Peer)
}
}
return peerState
}
......@@ -454,19 +454,11 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
}
tmap := p.topics[sub.topic]
inboundBufSize := len(tmap)
if inboundBufSize < 32 {
inboundBufSize = 32
}
sub.ch = make(chan *Message, 32)
sub.joinCh = make(chan peer.ID, inboundBufSize)
sub.leaveCh = make(chan peer.ID, 32)
sub.cancelCh = p.cancelCh
for pid := range tmap {
sub.joinCh <- pid
for p := range tmap {
sub.evtBacklog[p] = PEER_JOIN
}
sub.cancelCh = p.cancelCh
p.myTopics[sub.topic][sub] = struct{}{}
......@@ -581,11 +573,7 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool {
func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
if subs, ok := p.myTopics[topic]; ok {
for s := range subs {
select {
case s.leaveCh <- pid:
default:
log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic)
}
s.sendNotification(PeerEvent{PEER_LEAVE, pid})
}
}
}
......@@ -605,11 +593,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
if subs, ok := p.myTopics[t]; ok {
peer := rpc.from
for s := range subs {
select {
case s.joinCh <- peer:
default:
log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t)
}
s.sendNotification(PeerEvent{PEER_JOIN, peer})
}
}
}
......@@ -712,6 +696,11 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO
sub := &Subscription{
topic: td.GetName(),
ch: make(chan *Message, 32),
peerEvtCh: make(chan PeerEvent, 32),
evtBacklog: make(map[peer.ID]EventType),
backlogCh: make(chan PeerEvent, 1),
}
for _, opt := range opts {
......
......@@ -3,6 +3,7 @@ package pubsub
import (
"context"
"github.com/libp2p/go-libp2p-core/peer"
"sync"
)
type EventType int
......@@ -16,9 +17,12 @@ type Subscription struct {
topic string
ch chan *Message
cancelCh chan<- *Subscription
joinCh chan peer.ID
leaveCh chan peer.ID
err error
peerEvtCh chan PeerEvent
eventMx sync.Mutex
evtBacklog map[peer.ID]EventType
backlogCh chan PeerEvent
}
type PeerEvent struct {
......@@ -50,29 +54,77 @@ func (sub *Subscription) Cancel() {
func (sub *Subscription) close() {
close(sub.ch)
close(sub.joinCh)
close(sub.leaveCh)
}
// NextPeerEvent returns the next event regarding subscribed peers
// Note: There is no guarantee that the Peer Join event will fire before
// the related Peer Leave event for a given peer
func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
func (sub *Subscription) sendNotification(evt PeerEvent) {
sub.eventMx.Lock()
defer sub.eventMx.Unlock()
e, ok := sub.evtBacklog[evt.Peer]
if ok && e != evt.Type {
delete(sub.evtBacklog, evt.Peer)
}
select {
case newPeer, ok := <-sub.joinCh:
event := PeerEvent{Type: PEER_JOIN, Peer: newPeer}
if !ok {
return event, sub.err
case sub.peerEvtCh <- evt:
default:
// Empty event queue into backlog
emptyqueue:
for {
select {
case e := <-sub.peerEvtCh:
sub.addToBacklog(e)
default:
break emptyqueue
}
}
sub.addToBacklog(evt)
if e, ok := sub.pullFromBacklog(); ok {
sub.peerEvtCh <- e
}
}
}
// addToBacklog assumes a lock has been taken to protect the backlog
func (sub *Subscription) addToBacklog(evt PeerEvent) {
e, ok := sub.evtBacklog[evt.Peer]
if !ok {
sub.evtBacklog[evt.Peer] = evt.Type
} else if e != evt.Type {
delete(sub.evtBacklog, evt.Peer)
}
}
// pullFromBacklog assumes a lock has been taken to protect the backlog
func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) {
for k, v := range sub.evtBacklog {
evt := PeerEvent{Peer: k, Type: v}
delete(sub.evtBacklog, k)
return evt, true
}
return PeerEvent{}, false
}
return event, nil
case leavingPeer, ok := <-sub.leaveCh:
event := PeerEvent{Type: PEER_LEAVE, Peer: leavingPeer}
// NextPeerEvent returns the next event regarding subscribed peers
// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
// Unless a peer both Joins and Leaves before NextPeerEvent emits either event
// all events will eventually be received from NextPeerEvent.
func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
sub.eventMx.Lock()
evt, ok := sub.pullFromBacklog()
sub.eventMx.Unlock()
if ok {
return evt, nil
}
select {
case evt, ok := <-sub.peerEvtCh:
if !ok {
return event, sub.err
return PeerEvent{}, sub.err
}
return event, nil
return evt, nil
case <-ctx.Done():
return PeerEvent{}, ctx.Err()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment