diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 617096eab38e3f25698ca41e6597cf94ed77a28f..44ba61885a97644a6a643979ac0d89a9186069d9 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -199,7 +199,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "c877297c00ffe09f8213ceec3bbb0ab40871f8d4" + "Rev": "ea63e9540cd19cb39e0e4c4442b9c27664287bb8" }, { "ImportPath": "github.com/kardianos/osext", diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go index ea04a1808170670478d4969523d35e1ddf89e680..26bb5b42b5384cc8c2c5d679a3aa907181632a99 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go @@ -71,10 +71,6 @@ func TestChildFunc(t *testing.T) { wait2 := make(chan struct{}) wait3 := make(chan struct{}) wait4 := make(chan struct{}) - go func() { - a.Close() - wait4 <- struct{}{} - }() a.Go(func(process Process) { wait1 <- struct{}{} @@ -82,6 +78,11 @@ func TestChildFunc(t *testing.T) { wait3 <- struct{}{} }) + go func() { + a.Close() + wait4 <- struct{}{} + }() + <-wait1 select { case <-wait3: diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index d278b72b5a5e2e2eb9ae774ca81545c179a8c5a4..47a73feddb86eda6cafa276ac118c28575f836ee 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -6,8 +6,10 @@ import ( // process implements Process type process struct { - children []Process // process to close with us - waitfors []Process // process to only wait for + children []*processLink // process to close with us + waitfors []*processLink // process to only wait for + waiters []*processLink // processes that wait for us. for gc. + teardown TeardownFunc // called to run the teardown logic. waiting chan struct{} // closed when CloseAfterChildrenClosed is called. closing chan struct{} // closed once close starts. @@ -47,8 +49,10 @@ func (p *process) WaitFor(q Process) { default: } - p.waitfors = append(p.waitfors, q) + pl := newProcessLink(p, q) + p.waitfors = append(p.waitfors, pl) p.Unlock() + go pl.AddToChild() } func (p *process) AddChildNoWait(child Process) { @@ -66,8 +70,10 @@ func (p *process) AddChildNoWait(child Process) { default: } - p.children = append(p.children, child) + pl := newProcessLink(p, child) + p.children = append(p.children, pl) p.Unlock() + go pl.AddToChild() } func (p *process) AddChild(child Process) { @@ -85,9 +91,11 @@ func (p *process) AddChild(child Process) { default: } - p.waitfors = append(p.waitfors, child) - p.children = append(p.children, child) + pl := newProcessLink(p, child) + p.waitfors = append(p.waitfors, pl) + p.children = append(p.children, pl) p.Unlock() + go pl.AddToChild() } func (p *process) Go(f ProcessFunc) Process { @@ -141,26 +149,38 @@ func (p *process) doClose() { close(p.closing) // signal that we're shutting down (Closing) for len(p.children) > 0 || len(p.waitfors) > 0 { - for _, c := range p.children { - go c.Close() // force all children to shut down + for _, plc := range p.children { + child := plc.Child() + if child != nil { // check because child may already have been removed. + go child.Close() // force all children to shut down + } + plc.ParentClear() } - p.children = nil // clear them + p.children = nil // clear them. release memory. // we must be careful not to iterate over waitfors directly, as it may // change under our feet. wf := p.waitfors - p.waitfors = nil // clear them + p.waitfors = nil // clear them. release memory. for _, w := range wf { // Here, we wait UNLOCKED, so that waitfors who are in the middle of // adding a child to us can finish. we will immediately close the child. p.Unlock() - <-w.Closed() // wait till all waitfors are fully closed (before teardown) + <-w.ChildClosed() // wait till all waitfors are fully closed (before teardown) p.Lock() + w.ParentClear() } } p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) close(p.closed) // signal that we're shut down (Closed) + + // go remove all the parents from the process links. optimization. + go func(waiters []*processLink) { + for _, pl := range waiters { + pl.ClearChild() + } + }(p.waiters) // pass in so } // We will only wait on the children we have now. @@ -186,10 +206,15 @@ func (p *process) CloseAfterChildren() error { p.Lock() defer p.Unlock() for _, e := range p.waitfors { + c := e.Child() + if c == nil { + continue + } + select { - case <-e.Closed(): + case <-c.Closed(): default: - return e + return c } } return nil diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/link.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/link.go new file mode 100644 index 0000000000000000000000000000000000000000..c344c1e6136d669cc13eba29ef2b9e73c5752923 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/link.go @@ -0,0 +1,121 @@ +package goprocess + +import ( + "sync" +) + +// closedCh is an alread-closed channel. used to return +// in cases where we already know a channel is closed. +var closedCh chan struct{} + +func init() { + closedCh = make(chan struct{}) + close(closedCh) +} + +// a processLink is an internal bookkeeping datastructure. +// it's used to form a relationship between two processes. +// It is mostly for keeping memory usage down (letting +// children close and be garbage-collected). +type processLink struct { + // guards all fields. + // DO NOT HOLD while holding process locks. + // it may be slow, and could deadlock if not careful. + sync.Mutex + parent Process + child Process +} + +func newProcessLink(p, c Process) *processLink { + return &processLink{ + parent: p, + child: c, + } +} + +// Closing returns whether the child is closing +func (pl *processLink) ChildClosing() <-chan struct{} { + // grab a hold of it, and unlock, as .Closing may block. + pl.Lock() + child := pl.child + pl.Unlock() + + if child == nil { // already closed? memory optimization. + return closedCh + } + return child.Closing() +} + +func (pl *processLink) ChildClosed() <-chan struct{} { + // grab a hold of it, and unlock, as .Closed may block. + pl.Lock() + child := pl.child + pl.Unlock() + + if child == nil { // already closed? memory optimization. + return closedCh + } + return child.Closed() +} + +func (pl *processLink) ChildClose() { + // grab a hold of it, and unlock, as .Closed may block. + pl.Lock() + child := pl.child + pl.Unlock() + + if child != nil { // already closed? memory optimization. + child.Close() + } +} + +func (pl *processLink) ClearChild() { + pl.Lock() + pl.child = nil + pl.Unlock() +} + +func (pl *processLink) ParentClear() { + pl.Lock() + pl.parent = nil + pl.Unlock() +} + +func (pl *processLink) Child() Process { + pl.Lock() + defer pl.Unlock() + return pl.child +} + +func (pl *processLink) Parent() Process { + pl.Lock() + defer pl.Unlock() + return pl.parent +} + +func (pl *processLink) AddToChild() { + cp := pl.Child() + + // is it a *process ? if not... panic. + c, ok := cp.(*process) + if !ok { + panic("goprocess does not yet support other process impls.") + } + + // first, is it Closed? + c.Lock() + select { + case <-c.Closed(): + c.Unlock() + + // already closed. must not add. + // we must clear it, though. do so without the lock. + pl.ClearChild() + return + + default: + // put the process link into q's waiters + c.waiters = append(c.waiters, pl) + c.Unlock() + } +} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go index 3e48a110e0c9d4e1c7bfc8bf9c34dcbd08d6ae8d..088d31fe8fa384046cf176abaf532e50a63fc4cd 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic/periodic_test.go @@ -4,8 +4,8 @@ import ( "testing" "time" - ci "github.com/jbenet/go-cienv" gp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + ci "github.com/jbenet/go-cienv" ) var (