Commit 611d0533 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

added goprocess

parent c0cf8e7f
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
"ImportPath": "github.com/hashicorp/golang-lru", "ImportPath": "github.com/hashicorp/golang-lru",
"Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370" "Rev": "4dfff096c4973178c8f35cf6dd1a732a0a139370"
}, },
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "b4b4178efcf2404ce9db72438c9c49db2fb399d8"
},
{ {
"ImportPath": "github.com/mattbaird/elastigo/api", "ImportPath": "github.com/mattbaird/elastigo/api",
"Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d" "Rev": "041b88c1fcf6489a5721ede24378ce1253b9159d"
......
# goprocess - lifecycles in go
(Based on https://github.com/jbenet/go-ctxgroup)
- Godoc: https://godoc.org/github.com/jbenet/goprocess
`goprocess` introduces a way to manage process lifecycles in go. It is
much like [go.net/context](https://godoc.org/code.google.com/p/go.net/context)
(it actually uses a Context), but it is more like a Context-WaitGroup hybrid.
`goprocess` is about being able to start and stop units of work, which may
receive `Close` signals from many clients. Think of it like a UNIX process
tree, but inside go.
`goprocess` seeks to minimally affect your objects, so you can use it
with both embedding or composition. At the heart of `goprocess` is the
`Process` interface:
```Go
// Process is the basic unit of work in goprocess. It defines a computation
// with a lifecycle:
// - running (before calling Close),
// - closing (after calling Close at least once),
// - closed (after Close returns, and all teardown has _completed_).
//
// More specifically, it fits this:
//
// p := WithTeardown(tf) // new process is created, it is now running.
// p.AddChild(q) // can register children **before** Closing.
// go p.Close() // blocks until done running teardown func.
// <-p.Closing() // would now return true.
// <-p.childrenDone() // wait on all children to be done
// p.teardown() // runs the user's teardown function tf.
// p.Close() // now returns, with error teardown returned.
// <-p.Closed() // would now return true.
//
// Processes can be arranged in a process "tree", where children are
// automatically Closed if their parents are closed. (Note, it is actually
// a Process DAG, children may have multiple parents). A process may also
// optionally wait for another to fully Close before beginning to Close.
// This makes it easy to ensure order of operations and proper sequential
// teardown of resurces. For example:
//
// p1 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 1")
// })
// p2 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 2")
// })
// p3 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 3")
// })
//
// p1.AddChild(p2)
// p2.AddChild(p3)
//
//
// go p1.Close()
// go p2.Close()
// go p3.Close()
//
// // Output:
// // closing 3
// // closing 2
// // closing 1
//
// Process is modelled after the UNIX processes group idea, and heavily
// informed by sync.WaitGroup and go.net/context.Context.
//
// In the function documentation of this interface, `p` always refers to
// the self Process.
type Process interface {
// WaitFor makes p wait for q before exiting. Thus, p will _always_ close
// _after_ q. Note well: a waiting cycle is deadlock.
//
// If q is already Closed, WaitFor calls p.Close()
// If p is already Closing or Closed, WaitFor panics. This is the same thing
// as calling Add(1) _after_ calling Done() on a wait group. Calling WaitFor
// on an already-closed process is a programming error likely due to bad
// synchronization
WaitFor(q Process)
// AddChildNoWait registers child as a "child" of Process. As in UNIX,
// when parent is Closed, child is Closed -- child may Close beforehand.
// This is the equivalent of calling:
//
// go func(parent, child Process) {
// <-parent.Closing()
// child.Close()
// }(p, q)
//
// Note: the naming of functions is `AddChildNoWait` and `AddChild` (instead
// of `AddChild` and `AddChildWaitFor`) because:
// - it is the more common operation,
// - explicitness is helpful in the less common case (no waiting), and
// - usual "child" semantics imply parent Processes should wait for children.
AddChildNoWait(q Process)
// AddChild is the equivalent of calling:
// parent.AddChildNoWait(q)
// parent.WaitFor(q)
AddChild(q Process)
// Go creates a new process, adds it as a child, and spawns the ProcessFunc f
// in its own goroutine. It is equivalent to:
//
// GoChild(p, f)
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc) Process
// Close ends the process. Close blocks until the process has completely
// shut down, and any teardown has run _exactly once_. The returned error
// is available indefinitely: calling Close twice returns the same error.
// If the process has already been closed, Close returns immediately.
Close() error
// Closing is a signal to wait upon. The returned channel is closed
// _after_ Close has been called at least once, but teardown may or may
// not be done yet. The primary use case of Closing is for children who
// need to know when a parent is shutting down, and therefore also shut
// down.
Closing() <-chan struct{}
// Closed is a signal to wait upon. The returned channel is closed
// _after_ Close has completed; teardown has finished. The primary use case
// of Closed is waiting for a Process to Close without _causing_ the Close.
Closed() <-chan struct{}
}
```
package goprocessctx
import (
context "code.google.com/p/go.net/context"
goprocess "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
// WithContext constructs and returns a Process that respects
// given context. It is the equivalent of:
//
// func ProcessWithContext(ctx context.Context) goprocess.Process {
// p := goprocess.WithParent(goprocess.Background())
// go func() {
// <-ctx.Done()
// p.Close()
// }()
// return p
// }
//
func WithContext(ctx context.Context) goprocess.Process {
if ctx == nil {
panic("nil Context")
}
p := goprocess.WithParent(goprocess.Background())
go func() {
<-ctx.Done()
p.Close()
}()
return p
}
// WaitForContext makes p WaitFor ctx. When Closing, p waits for
// ctx.Done(), before being Closed(). It is simply:
//
// p.WaitFor(goprocess.WithContext(ctx))
//
func WaitForContext(ctx context.Context, p goprocess.Process) {
p.WaitFor(WithContext(ctx))
}
// WithProcessClosing returns a context.Context derived from ctx that
// is cancelled as p is Closing (after: <-p.Closing()). It is simply:
//
// func WithProcessClosing(ctx context.Context, p goprocess.Process) context.Context {
// ctx, cancel := context.WithCancel(ctx)
// go func() {
// <-p.Closing()
// cancel()
// }()
// return ctx
// }
//
func WithProcessClosing(ctx context.Context, p goprocess.Process) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
<-p.Closing()
cancel()
}()
return ctx
}
// WithProcessClosed returns a context.Context that is cancelled
// after Process p is Closed. It is the equivalent of:
//
// func WithProcessClosed(ctx context.Context, p goprocess.Process) context.Context {
// ctx, cancel := context.WithCancel(ctx)
// go func() {
// <-p.Closed()
// cancel()
// }()
// return ctx
// }
//
func WithProcessClosed(ctx context.Context, p goprocess.Process) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
<-p.Closed()
cancel()
}()
return ctx
}
package goprocess_test
import (
"fmt"
"time"
"github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
func ExampleGo() {
p := goprocess.Go(func(p goprocess.Process) {
ticker := time.Tick(200 * time.Millisecond)
for {
select {
case <-ticker:
fmt.Println("tick")
case <-p.Closing():
fmt.Println("closing")
return
}
}
})
<-time.After(1100 * time.Millisecond)
p.Close()
fmt.Println("closed")
<-time.After(100 * time.Millisecond)
// Output:
// tick
// tick
// tick
// tick
// tick
// closing
// closed
}
// Package goprocess introduces a Process abstraction that allows simple
// organization, and orchestration of work. It is much like a WaitGroup,
// and much like a context.Context, but also ensures safe **exactly-once**,
// and well-ordered teardown semantics.
package goprocess
import (
"os"
"os/signal"
)
// Process is the basic unit of work in goprocess. It defines a computation
// with a lifecycle:
// - running (before calling Close),
// - closing (after calling Close at least once),
// - closed (after Close returns, and all teardown has _completed_).
//
// More specifically, it fits this:
//
// p := WithTeardown(tf) // new process is created, it is now running.
// p.AddChild(q) // can register children **before** Closing.
// go p.Close() // blocks until done running teardown func.
// <-p.Closing() // would now return true.
// <-p.childrenDone() // wait on all children to be done
// p.teardown() // runs the user's teardown function tf.
// p.Close() // now returns, with error teardown returned.
// <-p.Closed() // would now return true.
//
// Processes can be arranged in a process "tree", where children are
// automatically Closed if their parents are closed. (Note, it is actually
// a Process DAG, children may have multiple parents). A process may also
// optionally wait for another to fully Close before beginning to Close.
// This makes it easy to ensure order of operations and proper sequential
// teardown of resurces. For example:
//
// p1 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 1")
// })
// p2 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 2")
// })
// p3 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 3")
// })
//
// p1.AddChild(p2)
// p2.AddChild(p3)
//
//
// go p1.Close()
// go p2.Close()
// go p3.Close()
//
// // Output:
// // closing 3
// // closing 2
// // closing 1
//
// Process is modelled after the UNIX processes group idea, and heavily
// informed by sync.WaitGroup and go.net/context.Context.
//
// In the function documentation of this interface, `p` always refers to
// the self Process.
type Process interface {
// WaitFor makes p wait for q before exiting. Thus, p will _always_ close
// _after_ q. Note well: a waiting cycle is deadlock.
//
// If q is already Closed, WaitFor calls p.Close()
// If p is already Closing or Closed, WaitFor panics. This is the same thing
// as calling Add(1) _after_ calling Done() on a wait group. Calling WaitFor
// on an already-closed process is a programming error likely due to bad
// synchronization
WaitFor(q Process)
// AddChildNoWait registers child as a "child" of Process. As in UNIX,
// when parent is Closed, child is Closed -- child may Close beforehand.
// This is the equivalent of calling:
//
// go func(parent, child Process) {
// <-parent.Closing()
// child.Close()
// }(p, q)
//
// Note: the naming of functions is `AddChildNoWait` and `AddChild` (instead
// of `AddChild` and `AddChildWaitFor`) because:
// - it is the more common operation,
// - explicitness is helpful in the less common case (no waiting), and
// - usual "child" semantics imply parent Processes should wait for children.
AddChildNoWait(q Process)
// AddChild is the equivalent of calling:
// parent.AddChildNoWait(q)
// parent.WaitFor(q)
AddChild(q Process)
// Go is much like `go`, as it runs a function in a newly spawned goroutine.
// The neat part of Process.Go is that the Process object you call it on will:
// * construct a child Process, and call AddChild(child) on it
// * spawn a goroutine, and call the given function
// * Close the child when the function exits.
// This way, you can rest assured each goroutine you spawn has its very own
// Process context, and that it will be closed when the function exits.
// It is the function's responsibility to respect the Closing of its Process,
// namely it should exit (return) when <-Closing() is ready. It is basically:
//
// func (p Process) Go(f ProcessFunc) Process {
// child := WithParent(p)
// go func () {
// f(child)
// child.Close()
// }()
// }
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc)
// Close ends the process. Close blocks until the process has completely
// shut down, and any teardown has run _exactly once_. The returned error
// is available indefinitely: calling Close twice returns the same error.
// If the process has already been closed, Close returns immediately.
Close() error
// CloseAfterChildren calls Close _after_ its children have Closed
// normally (i.e. it _does not_ attempt to close them).
CloseAfterChildren() error
// Closing is a signal to wait upon. The returned channel is closed
// _after_ Close has been called at least once, but teardown may or may
// not be done yet. The primary use case of Closing is for children who
// need to know when a parent is shutting down, and therefore also shut
// down.
Closing() <-chan struct{}
// Closed is a signal to wait upon. The returned channel is closed
// _after_ Close has completed; teardown has finished. The primary use case
// of Closed is waiting for a Process to Close without _causing_ the Close.
Closed() <-chan struct{}
}
// TeardownFunc is a function used to cleanup state at the end of the
// lifecycle of a Process.
type TeardownFunc func() error
var nilTeardownFunc = func() error { return nil }
// ProcessFunc is a function that takes a process. Its main use case is goprocess.Go,
// which spawns a ProcessFunc in its own goroutine, and returns a corresponding
// Process object.
type ProcessFunc func(proc Process)
var nilProcessFunc = func(Process) {}
// Go is much like `go`: it runs a function in a newly spawned goroutine. The neat
// part of Go is that it provides Process object to communicate between the
// function and the outside world. Thus, callers can easily WaitFor, or Close the
// function. It is the function's responsibility to respect the Closing of its Process,
// namely it should exit (return) when <-Closing() is ready. It is simply:
//
// func Go(f ProcessFunc) Process {
// p := WithParent(Background())
// p.Go(f)
// return p
// }
//
// Note that a naive implementation of Go like the following would not work:
//
// func Go(f ProcessFunc) Process {
// return Background().Go(f)
// }
//
// This is because having the process you
func Go(f ProcessFunc) Process {
return GoChild(Background(), f)
}
// GoChild is like Go, but it registers the returned Process as a child of parent,
// **before** spawning the goroutine, which ensures proper synchronization with parent.
// It is somewhat like
//
// func GoChild(parent Process, f ProcessFunc) Process {
// p := WithParent(parent)
// p.Go(f)
// return p
// }
//
// And it is similar to the classic WaitGroup use case:
//
// func WaitGroupGo(wg sync.WaitGroup, child func()) {
// wg.Add(1)
// go func() {
// child()
// wg.Done()
// }()
// }
//
func GoChild(parent Process, f ProcessFunc) Process {
p := WithParent(parent)
p.Go(f)
return p
}
// Spawn is an alias of `Go`. In many contexts, Spawn is a
// well-known Process launching word, which fits our use case.
var Spawn = Go
// SpawnChild is an alias of `GoChild`. In many contexts, Spawn is a
// well-known Process launching word, which fits our use case.
var SpawnChild = GoChild
// WithTeardown constructs and returns a Process with a TeardownFunc.
// TeardownFunc tf will be called **exactly-once** when Process is
// Closing, after all Children have fully closed, and before p is Closed.
// In fact, Process p will not be Closed until tf runs and exits.
// See lifecycle in Process doc.
func WithTeardown(tf TeardownFunc) Process {
if tf == nil {
panic("nil tf TeardownFunc")
}
return newProcess(tf)
}
// WithParent constructs and returns a Process with a given parent.
func WithParent(parent Process) Process {
if parent == nil {
panic("nil parent Process")
}
q := newProcess(nil)
parent.AddChild(q)
return q
}
// WithSignals returns a Process that will Close() when any given signal fires.
// This is useful to bind Process trees to syscall.SIGTERM, SIGKILL, etc.
func WithSignals(sig ...os.Signal) Process {
p := WithParent(Background())
c := make(chan os.Signal)
signal.Notify(c, sig...)
go func() {
<-c
signal.Stop(c)
p.Close()
}()
return p
}
// Background returns the "background" Process: a statically allocated
// process that can _never_ close. It also never enters Closing() state.
// Calling Background().Close() will hang indefinitely.
func Background() Process {
return background
}
// background is the background process
var background = &unclosable{Process: newProcess(nil)}
// unclosable is a process that _cannot_ be closed. calling Close simply hangs.
type unclosable struct {
Process
}
func (p *unclosable) Close() error {
var hang chan struct{}
<-hang // hang forever
return nil
}
package goprocess
import (
"syscall"
"testing"
"time"
)
type tree struct {
Process
c []tree
}
func setupHierarchy(p Process) tree {
t := func(n Process, ts ...tree) tree {
return tree{n, ts}
}
a := WithParent(p)
b1 := WithParent(a)
b2 := WithParent(a)
c1 := WithParent(b1)
c2 := WithParent(b1)
c3 := WithParent(b2)
c4 := WithParent(b2)
return t(a, t(b1, t(c1), t(c2)), t(b2, t(c3), t(c4)))
}
func TestClosingClosed(t *testing.T) {
a := WithParent(Background())
b := WithParent(a)
Q := make(chan string, 3)
go func() {
<-a.Closing()
Q <- "closing"
b.Close()
}()
go func() {
<-a.Closed()
Q <- "closed"
}()
go func() {
a.Close()
Q <- "closed"
}()
if q := <-Q; q != "closing" {
t.Error("order incorrect. closing not first")
}
if q := <-Q; q != "closed" {
t.Error("order incorrect. closing not first")
}
if q := <-Q; q != "closed" {
t.Error("order incorrect. closing not first")
}
}
func TestChildFunc(t *testing.T) {
a := WithParent(Background())
wait1 := make(chan struct{})
wait2 := make(chan struct{})
wait3 := make(chan struct{})
wait4 := make(chan struct{})
go func() {
a.Close()
wait4 <- struct{}{}
}()
a.Go(func(process Process) {
wait1 <- struct{}{}
<-wait2
wait3 <- struct{}{}
})
<-wait1
select {
case <-wait3:
t.Error("should not be closed yet")
case <-wait4:
t.Error("should not be closed yet")
case <-a.Closed():
t.Error("should not be closed yet")
default:
}
wait2 <- struct{}{}
select {
case <-wait3:
case <-time.After(time.Second):
t.Error("should be closed now")
}
select {
case <-wait4:
case <-time.After(time.Second):
t.Error("should be closed now")
}
}
func TestTeardownCalledOnce(t *testing.T) {
a := setupHierarchy(Background())
onlyOnce := func() func() error {
count := 0
return func() error {
count++
if count > 1 {
t.Error("called", count, "times")
}
return nil
}
}
setTeardown := func(t tree, tf TeardownFunc) {
t.Process.(*process).teardown = tf
}
setTeardown(a, onlyOnce())
setTeardown(a.c[0], onlyOnce())
setTeardown(a.c[0].c[0], onlyOnce())
setTeardown(a.c[0].c[1], onlyOnce())
setTeardown(a.c[1], onlyOnce())
setTeardown(a.c[1].c[0], onlyOnce())
setTeardown(a.c[1].c[1], onlyOnce())
a.c[0].c[0].Close()
a.c[0].c[0].Close()
a.c[0].c[0].Close()
a.c[0].c[0].Close()
a.c[0].Close()
a.c[0].Close()
a.c[0].Close()
a.c[0].Close()
a.Close()
a.Close()
a.Close()
a.Close()
a.c[1].Close()
a.c[1].Close()
a.c[1].Close()
a.c[1].Close()
}
func TestOnClosed(t *testing.T) {
Q := make(chan string, 10)
p := WithParent(Background())
a := setupHierarchy(p)
go onClosedStr(Q, "0", a.c[0])
go onClosedStr(Q, "10", a.c[1].c[0])
go onClosedStr(Q, "", a)
go onClosedStr(Q, "00", a.c[0].c[0])
go onClosedStr(Q, "1", a.c[1])
go onClosedStr(Q, "01", a.c[0].c[1])
go onClosedStr(Q, "11", a.c[1].c[1])
go p.Close()
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "00", "01", "10", "11")
testStrs(t, Q, "0", "1")
testStrs(t, Q, "0", "1")
testStrs(t, Q, "")
}
func TestWaitFor(t *testing.T) {
Q := make(chan string, 5)
a := WithParent(Background())
b := WithParent(Background())
c := WithParent(Background())
d := WithParent(Background())
e := WithParent(Background())
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
a.WaitFor(b)
a.WaitFor(c)
b.WaitFor(d)
e.WaitFor(d)
testNone(t, Q)
go a.Close() // should do nothing.
testNone(t, Q)
go e.Close()
testNone(t, Q)
d.Close()
testStrs(t, Q, "d", "e")
testStrs(t, Q, "d", "e")
c.Close()
testStrs(t, Q, "c")
b.Close()
testStrs(t, Q, "a", "b")
testStrs(t, Q, "a", "b")
}
func TestAddChildNoWait(t *testing.T) {
Q := make(chan string, 5)
a := WithParent(Background())
b := WithParent(Background())
c := WithParent(Background())
d := WithParent(Background())
e := WithParent(Background())
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
a.AddChildNoWait(b)
a.AddChildNoWait(c)
b.AddChildNoWait(d)
e.AddChildNoWait(d)
testNone(t, Q)
b.Close()
testStrs(t, Q, "b", "d")
testStrs(t, Q, "b", "d")
a.Close()
testStrs(t, Q, "a", "c")
testStrs(t, Q, "a", "c")
e.Close()
testStrs(t, Q, "e")
}
func TestAddChild(t *testing.T) {
a := WithParent(Background())
b := WithParent(Background())
c := WithParent(Background())
d := WithParent(Background())
e := WithParent(Background())
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
a.AddChild(b)
a.AddChild(c)
b.AddChild(d)
e.AddChild(d)
testNone(t, Q)
go b.Close()
testNone(t, Q)
d.Close()
testStrs(t, Q, "b", "d")
testStrs(t, Q, "b", "d")
go a.Close()
testNone(t, Q)
c.Close()
testStrs(t, Q, "a", "c")
testStrs(t, Q, "a", "c")
e.Close()
testStrs(t, Q, "e")
}
func TestGoChildrenClose(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
var bWait = make(chan struct{})
var cWait = make(chan struct{})
var dWait = make(chan struct{})
var eWait = make(chan struct{})
a = WithParent(Background())
a.Go(func(p Process) {
b = p
b.Go(func(p Process) {
c = p
ready <- struct{}{}
<-cWait
})
ready <- struct{}{}
<-bWait
})
a.Go(func(p Process) {
d = p
d.Go(func(p Process) {
e = p
ready <- struct{}{}
<-eWait
})
ready <- struct{}{}
<-dWait
})
<-ready
<-ready
<-ready
<-ready
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
go a.Close()
testNone(t, Q)
bWait <- struct{}{} // relase b
go b.Close()
testNone(t, Q)
cWait <- struct{}{} // relase c
<-c.Closed()
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
eWait <- struct{}{} // release e
<-e.Closed()
testStrs(t, Q, "e")
dWait <- struct{}{} // releasse d
<-d.Closed()
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
}
func TestCloseAfterChildren(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
a = WithParent(Background())
a.Go(func(p Process) {
b = p
b.Go(func(p Process) {
c = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
})
a.Go(func(p Process) {
d = p
d.Go(func(p Process) {
e = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
})
<-ready
<-ready
<-ready
<-ready
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
aDone := make(chan struct{})
bDone := make(chan struct{})
testNone(t, Q)
go func() {
a.CloseAfterChildren()
aDone <- struct{}{}
}()
testNone(t, Q)
go func() {
b.CloseAfterChildren()
bDone <- struct{}{}
}()
testNone(t, Q)
c.Close()
<-bDone
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
e.Close()
testStrs(t, Q, "e")
d.Close()
<-aDone
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
}
func TestBackground(t *testing.T) {
// test it hangs indefinitely:
b := Background()
go b.Close()
select {
case <-b.Closing():
t.Error("b.Closing() closed :(")
default:
}
}
func TestWithSignals(t *testing.T) {
p := WithSignals(syscall.SIGABRT)
testNotClosed(t, p)
syscall.Kill(syscall.Getpid(), syscall.SIGABRT)
testClosed(t, p)
}
func testClosing(t *testing.T, p Process) {
select {
case <-p.Closing():
case <-time.After(50 * time.Millisecond):
t.Fatal("should be closing")
}
}
func testNotClosing(t *testing.T, p Process) {
select {
case <-p.Closing():
t.Fatal("should not be closing")
case <-p.Closed():
t.Fatal("should not be closed")
default:
}
}
func testClosed(t *testing.T, p Process) {
select {
case <-p.Closed():
case <-time.After(50 * time.Millisecond):
t.Fatal("should be closed")
}
}
func testNotClosed(t *testing.T, p Process) {
select {
case <-p.Closed():
t.Fatal("should not be closed")
case <-time.After(50 * time.Millisecond):
}
}
func testNone(t *testing.T, c <-chan string) {
select {
case <-c:
t.Fatal("none should be closed")
default:
}
}
func testStrs(t *testing.T, Q <-chan string, ss ...string) {
s1 := <-Q
for _, s2 := range ss {
if s1 == s2 {
return
}
}
t.Error("context not in group:", s1, ss)
}
func onClosedStr(Q chan<- string, s string, p Process) {
<-p.Closed()
Q <- s
}
// +build ignore
// WARNING: this implementation is not correct.
// here only for historical purposes.
package goprocess
import (
"sync"
)
// process implements Process
type process struct {
children sync.WaitGroup // wait group for child goroutines
teardown TeardownFunc // called to run the teardown logic.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeOnce sync.Once // ensure close is only called once.
closeErr error // error to return to clients of Close()
}
// newProcess constructs and returns a Process.
// It will call tf TeardownFunc exactly once:
// **after** all children have fully Closed,
// **after** entering <-Closing(), and
// **before** <-Closed().
func newProcess(tf TeardownFunc) *process {
if tf == nil {
tf = nilTeardownFunc
}
return &process{
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
}
}
func (p *process) WaitFor(q Process) {
p.children.Add(1) // p waits on q to be done
go func(p *process, q Process) {
<-q.Closed() // wait until q is closed
p.children.Done() // p done waiting on q
}(p, q)
}
func (p *process) AddChildNoWait(child Process) {
go func(p, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child
}(p, child)
}
func (p *process) AddChild(child Process) {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
p.children.Add(1) // p waits on child to be done
go func(p *process, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child and wait
p.children.Done() // p done waiting on child
}(p, child)
}
func (p *process) Go(f ProcessFunc) Process {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
// this is very similar to AddChild, but also runs the func
// in the child. we replicate it here to save one goroutine.
child := newProcessGoroutines(nil)
child.children.Add(1) // child waits on func to be done
p.AddChild(child)
go func() {
f(child)
child.children.Done() // wait on child's children to be done.
child.Close() // close to tear down.
}()
return child
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.closeOnce.Do(p.doClose)
<-p.Closed() // sync.Once should block, but this checks chan is closed too
return p.closeErr
}
func (p *process) Closing() <-chan struct{} {
return p.closing
}
func (p *process) Closed() <-chan struct{} {
return p.closed
}
// the _actual_ close process.
func (p *process) doClose() {
// this function should only be called once (hence the sync.Once).
// and it will panic (on closing channels) otherwise.
close(p.closing) // signal that we're shutting down (Closing)
p.children.Wait() // wait till all children are done (before teardown)
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
}
package goprocess
import (
"sync"
)
// process implements Process
type process struct {
children []Process // process to close with us
waitfors []Process // process to only wait for
teardown TeardownFunc // called to run the teardown logic.
waiting chan struct{} // closed when CloseAfterChildrenClosed is called.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeErr error // error to return to clients of Close()
sync.Mutex
}
// newProcess constructs and returns a Process.
// It will call tf TeardownFunc exactly once:
// **after** all children have fully Closed,
// **after** entering <-Closing(), and
// **before** <-Closed().
func newProcess(tf TeardownFunc) *process {
if tf == nil {
tf = nilTeardownFunc
}
return &process{
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
}
}
func (p *process) WaitFor(q Process) {
p.Lock()
select {
case <-p.Closed():
panic("Process cannot wait after being closed")
default:
}
p.waitfors = append(p.waitfors, q)
p.Unlock()
}
func (p *process) AddChildNoWait(child Process) {
p.Lock()
select {
case <-p.Closed():
panic("Process cannot add children after being closed")
default:
}
p.children = append(p.children, child)
p.Unlock()
}
func (p *process) AddChild(child Process) {
p.Lock()
select {
case <-p.Closed():
panic("Process cannot add children after being closed")
default:
}
p.waitfors = append(p.waitfors, child)
p.children = append(p.children, child)
p.Unlock()
}
func (p *process) Go(f ProcessFunc) {
child := newProcess(nil)
p.AddChild(child)
go func() {
f(child)
child.Close() // close to tear down.
}()
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.Lock()
defer p.Unlock()
// if already closed, get out.
select {
case <-p.Closed():
return p.closeErr
default:
}
p.doClose()
return p.closeErr
}
func (p *process) Closing() <-chan struct{} {
return p.closing
}
func (p *process) Closed() <-chan struct{} {
return p.closed
}
// the _actual_ close process.
func (p *process) doClose() {
// this function is only be called once (protected by p.Lock()).
// and it will panic (on closing channels) otherwise.
close(p.closing) // signal that we're shutting down (Closing)
for _, c := range p.children {
go c.Close() // force all children to shut down
}
for _, w := range p.waitfors {
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
}
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
}
// We will only wait on the children we have now.
// We will not wait on children added subsequently.
// this may change in the future.
func (p *process) CloseAfterChildren() error {
p.Lock()
select {
case <-p.Closed():
p.Unlock()
return p.Close() // get error. safe, after p.Closed()
case <-p.waiting: // already called it.
p.Unlock()
<-p.Closed()
return p.Close() // get error. safe, after p.Closed()
default:
}
p.Unlock()
// here only from one goroutine.
nextToWaitFor := func() Process {
p.Lock()
defer p.Unlock()
for _, e := range p.waitfors {
select {
case <-e.Closed():
default:
return e
}
}
return nil
}
// wait for all processes we're waiting for are closed.
// the semantics here are simple: we will _only_ close
// if there are no processes currently waiting for.
for {
// remove it from waitingfors
if next := nextToWaitFor(); next != nil {
<-next.Closed()
continue
}
// YAY! we're done. close
break
}
return p.Close()
}
# goprocess/ratelimit - ratelimit children creation
- goprocess: https://github.com/jbenet/goprocess
- Godoc: https://godoc.org/github.com/jbenet/goprocess/ratelimit
// Package ratelimit is part of github.com/jbenet/goprocess.
// It provides a simple process that ratelimits child creation.
// This is done internally with a channel/semaphore.
// So the call `RateLimiter.LimitedGo` may block until another
// child is Closed().
package ratelimit
import (
process "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
// RateLimiter limits the spawning of children. It does so
// with an internal semaphore. Note that Go will continue
// to be the unlimited process.Process.Go, and ONLY the
// added function `RateLimiter.LimitedGo` will honor the
// limit. This is to improve readability and avoid confusion
// for the reader, particularly if code changes over time.
type RateLimiter struct {
process.Process
limiter chan struct{}
}
func NewRateLimiter(parent process.Process, limit int) *RateLimiter {
proc := process.WithParent(parent)
return &RateLimiter{Process: proc, limiter: LimitChan(limit)}
}
// LimitedGo creates a new process, adds it as a child, and spawns the
// ProcessFunc f in its own goroutine, but may block according to the
// internal rate limit. It is equivalent to:
//
// func(f process.ProcessFunc) {
// <-limitch
// p.Go(func (child process.Process) {
// f(child)
// f.Close() // make sure its children close too!
// limitch<- struct{}{}
// })
/// }
//
// It is useful to construct simple asynchronous workers, children of p,
// and rate limit their creation, to avoid spinning up too many, too fast.
// This is great for providing backpressure to producers.
func (rl *RateLimiter) LimitedGo(f process.ProcessFunc) {
<-rl.limiter
rl.Go(func(child process.Process) {
// call the function as rl.Go would.
f(child)
// this close is here because the child may have spawned
// children of its own, and our rate limiter should capture that.
// we have two options:
// * this approach (which is what process.Go itself does), or
// * spawn another goroutine that waits on <-child.Closed()
//
// go func() {
// <-child.Closed()
// rl.limiter <- struct{}{}
// }()
//
// This approach saves a goroutine. It is fine to call child.Close()
// multiple times.
child.Close()
// after it's done.
rl.limiter <- struct{}{}
})
}
// LimitChan returns a rate-limiting channel. it is the usual, simple,
// golang-idiomatic rate-limiting semaphore. This function merely
// initializes it with certain buffer size, and sends that many values,
// so it is ready to be used.
func LimitChan(limit int) chan struct{} {
limitch := make(chan struct{}, limit)
for i := 0; i < limit; i++ {
limitch <- struct{}{}
}
return limitch
}
package ratelimit
import (
"testing"
"time"
process "github.com/jbenet/go-datastore/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
func TestRateLimitLimitedGoBlocks(t *testing.T) {
numChildren := 6
t.Logf("create a rate limiter with limit of %d", numChildren/2)
rl := NewRateLimiter(process.Background(), numChildren/2)
doneSpawning := make(chan struct{})
childClosing := make(chan struct{})
t.Log("spawn 6 children with LimitedGo.")
go func() {
for i := 0; i < numChildren; i++ {
rl.LimitedGo(func(child process.Process) {
// hang until we drain childClosing
childClosing <- struct{}{}
})
t.Logf("spawned %d", i)
}
close(doneSpawning)
}()
t.Log("should have blocked.")
select {
case <-doneSpawning:
t.Error("did not block")
case <-time.After(time.Millisecond): // for scheduler
t.Log("blocked")
}
t.Logf("drain %d children so they close", numChildren/2)
for i := 0; i < numChildren/2; i++ {
t.Logf("closing %d", i)
<-childClosing // consume child cloing
t.Logf("closed %d", i)
}
t.Log("should be done spawning.")
select {
case <-doneSpawning:
case <-time.After(time.Millisecond): // for scheduler
t.Error("still blocked...")
}
t.Logf("drain %d children so they close", numChildren/2)
for i := 0; i < numChildren/2; i++ {
<-childClosing
t.Logf("closed %d", i)
}
rl.Close() // ensure everyone's closed.
}
func TestRateLimitGoDoesntBlock(t *testing.T) {
numChildren := 6
t.Logf("create a rate limiter with limit of %d", numChildren/2)
rl := NewRateLimiter(process.Background(), numChildren/2)
doneSpawning := make(chan struct{})
childClosing := make(chan struct{})
t.Log("spawn 6 children with usual Process.Go.")
go func() {
for i := 0; i < numChildren; i++ {
rl.Go(func(child process.Process) {
// hang until we drain childClosing
childClosing <- struct{}{}
})
t.Logf("spawned %d", i)
}
close(doneSpawning)
}()
t.Log("should not have blocked.")
select {
case <-doneSpawning:
t.Log("did not block")
case <-time.After(time.Millisecond): // for scheduler
t.Error("process.Go blocked. it should not.")
}
t.Log("drain children so they close")
for i := 0; i < numChildren; i++ {
<-childClosing
t.Logf("closed %d", i)
}
rl.Close() // ensure everyone's closed.
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment