Commit 0552f46f authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

'generic' notifier

parent 069cff3d
// Package notifier provides a simple notification dispatcher
// meant to be embedded in larger structres who wish to allow
// clients to sign up for event notifications.
package notifier
import (
"sync"
)
// Notifiee is a generic interface. Clients implement
// their own Notifiee interfaces to ensure type-safety
// of notifications:
//
// type RocketNotifiee interface{
// Countdown(r Rocket, countdown time.Duration)
// LiftedOff(Rocket)
// ReachedOrbit(Rocket)
// Detached(Rocket, Capsule)
// Landed(Rocket)
// }
//
type Notifiee interface{}
// Notifier is a notification dispatcher. It's meant
// to be composed, and its zero-value is ready to be used.
//
// type Rocket struct {
// notifier notifier.Notifier
// }
//
type Notifier struct {
mu sync.RWMutex // guards notifiees
nots map[Notifiee]struct{}
}
// Notify signs up Notifiee e for notifications. This function
// is meant to be called behind your own type-safe function(s):
//
// // generic function for pattern-following
// func (r *Rocket) Notify(n Notifiee) {
// r.notifier.Notify(n)
// }
//
// // or as part of other functions
// func (r *Rocket) Onboard(a Astronaut) {
// r.astronauts = append(r.austronauts, a)
// r.notifier.Notify(a)
// }
//
func (n *Notifier) Notify(e Notifiee) {
n.mu.Lock()
if n.nots == nil { // so that zero-value is ready to be used.
n.nots = make(map[Notifiee]struct{})
}
n.nots[e] = struct{}{}
n.mu.Unlock()
}
// StopNotifying stops notifying Notifiee e. This function
// is meant to be called behind your own type-safe function(s):
//
// // generic function for pattern-following
// func (r *Rocket) StopNotify(n Notifiee) {
// r.notifier.StopNotify(n)
// }
//
// // or as part of other functions
// func (r *Rocket) Detach(c Capsule) {
// r.notifier.StopNotify(c)
// r.capsule = nil
// }
//
func (n *Notifier) StopNotify(e Notifiee) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
delete(n.nots, e)
}
n.mu.Unlock()
}
// NotifyAll messages the notifier's notifiees with a given notification.
// This is done by calling the given function with each notifiee. It is
// meant to be called with your own type-safe notification functions:
//
// func (r *Rocket) Launch() {
// r.notifyAll(func(n Notifiee) {
// n.Launched(r)
// })
// }
//
// // make it private so only you can use it. This function is necessary
// // to make sure you only up-cast in one place. You control who you added
// // to be a notifiee. If Go adds generics, maybe we can get rid of this
// // method but for now it is like wrapping a type-less container with
// // a type safe interface.
// func (r *Rocket) notifyAll(notify func(Notifiee)) {
// r.notifier.NotifyAll(func(n notifier.Notifiee) {
// notify(n.(Notifiee))
// })
// }
//
func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
for notifiee := range n.nots {
// we spin out a goroutine so that whatever the notification does
// it _never_ blocks out the client. This is so that consumers
// _cannot_ add hooks into your object that block you accidentally.
go notify(notifiee)
}
}
n.mu.Unlock()
}
package notifier
import (
"fmt"
"sync"
"testing"
)
// test data structures
type Router struct {
queue chan Packet
notifier Notifier
}
type Packet struct{}
type RouterNotifiee interface {
Enqueued(*Router, Packet)
Forwarded(*Router, Packet)
Dropped(*Router, Packet)
}
func (r *Router) Notify(n RouterNotifiee) {
r.notifier.Notify(n)
}
func (r *Router) StopNotify(n RouterNotifiee) {
r.notifier.StopNotify(n)
}
func (r *Router) notifyAll(notify func(n RouterNotifiee)) {
r.notifier.NotifyAll(func(n Notifiee) {
notify(n.(RouterNotifiee))
})
}
func (r *Router) Receive(p Packet) {
select {
case r.queue <- p: // enqueued
r.notifyAll(func(n RouterNotifiee) {
n.Enqueued(r, p)
})
default: // drop
r.notifyAll(func(n RouterNotifiee) {
n.Dropped(r, p)
})
}
}
func (r *Router) Forward() {
p := <-r.queue
r.notifyAll(func(n RouterNotifiee) {
n.Forwarded(r, p)
})
}
type Metrics struct {
enqueued int
forwarded int
dropped int
received chan struct{}
sync.Mutex
}
func (m *Metrics) Enqueued(*Router, Packet) {
m.Lock()
m.enqueued++
m.Unlock()
if m.received != nil {
m.received <- struct{}{}
}
}
func (m *Metrics) Forwarded(*Router, Packet) {
m.Lock()
m.forwarded++
m.Unlock()
if m.received != nil {
m.received <- struct{}{}
}
}
func (m *Metrics) Dropped(*Router, Packet) {
m.Lock()
m.dropped++
m.Unlock()
if m.received != nil {
m.received <- struct{}{}
}
}
func (m *Metrics) String() string {
m.Lock()
defer m.Unlock()
return fmt.Sprintf("%d enqueued, %d forwarded, %d in queue, %d dropped",
m.enqueued, m.forwarded, m.enqueued-m.forwarded, m.dropped)
}
func TestNotifies(t *testing.T) {
m := Metrics{received: make(chan struct{})}
r := Router{queue: make(chan Packet, 10)}
r.Notify(&m)
for i := 0; i < 10; i++ {
r.Receive(Packet{})
<-m.received
if m.enqueued != (1 + i) {
t.Error("not notifying correctly", m.enqueued, 1+i)
}
}
for i := 0; i < 10; i++ {
r.Receive(Packet{})
<-m.received
if m.enqueued != 10 {
t.Error("not notifying correctly", m.enqueued, 10)
}
if m.dropped != (1 + i) {
t.Error("not notifying correctly", m.dropped, 1+i)
}
}
}
func TestStopsNotifying(t *testing.T) {
m := Metrics{received: make(chan struct{})}
r := Router{queue: make(chan Packet, 10)}
r.Notify(&m)
for i := 0; i < 5; i++ {
r.Receive(Packet{})
<-m.received
if m.enqueued != (1 + i) {
t.Error("not notifying correctly")
}
}
r.StopNotify(&m)
for i := 0; i < 5; i++ {
r.Receive(Packet{})
select {
case <-m.received:
t.Error("did not stop notifying")
default:
}
if m.enqueued != 5 {
t.Error("did not stop notifying")
}
}
}
func TestThreadsafe(t *testing.T) {
N := 1000
r := Router{queue: make(chan Packet, 10)}
m1 := Metrics{received: make(chan struct{})}
m2 := Metrics{received: make(chan struct{})}
m3 := Metrics{received: make(chan struct{})}
r.Notify(&m1)
r.Notify(&m2)
r.Notify(&m3)
var n int
var wg sync.WaitGroup
for i := 0; i < N; i++ {
n++
wg.Add(1)
go func() {
defer wg.Done()
r.Receive(Packet{})
}()
if i%3 == 0 {
n++
wg.Add(1)
go func() {
defer wg.Done()
r.Forward()
}()
}
}
// drain queues
for i := 0; i < (n * 3); i++ {
select {
case <-m1.received:
case <-m2.received:
case <-m3.received:
}
}
wg.Wait()
// counts should be correct and all agree. and this should
// run fine under `go test -race -cpu=5`
t.Log("m1", m1.String())
t.Log("m2", m2.String())
t.Log("m3", m3.String())
if m1.String() != m2.String() || m2.String() != m3.String() {
t.Error("counts disagree")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment