diff --git a/net/conn/closer.go b/net/conn/closer.go index 519b9e513d788451e10ab710d36294d7c433910f..6eb3f8ad4ea3dacafef9d4f936ae48b2b12b103a 100644 --- a/net/conn/closer.go +++ b/net/conn/closer.go @@ -1,29 +1,71 @@ package conn import ( - "errors" + "sync" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" ) -// Wait is a readable channel to block on until it receives a signal. -type Wait <-chan Signal - -// Signal is an empty channel -type Signal struct{} - // CloseFunc is a function used to close a ContextCloser type CloseFunc func() error // ContextCloser is an interface for services able to be opened and closed. +// It has a parent Context, and Children. But ContextCloser is not a proper +// "tree" like the Context tree. It is more like a Context-WaitGroup hybrid. +// It models a main object with a few children objects -- and, unlike the +// context -- concerns itself with the parent-child closing semantics: +// +// - Can define a CloseFunc (func() error) to be run at Close time. +// - Children call Children().Add(1) to be waited upon +// - Children can select on <-Closing() to know when they should shut down. +// - Close() will wait until all children call Children().Done() +// - <-Closed() signals when the service is completely closed. +// +// ContextCloser can be embedded into the main object itself. In that case, +// the closeFunc (if a member function) has to be set after the struct +// is intialized: +// +// type service struct { +// ContextCloser +// net.Conn +// } +// +// func (s *service) close() error { +// return s.Conn.Close() +// } +// +// func newService(ctx context.Context, c net.Conn) *service { +// s := &service{c} +// s.ContextCloser = NewContextCloser(ctx, s.close) +// return s +// } +// type ContextCloser interface { + + // Context is the context of this ContextCloser. It is "sort of" a parent. Context() context.Context - // Close is a method to call when you with to stop this ContextCloser + // Children is a sync.Waitgroup for all children goroutines that should + // shut down completely before this service is said to be "closed". + // Follows the semantics of WaitGroup: + // Children().Add(1) // add one more dependent child + // Children().Done() // child signals it is done + Children() *sync.WaitGroup + + // Close is a method to call when you wish to stop this ContextCloser Close() error - // Done is a method to wait upon, like context.Context.Done - Done() Wait + // Closing is a signal to wait upon, like Context.Done(). + // It fires when the object should be closing (but hasn't yet fully closed). + // The primary use case is for child goroutines who need to know when + // they should shut down. (equivalent to Context().Done()) + Closing() <-chan struct{} + + // Closed is a method to wait upon, like Context.Done(). + // It fires when the entire object is fully closed. + // The primary use case is for external listeners who need to know when + // this object is completly done, and all its children closed. + Closed() <-chan struct{} } // contextCloser is an OpenCloser with a cancellable context @@ -31,11 +73,20 @@ type contextCloser struct { ctx context.Context cancel context.CancelFunc - // called to close + // called to run the close logic. closeFunc CloseFunc // closed is released once the close function is done. - closed chan Signal + closed chan struct{} + + // wait group for child goroutines + children sync.WaitGroup + + // sync primitive to ensure the close logic is only called once. + closeOnce sync.Once + + // error to return to clients of Close(). + closeErr error } // NewContextCloser constructs and returns a ContextCloser. It will call @@ -46,7 +97,7 @@ func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser { ctx: ctx, cancel: cancel, closeFunc: cf, - closed: make(chan Signal), + closed: make(chan struct{}), } go c.closeOnContextDone() @@ -57,30 +108,47 @@ func (c *contextCloser) Context() context.Context { return c.ctx } -func (c *contextCloser) Done() Wait { - return c.closed +func (c *contextCloser) Children() *sync.WaitGroup { + return &c.children } +// Close is the external close function. it's a wrapper around internalClose +// that waits on Closed() func (c *contextCloser) Close() error { - select { - case <-c.Done(): - // panic("closed twice") - return errors.New("closed twice") - default: - } + c.internalClose() + <-c.Closed() // wait until we're totally done. + return c.closeErr +} - err := c.closeFunc() // actually run the close logic - close(c.closed) // relase everyone waiting on Done - c.cancel() // release anyone waiting on the context - return err +func (c *contextCloser) Closing() <-chan struct{} { + return c.Context().Done() } +func (c *contextCloser) Closed() <-chan struct{} { + return c.closed +} + +func (c *contextCloser) internalClose() { + go c.closeOnce.Do(c.closeLogic) +} + +// the _actual_ close process. +func (c *contextCloser) closeLogic() { + // this function should only be called once (hence the sync.Once). + // and it will panic at the bottom (on close(c.closed)) otherwise. + + c.cancel() // signal that we're shutting down (Closing) + c.closeErr = c.closeFunc() // actually run the close logic + c.children.Wait() // wait till all children are done. + close(c.closed) // signal that we're shut down (Closed) +} + +// if parent context is shut down before we call Close explicitly, +// we need to go through the Close motions anyway. Hence all the sync +// stuff all over the place... func (c *contextCloser) closeOnContextDone() { - <-c.ctx.Done() - select { - case <-c.Done(): - return // already closed - default: - } - c.Close() + c.Children().Add(1) // we're a child goroutine, to be waited upon. + <-c.Context().Done() // wait until parent (context) is done. + c.internalClose() + c.Children().Done() } diff --git a/net/conn/conn.go b/net/conn/conn.go index 6c3dd0957ea6bd08de1d1ce1b7ddaec2dada34ed..5d67c1e221198884e6da8eb1b241f3b506102ded 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -218,7 +218,7 @@ func (l *listener) close() error { func (l *listener) isClosed() bool { select { - case <-l.Done(): + case <-l.Closed(): return true default: return false diff --git a/net/conn/conn_test.go b/net/conn/conn_test.go index 715c7a900b878aeace628c8fd40eb90fdd31d186..f86192a8d63df2df6abde330e48ef8bc298ba8b4 100644 --- a/net/conn/conn_test.go +++ b/net/conn/conn_test.go @@ -18,9 +18,9 @@ func TestClose(t *testing.T) { c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345") select { - case <-c1.Done(): + case <-c1.Closed(): t.Fatal("done before close") - case <-c2.Done(): + case <-c2.Closed(): t.Fatal("done before close") default: } @@ -28,7 +28,7 @@ func TestClose(t *testing.T) { c1.Close() select { - case <-c1.Done(): + case <-c1.Closed(): default: t.Fatal("not done after cancel") } @@ -36,7 +36,7 @@ func TestClose(t *testing.T) { c2.Close() select { - case <-c2.Done(): + case <-c2.Closed(): default: t.Fatal("not done after cancel") } @@ -50,9 +50,9 @@ func TestCancel(t *testing.T) { c1, c2 := setupConn(t, ctx, "/ip4/127.0.0.1/tcp/1234", "/ip4/127.0.0.1/tcp/2345") select { - case <-c1.Done(): + case <-c1.Closed(): t.Fatal("done before close") - case <-c2.Done(): + case <-c2.Closed(): t.Fatal("done before close") default: } @@ -64,13 +64,13 @@ func TestCancel(t *testing.T) { // test that cancel called Close. select { - case <-c1.Done(): + case <-c1.Closed(): default: t.Fatal("not done after cancel") } select { - case <-c2.Done(): + case <-c2.Closed(): default: t.Fatal("not done after cancel") } diff --git a/net/conn/handshake.go b/net/conn/handshake.go index 7729214467602ebb1857e80afa9d8645b482476a..0932335224dce0442a707565d827d9b2704d28aa 100644 --- a/net/conn/handshake.go +++ b/net/conn/handshake.go @@ -31,7 +31,7 @@ func VersionHandshake(ctx context.Context, c Conn) error { case <-ctx.Done(): return ctx.Err() - case <-c.Done(): + case <-c.Closed(): return errors.New("remote closed connection during version exchange") case data, ok := <-c.In(): diff --git a/net/conn/secure_conn_test.go b/net/conn/secure_conn_test.go index 9a1dd429a0aaa87135da74d931d4b8c0b12e94ef..4e6db1ea471a679940f1641b0a7fa4c874282c2e 100644 --- a/net/conn/secure_conn_test.go +++ b/net/conn/secure_conn_test.go @@ -37,9 +37,9 @@ func TestSecureClose(t *testing.T) { c2 = setupSecureConn(t, c2) select { - case <-c1.Done(): + case <-c1.Closed(): t.Fatal("done before close") - case <-c2.Done(): + case <-c2.Closed(): t.Fatal("done before close") default: } @@ -47,7 +47,7 @@ func TestSecureClose(t *testing.T) { c1.Close() select { - case <-c1.Done(): + case <-c1.Closed(): default: t.Fatal("not done after cancel") } @@ -55,7 +55,7 @@ func TestSecureClose(t *testing.T) { c2.Close() select { - case <-c2.Done(): + case <-c2.Closed(): default: t.Fatal("not done after cancel") } @@ -72,9 +72,9 @@ func TestSecureCancel(t *testing.T) { c2 = setupSecureConn(t, c2) select { - case <-c1.Done(): + case <-c1.Closed(): t.Fatal("done before close") - case <-c2.Done(): + case <-c2.Closed(): t.Fatal("done before close") default: } @@ -86,13 +86,13 @@ func TestSecureCancel(t *testing.T) { // test that cancel called Close. select { - case <-c1.Done(): + case <-c1.Closed(): default: t.Fatal("not done after cancel") } select { - case <-c2.Done(): + case <-c2.Closed(): default: t.Fatal("not done after cancel") }