Commit dbef4efd authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

updated goprocess to deal with memory leak

parent 11a85c7d
......@@ -199,7 +199,7 @@
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "c877297c00ffe09f8213ceec3bbb0ab40871f8d4"
"Rev": "ea63e9540cd19cb39e0e4c4442b9c27664287bb8"
},
{
"ImportPath": "github.com/kardianos/osext",
......
......@@ -71,10 +71,6 @@ func TestChildFunc(t *testing.T) {
wait2 := make(chan struct{})
wait3 := make(chan struct{})
wait4 := make(chan struct{})
go func() {
a.Close()
wait4 <- struct{}{}
}()
a.Go(func(process Process) {
wait1 <- struct{}{}
......@@ -82,6 +78,11 @@ func TestChildFunc(t *testing.T) {
wait3 <- struct{}{}
})
go func() {
a.Close()
wait4 <- struct{}{}
}()
<-wait1
select {
case <-wait3:
......
......@@ -6,8 +6,10 @@ import (
// process implements Process
type process struct {
children []Process // process to close with us
waitfors []Process // process to only wait for
children []*processLink // process to close with us
waitfors []*processLink // process to only wait for
waiters []*processLink // processes that wait for us. for gc.
teardown TeardownFunc // called to run the teardown logic.
waiting chan struct{} // closed when CloseAfterChildrenClosed is called.
closing chan struct{} // closed once close starts.
......@@ -47,8 +49,10 @@ func (p *process) WaitFor(q Process) {
default:
}
p.waitfors = append(p.waitfors, q)
pl := newProcessLink(p, q)
p.waitfors = append(p.waitfors, pl)
p.Unlock()
go pl.AddToChild()
}
func (p *process) AddChildNoWait(child Process) {
......@@ -66,8 +70,10 @@ func (p *process) AddChildNoWait(child Process) {
default:
}
p.children = append(p.children, child)
pl := newProcessLink(p, child)
p.children = append(p.children, pl)
p.Unlock()
go pl.AddToChild()
}
func (p *process) AddChild(child Process) {
......@@ -85,9 +91,11 @@ func (p *process) AddChild(child Process) {
default:
}
p.waitfors = append(p.waitfors, child)
p.children = append(p.children, child)
pl := newProcessLink(p, child)
p.waitfors = append(p.waitfors, pl)
p.children = append(p.children, pl)
p.Unlock()
go pl.AddToChild()
}
func (p *process) Go(f ProcessFunc) Process {
......@@ -141,26 +149,38 @@ func (p *process) doClose() {
close(p.closing) // signal that we're shutting down (Closing)
for len(p.children) > 0 || len(p.waitfors) > 0 {
for _, c := range p.children {
go c.Close() // force all children to shut down
for _, plc := range p.children {
child := plc.Child()
if child != nil { // check because child may already have been removed.
go child.Close() // force all children to shut down
}
plc.ParentClear()
}
p.children = nil // clear them
p.children = nil // clear them. release memory.
// we must be careful not to iterate over waitfors directly, as it may
// change under our feet.
wf := p.waitfors
p.waitfors = nil // clear them
p.waitfors = nil // clear them. release memory.
for _, w := range wf {
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
// adding a child to us can finish. we will immediately close the child.
p.Unlock()
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
<-w.ChildClosed() // wait till all waitfors are fully closed (before teardown)
p.Lock()
w.ParentClear()
}
}
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
// go remove all the parents from the process links. optimization.
go func(waiters []*processLink) {
for _, pl := range waiters {
pl.ClearChild()
}
}(p.waiters) // pass in so
}
// We will only wait on the children we have now.
......@@ -186,10 +206,15 @@ func (p *process) CloseAfterChildren() error {
p.Lock()
defer p.Unlock()
for _, e := range p.waitfors {
c := e.Child()
if c == nil {
continue
}
select {
case <-e.Closed():
case <-c.Closed():
default:
return e
return c
}
}
return nil
......
package goprocess
import (
"sync"
)
// closedCh is an alread-closed channel. used to return
// in cases where we already know a channel is closed.
var closedCh chan struct{}
func init() {
closedCh = make(chan struct{})
close(closedCh)
}
// a processLink is an internal bookkeeping datastructure.
// it's used to form a relationship between two processes.
// It is mostly for keeping memory usage down (letting
// children close and be garbage-collected).
type processLink struct {
// guards all fields.
// DO NOT HOLD while holding process locks.
// it may be slow, and could deadlock if not careful.
sync.Mutex
parent Process
child Process
}
func newProcessLink(p, c Process) *processLink {
return &processLink{
parent: p,
child: c,
}
}
// Closing returns whether the child is closing
func (pl *processLink) ChildClosing() <-chan struct{} {
// grab a hold of it, and unlock, as .Closing may block.
pl.Lock()
child := pl.child
pl.Unlock()
if child == nil { // already closed? memory optimization.
return closedCh
}
return child.Closing()
}
func (pl *processLink) ChildClosed() <-chan struct{} {
// grab a hold of it, and unlock, as .Closed may block.
pl.Lock()
child := pl.child
pl.Unlock()
if child == nil { // already closed? memory optimization.
return closedCh
}
return child.Closed()
}
func (pl *processLink) ChildClose() {
// grab a hold of it, and unlock, as .Closed may block.
pl.Lock()
child := pl.child
pl.Unlock()
if child != nil { // already closed? memory optimization.
child.Close()
}
}
func (pl *processLink) ClearChild() {
pl.Lock()
pl.child = nil
pl.Unlock()
}
func (pl *processLink) ParentClear() {
pl.Lock()
pl.parent = nil
pl.Unlock()
}
func (pl *processLink) Child() Process {
pl.Lock()
defer pl.Unlock()
return pl.child
}
func (pl *processLink) Parent() Process {
pl.Lock()
defer pl.Unlock()
return pl.parent
}
func (pl *processLink) AddToChild() {
cp := pl.Child()
// is it a *process ? if not... panic.
c, ok := cp.(*process)
if !ok {
panic("goprocess does not yet support other process impls.")
}
// first, is it Closed?
c.Lock()
select {
case <-c.Closed():
c.Unlock()
// already closed. must not add.
// we must clear it, though. do so without the lock.
pl.ClearChild()
return
default:
// put the process link into q's waiters
c.waiters = append(c.waiters, pl)
c.Unlock()
}
}
......@@ -4,8 +4,8 @@ import (
"testing"
"time"
ci "github.com/jbenet/go-cienv"
gp "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
ci "github.com/jbenet/go-cienv"
)
var (
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment