Commit 39947de7 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

updated goprocess

parent 35738ace
{
"ImportPath": "github.com/jbenet/go-datastore",
"GoVersion": "go1.3",
"GoVersion": "go1.4",
"Packages": [
"./..."
],
......@@ -20,7 +20,7 @@
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "b4b4178efcf2404ce9db72438c9c49db2fb399d8"
"Rev": "5b02f8d275a2dd882fb06f8bbdf74347795ff3b1"
},
{
"ImportPath": "github.com/mattbaird/elastigo/api",
......
language: go
go:
- 1.2
- 1.3
- 1.4
- release
- tip
script:
- go test -race -cpu=5 -v ./...
The MIT License (MIT)
Copyright (c) 2014 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# goprocess - lifecycles in go
[![travisbadge](https://travis-ci.org/jbenet/goprocess.svg)](https://travis-ci.org/jbenet/goprocess)
(Based on https://github.com/jbenet/go-ctxgroup)
- Godoc: https://godoc.org/github.com/jbenet/goprocess
......
......@@ -18,7 +18,7 @@ import (
// More specifically, it fits this:
//
// p := WithTeardown(tf) // new process is created, it is now running.
// p.AddChild(q) // can register children **before** Closing.
// p.AddChild(q) // can register children **before** Closed().
// go p.Close() // blocks until done running teardown func.
// <-p.Closing() // would now return true.
// <-p.childrenDone() // wait on all children to be done
......@@ -113,7 +113,7 @@ type Process interface {
// }
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc)
Go(f ProcessFunc) Process
// Close ends the process. Close blocks until the process has completely
// shut down, and any teardown has run _exactly once_. The returned error
......@@ -171,7 +171,19 @@ var nilProcessFunc = func(Process) {}
//
// This is because having the process you
func Go(f ProcessFunc) Process {
return GoChild(Background(), f)
// return GoChild(Background(), f)
// we use two processes, one for communication, and
// one for ensuring we wait on the function (unclosable from the outside).
p := newProcess(nil)
waitFor := newProcess(nil)
p.WaitFor(waitFor) // prevent p from closing
go func() {
f(p)
waitFor.Close() // allow p to close.
p.Close() // ensure p closes.
}()
return p
}
// GoChild is like Go, but it registers the returned Process as a child of parent,
......
......@@ -149,7 +149,7 @@ func TestTeardownCalledOnce(t *testing.T) {
a.c[1].Close()
}
func TestOnClosed(t *testing.T) {
func TestOnClosedAll(t *testing.T) {
Q := make(chan string, 10)
p := WithParent(Background())
......@@ -165,12 +165,40 @@ func TestOnClosed(t *testing.T) {
go p.Close()
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "0", "1")
testStrs(t, Q, "0", "1")
testStrs(t, Q, "00", "01", "10", "11", "0", "1")
testStrs(t, Q, "00", "01", "10", "11", "0", "1")
testStrs(t, Q, "00", "01", "10", "11", "0", "1")
testStrs(t, Q, "00", "01", "10", "11", "0", "1")
testStrs(t, Q, "00", "01", "10", "11", "0", "1")
testStrs(t, Q, "00", "01", "10", "11", "0", "1")
testStrs(t, Q, "")
}
func TestOnClosedLeaves(t *testing.T) {
Q := make(chan string, 10)
p := WithParent(Background())
a := setupHierarchy(p)
go onClosedStr(Q, "0", a.c[0])
go onClosedStr(Q, "10", a.c[1].c[0])
go onClosedStr(Q, "", a)
go onClosedStr(Q, "00", a.c[0].c[0])
go onClosedStr(Q, "1", a.c[1])
go onClosedStr(Q, "01", a.c[0].c[1])
go onClosedStr(Q, "11", a.c[1].c[1])
go a.c[0].Close()
testStrs(t, Q, "00", "01", "0")
testStrs(t, Q, "00", "01", "0")
testStrs(t, Q, "00", "01", "0")
go a.c[1].Close()
testStrs(t, Q, "10", "11", "1")
testStrs(t, Q, "10", "11", "1")
testStrs(t, Q, "10", "11", "1")
go p.Close()
testStrs(t, Q, "")
}
......@@ -397,6 +425,7 @@ func TestCloseAfterChildren(t *testing.T) {
aDone := make(chan struct{})
bDone := make(chan struct{})
t.Log("test none when waiting on a")
testNone(t, Q)
go func() {
a.CloseAfterChildren()
......@@ -404,6 +433,7 @@ func TestCloseAfterChildren(t *testing.T) {
}()
testNone(t, Q)
t.Log("test none when waiting on b")
go func() {
b.CloseAfterChildren()
bDone <- struct{}{}
......@@ -482,8 +512,8 @@ func testNotClosed(t *testing.T, p Process) {
func testNone(t *testing.T, c <-chan string) {
select {
case <-c:
t.Fatal("none should be closed")
case out := <-c:
t.Fatal("none should be closed", out)
default:
}
}
......
// +build ignore
// WARNING: this implementation is not correct.
// here only for historical purposes.
package goprocess
import (
"sync"
)
// process implements Process
type process struct {
children sync.WaitGroup // wait group for child goroutines
teardown TeardownFunc // called to run the teardown logic.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeOnce sync.Once // ensure close is only called once.
closeErr error // error to return to clients of Close()
}
// newProcess constructs and returns a Process.
// It will call tf TeardownFunc exactly once:
// **after** all children have fully Closed,
// **after** entering <-Closing(), and
// **before** <-Closed().
func newProcess(tf TeardownFunc) *process {
if tf == nil {
tf = nilTeardownFunc
}
return &process{
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
}
}
func (p *process) WaitFor(q Process) {
p.children.Add(1) // p waits on q to be done
go func(p *process, q Process) {
<-q.Closed() // wait until q is closed
p.children.Done() // p done waiting on q
}(p, q)
}
func (p *process) AddChildNoWait(child Process) {
go func(p, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child
}(p, child)
}
func (p *process) AddChild(child Process) {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
p.children.Add(1) // p waits on child to be done
go func(p *process, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child and wait
p.children.Done() // p done waiting on child
}(p, child)
}
func (p *process) Go(f ProcessFunc) Process {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
// this is very similar to AddChild, but also runs the func
// in the child. we replicate it here to save one goroutine.
child := newProcessGoroutines(nil)
child.children.Add(1) // child waits on func to be done
p.AddChild(child)
go func() {
f(child)
child.children.Done() // wait on child's children to be done.
child.Close() // close to tear down.
}()
return child
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.closeOnce.Do(p.doClose)
<-p.Closed() // sync.Once should block, but this checks chan is closed too
return p.closeErr
}
func (p *process) Closing() <-chan struct{} {
return p.closing
}
func (p *process) Closed() <-chan struct{} {
return p.closed
}
// the _actual_ close process.
func (p *process) doClose() {
// this function should only be called once (hence the sync.Once).
// and it will panic (on closing channels) otherwise.
close(p.closing) // signal that we're shutting down (Closing)
p.children.Wait() // wait till all children are done (before teardown)
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
}
......@@ -74,29 +74,36 @@ func (p *process) AddChild(child Process) {
p.Unlock()
}
func (p *process) Go(f ProcessFunc) {
func (p *process) Go(f ProcessFunc) Process {
child := newProcess(nil)
p.AddChild(child)
waitFor := newProcess(nil)
child.WaitFor(waitFor) // prevent child from closing
go func() {
f(child)
child.Close() // close to tear down.
waitFor.Close() // allow child to close.
child.CloseAfterChildren() // close to tear down.
}()
return child
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.Lock()
defer p.Unlock()
// if already closed, get out.
// if already closing, or closed, get out. (but wait!)
select {
case <-p.Closed():
case <-p.Closing():
p.Unlock()
<-p.Closed()
return p.closeErr
default:
}
p.doClose()
p.Unlock()
return p.closeErr
}
......@@ -115,12 +122,23 @@ func (p *process) doClose() {
close(p.closing) // signal that we're shutting down (Closing)
for _, c := range p.children {
go c.Close() // force all children to shut down
}
for _, w := range p.waitfors {
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
for len(p.children) > 0 || len(p.waitfors) > 0 {
for _, c := range p.children {
go c.Close() // force all children to shut down
}
p.children = nil // clear them
// we must be careful not to iterate over waitfors directly, as it may
// change under our feet.
wf := p.waitfors
p.waitfors = nil // clear them
for _, w := range wf {
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
// adding a child to us can finish. we will immediately close the child.
p.Unlock()
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
p.Lock()
}
}
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
......@@ -162,16 +180,10 @@ func (p *process) CloseAfterChildren() error {
// wait for all processes we're waiting for are closed.
// the semantics here are simple: we will _only_ close
// if there are no processes currently waiting for.
for {
// remove it from waitingfors
if next := nextToWaitFor(); next != nil {
<-next.Closed()
continue
}
// YAY! we're done. close
break
for next := nextToWaitFor(); next != nil; next = nextToWaitFor() {
<-next.Closed()
}
// YAY! we're done. close
return p.Close()
}
# goprocess/periodic - periodic process creation
- goprocess: https://github.com/jbenet/goprocess
- Godoc: https://godoc.org/github.com/jbenet/goprocess/periodic
package periodicproc_test
import (
"fmt"
"time"
goprocess "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)
func ExampleEvery() {
tock := make(chan struct{})
i := 0
p := periodicproc.Every(time.Second, func(proc goprocess.Process) {
tock <- struct{}{}
fmt.Printf("hello %d\n", i)
i++
})
<-tock
<-tock
<-tock
p.Close()
// Output:
// hello 0
// hello 1
// hello 2
}
func ExampleTick() {
p := periodicproc.Tick(time.Second, func(proc goprocess.Process) {
fmt.Println("tick")
})
<-time.After(3*time.Second + 500*time.Millisecond)
p.Close()
// Output:
// tick
// tick
// tick
}
func ExampleTickGo() {
// with TickGo, execution is not rate limited,
// there can be many in-flight simultaneously
wait := make(chan struct{})
p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) {
fmt.Println("tick")
<-wait
})
<-time.After(3*time.Second + 500*time.Millisecond)
wait <- struct{}{}
wait <- struct{}{}
wait <- struct{}{}
p.Close() // blocks us until all children are closed.
// Output:
// tick
// tick
// tick
}
func ExampleOnSignal() {
sig := make(chan struct{})
p := periodicproc.OnSignal(sig, func(proc goprocess.Process) {
fmt.Println("fire!")
})
sig <- struct{}{}
sig <- struct{}{}
sig <- struct{}{}
p.Close()
// Output:
// fire!
// fire!
// fire!
}
// Package periodic is part of github.com/jbenet/goprocess.
// It provides a simple periodic processor that calls a function
// periodically based on some options.
//
// For example:
//
// // use a time.Duration
// p := periodicproc.Every(time.Second, func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// <-time.After(5*time.Second)
// p.Close()
//
// // use a time.Time channel (like time.Ticker)
// p := periodicproc.Tick(time.Tick(time.Second), func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// <-time.After(5*time.Second)
// p.Close()
//
// // or arbitrary signals
// signal := make(chan struct{})
// p := periodicproc.OnSignal(signal, func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// signal<- struct{}{}
// signal<- struct{}{}
// <-time.After(5 * time.Second)
// signal<- struct{}{}
// p.Close()
//
package periodicproc
import (
"time"
gp "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
// Every calls the given ProcessFunc at periodic intervals. Internally, it uses
// <-time.After(interval), so it will have the behavior of waiting _at least_
// interval in between calls. If you'd prefer the time.Ticker behavior, use
// periodicproc.Tick instead.
// This is sequentially rate limited, only one call will be in-flight at a time.
func Every(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-time.After(interval):
select {
case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// EveryGo calls the given ProcessFunc at periodic intervals. Internally, it uses
// <-time.After(interval)
// This is not rate limited, multiple calls could be in-flight at the same time.
func EveryGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-time.After(interval):
proc.Go(procfunc)
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// Tick constructs a ticker with interval, and calls the given ProcessFunc every
// time the ticker fires.
// This is sequentially rate limited, only one call will be in-flight at a time.
//
// p := periodicproc.Tick(time.Second, func(proc goprocess.Process) {
// fmt.Println("fire!")
// })
//
// <-time.After(3 * time.Second)
// p.Close()
//
// // Output:
// // fire!
// // fire!
// // fire!
func Tick(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
ticker := time.NewTicker(interval)
callOnTicker(ticker.C, procfunc)(proc)
ticker.Stop()
})
}
// TickGo constructs a ticker with interval, and calls the given ProcessFunc every
// time the ticker fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
//
// p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(10 * time.Second) // will not block sequential execution
// })
//
// <-time.After(3 * time.Second)
// p.Close()
//
// // Output:
// // fire!
// // fire!
// // fire!
func TickGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
ticker := time.NewTicker(interval)
goCallOnTicker(ticker.C, procfunc)(proc)
ticker.Stop()
})
}
// Ticker calls the given ProcessFunc every time the ticker fires.
// This is sequentially rate limited, only one call will be in-flight at a time.
func Ticker(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(callOnTicker(ticker, procfunc))
}
// TickerGo calls the given ProcessFunc every time the ticker fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
func TickerGo(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(goCallOnTicker(ticker, procfunc))
}
func callOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc {
return func(proc gp.Process) {
for {
select {
case <-ticker:
select {
case <-proc.Go(pf).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
}
}
func goCallOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc {
return func(proc gp.Process) {
for {
select {
case <-ticker:
proc.Go(pf)
case <-proc.Closing(): // we're told to close
return
}
}
}
}
// OnSignal calls the given ProcessFunc every time the signal fires, and waits for it to exit.
// This is sequentially rate limited, only one call will be in-flight at a time.
//
// sig := make(chan struct{})
// p := periodicproc.OnSignal(sig, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(time.Second) // delays sequential execution by 1 second
// })
//
// sig<- struct{}
// sig<- struct{}
// sig<- struct{}
//
// // Output:
// // fire!
// // fire!
// // fire!
func OnSignal(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-sig:
select {
case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// OnSignalGo calls the given ProcessFunc every time the signal fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
//
// sig := make(chan struct{})
// p := periodicproc.OnSignalGo(sig, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(time.Second) // wont block execution
// })
//
// sig<- struct{}
// sig<- struct{}
// sig<- struct{}
//
// // Output:
// // fire!
// // fire!
// // fire!
func OnSignalGo(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-sig:
proc.Go(procfunc)
case <-proc.Closing(): // we're told to close
return
}
}
})
}
package periodicproc
import (
"testing"
"time"
ci "github.com/jbenet/go-cienv"
gp "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
var (
grace = time.Millisecond * 5
interval = time.Millisecond * 10
timeout = time.Second * 5
)
func init() {
if ci.IsRunning() {
grace = time.Millisecond * 500
interval = time.Millisecond * 1000
timeout = time.Second * 15
}
}
func between(min, diff, max time.Duration) bool {
return min <= diff && diff <= max
}
func testBetween(t *testing.T, min, diff, max time.Duration) {
if !between(min, diff, max) {
t.Error("time diff incorrect:", min, diff, max)
}
}
type intervalFunc func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process)
func testSeq(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
p := toTest(times, nil)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
last = next
}
go p.Close()
select {
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testSeqWait(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
<-time.After(interval * 2) // make it wait.
last = time.Now() // make it now (sequential)
wait <- struct{}{} // release it.
}
go p.Close()
select {
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testSeqNoWait(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, 0, next.Sub(last), interval+grace) // min of 0
<-time.After(interval * 2) // make it wait.
last = time.Now() // make it now (sequential)
wait <- struct{}{} // release it.
}
go p.Close()
end:
select {
case wait <- struct{}{}: // drain any extras.
goto end
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testParallel(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
last = next
<-time.After(interval * 2) // make it wait.
wait <- struct{}{} // release it.
}
go p.Close()
end:
select {
case wait <- struct{}{}: // drain any extras.
goto end
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func TestEverySeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Every(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestEverySeqWait(t *testing.T) {
testSeqWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Every(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestEveryGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return EveryGo(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestEveryGoSeqParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return EveryGo(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Tick(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickSeqNoWait(t *testing.T) {
testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Tick(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickGo(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickGoSeqParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickGo(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickerSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Ticker(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickerSeqNoWait(t *testing.T) {
testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Ticker(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickerGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickerGo(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickerGoParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickerGo(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
......@@ -45,29 +45,14 @@ func NewRateLimiter(parent process.Process, limit int) *RateLimiter {
func (rl *RateLimiter) LimitedGo(f process.ProcessFunc) {
<-rl.limiter
rl.Go(func(child process.Process) {
p := rl.Go(f)
// call the function as rl.Go would.
f(child)
// this close is here because the child may have spawned
// children of its own, and our rate limiter should capture that.
// we have two options:
// * this approach (which is what process.Go itself does), or
// * spawn another goroutine that waits on <-child.Closed()
//
// go func() {
// <-child.Closed()
// rl.limiter <- struct{}{}
// }()
//
// This approach saves a goroutine. It is fine to call child.Close()
// multiple times.
child.Close()
// after it's done.
// this <-closed() is here because the child may have spawned
// children of its own, and our rate limiter should capture that.
go func() {
<-p.Closed()
rl.limiter <- struct{}{}
})
}()
}
// LimitChan returns a rate-limiting channel. it is the usual, simple,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment