Commit 3f537471 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

notifs: rate limit notifications

parent a2e3ea62
......@@ -5,6 +5,9 @@ package notifier
import (
"sync"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
)
// Notifiee is a generic interface. Clients implement
......@@ -31,6 +34,18 @@ type Notifiee interface{}
type Notifier struct {
mu sync.RWMutex // guards notifiees
nots map[Notifiee]struct{}
lim *ratelimit.RateLimiter
}
// RateLimited returns a rate limited Notifier. only limit goroutines
// will be spawned. If limit is zero, no rate limiting happens. This
// is the same as `Notifier{}`.
func RateLimited(limit int) Notifier {
n := Notifier{}
if limit > 0 {
n.lim = ratelimit.NewRateLimiter(process.Background(), limit)
}
return n
}
// Notify signs up Notifiee e for notifications. This function
......@@ -107,8 +122,15 @@ func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
for notifiee := range n.nots {
go notify(notifiee)
// TODO find a good way to rate limit this without blocking notifier.
if n.lim == nil { // no rate limit
go notify(notifiee)
} else {
notifiee := notifiee // rebind for data races
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
}
}
n.mu.Unlock()
......
......@@ -4,6 +4,7 @@ import (
"fmt"
"sync"
"testing"
"time"
)
// test data structures
......@@ -205,3 +206,86 @@ func TestThreadsafe(t *testing.T) {
t.Error("counts disagree")
}
}
type highwatermark struct {
mu sync.Mutex
mark int
limit int
errs chan error
}
func (m *highwatermark) incr() {
m.mu.Lock()
m.mark++
// fmt.Println("incr", m.mark)
if m.mark > m.limit {
m.errs <- fmt.Errorf("went over limit: %d/%d", m.mark, m.limit)
}
m.mu.Unlock()
}
func (m *highwatermark) decr() {
m.mu.Lock()
m.mark--
// fmt.Println("decr", m.mark)
if m.mark < 0 {
m.errs <- fmt.Errorf("went under zero: %d/%d", m.mark, m.limit)
}
m.mu.Unlock()
}
func TestLimited(t *testing.T) {
timeout := 10 * time.Second // huge timeout.
limit := 9
hwm := highwatermark{limit: limit, errs: make(chan error, 100)}
n := RateLimited(limit) // will stop after 3 rounds
n.Notify(1)
n.Notify(2)
n.Notify(3)
entr := make(chan struct{})
exit := make(chan struct{})
done := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
// fmt.Printf("round: %d\n", i)
n.NotifyAll(func(e Notifiee) {
hwm.incr()
entr <- struct{}{}
<-exit // wait
hwm.decr()
})
}
done <- struct{}{}
}()
for i := 0; i < 30; {
select {
case <-entr:
continue // let as many enter as possible
case <-time.After(1 * time.Millisecond):
}
// let one exit
select {
case <-entr:
continue // in case of timing issues.
case exit <- struct{}{}:
case <-time.After(timeout):
t.Error("got stuck")
}
i++
}
select {
case <-done: // two parts done
case <-time.After(timeout):
t.Error("did not finish")
}
close(hwm.errs)
for err := range hwm.errs {
t.Error(err)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment