From 8065b61c305e7170c99e69e616e4267cf1e6b8a3 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet <juan@benet.ai> Date: Sat, 18 Oct 2014 02:47:27 -0700 Subject: [PATCH] Added ContextCloser abstraction --- net/conn/closer.go | 83 +++++++++++++++++++++++++++++++++++++++++++ net/conn/conn.go | 88 +++++++++++++--------------------------------- 2 files changed, 107 insertions(+), 64 deletions(-) create mode 100644 net/conn/closer.go diff --git a/net/conn/closer.go b/net/conn/closer.go new file mode 100644 index 000000000..503b035d3 --- /dev/null +++ b/net/conn/closer.go @@ -0,0 +1,83 @@ +package conn + +import ( + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +// Wait is a readable channel to block on until it receives a signal. +type Wait <-chan Signal + +// Signal is an empty channel +type Signal struct{} + +// CloseFunc is a function used to close a ContextCloser +type CloseFunc func() error + +// ContextCloser is an interface for services able to be opened and closed. +type ContextCloser interface { + Context() context.Context + + // Close is a method to call when you with to stop this ContextCloser + Close() error + + // Done is a method to wait upon, like context.Context.Done + Done() Wait +} + +// contextCloser is an OpenCloser with a cancellable context +type contextCloser struct { + ctx context.Context + cancel context.CancelFunc + + // called to close + closeFunc CloseFunc + + // closed is released once the close function is done. + closed chan Signal +} + +// NewContextCloser constructs and returns a ContextCloser. It will call +// cf CloseFunc before its Done() Wait signals fire. +func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser { + ctx, cancel := context.WithCancel(ctx) + c := &contextCloser{ + ctx: ctx, + cancel: cancel, + closeFunc: cf, + closed: make(chan Signal), + } + + go c.closeOnContextDone() + return c +} + +func (c *contextCloser) Context() context.Context { + return c.ctx +} + +func (c *contextCloser) Done() Wait { + return c.closed +} + +func (c *contextCloser) Close() error { + select { + case <-c.Done(): + panic("closed twice") + default: + } + + c.cancel() // release anyone waiting on the context + err := c.closeFunc() // actually run the close logic + close(c.closed) // relase everyone waiting on Done + return err +} + +func (c *contextCloser) closeOnContextDone() { + <-c.ctx.Done() + select { + case <-c.Done(): + return // already closed + default: + } + c.Close() +} diff --git a/net/conn/conn.go b/net/conn/conn.go index 25fc676d2..d75ea1e15 100644 --- a/net/conn/conn.go +++ b/net/conn/conn.go @@ -41,35 +41,30 @@ type singleConn struct { remote *peer.Peer maconn manet.Conn - // context + cancel - ctx context.Context - cancel context.CancelFunc - secure *spipe.SecurePipe insecure *msgioPipe + + ContextCloser } // newConn constructs a new connection func newSingleConn(ctx context.Context, local, remote *peer.Peer, peers peer.Peerstore, maconn manet.Conn) (Conn, error) { - ctx, cancel := context.WithCancel(ctx) - conn := &singleConn{ local: local, remote: remote, maconn: maconn, - ctx: ctx, - cancel: cancel, insecure: newMsgioPipe(10), } + conn.ContextCloser = NewContextCloser(ctx, conn.close) + log.Info("newSingleConn: %v to %v", local, remote) // setup the various io goroutines go conn.insecure.outgoing.WriteTo(maconn) go conn.insecure.incoming.ReadFrom(maconn, MaxMessageSize) - go conn.waitToClose() // perform secure handshake before returning this connection. if err := conn.secureHandshake(peers); err != nil { @@ -93,7 +88,7 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error { } // spipe performs the secure handshake, which takes multiple RTT - sp, err := spipe.NewSecurePipe(c.ctx, 10, c.local, peers, insecure) + sp, err := spipe.NewSecurePipe(c.Context(), 10, c.local, peers, insecure) if err != nil { return err } @@ -114,42 +109,20 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error { return nil } -// waitToClose waits on the given context's Done before closing Conn. -func (c *singleConn) waitToClose() { - select { - case <-c.ctx.Done(): - } +// close is the internal close function, called by ContextCloser.Close +func (c *singleConn) close() error { + log.Debug("%s closing Conn with %s", c.local, c.remote) // close underlying connection - c.maconn.Close() + err := c.maconn.Close() // closing channels c.insecure.outgoing.Close() if c.secure != nil { // may never have gotten here. c.secure.Close() } -} -// isClosed returns whether this Conn is open or closed. -func (c *singleConn) isClosed() bool { - select { - case <-c.ctx.Done(): - return true - default: - return false - } -} - -// Close closes the connection, and associated channels. -func (c *singleConn) Close() error { - log.Debug("%s closing Conn with %s", c.local, c.remote) - if c.isClosed() { - return fmt.Errorf("connection already closed") - } - - // cancel context. - c.cancel() - return nil + return err } // LocalPeer is the Peer on this side @@ -235,23 +208,24 @@ type listener struct { // Peerstore is the set of peers we know about locally peers peer.Peerstore - // ctx + cancel func - ctx context.Context - cancel context.CancelFunc + // embedded ContextCloser + ContextCloser } -// waitToClose is needed to hand -func (l *listener) waitToClose() { - select { - case <-l.ctx.Done(): - } +// disambiguate +func (l *listener) Close() error { + return l.ContextCloser.Close() +} - l.Listener.Close() +// close called by ContextCloser.Close +func (l *listener) close() error { + log.Info("listener closing: %s %s", l.local, l.maddr) + return l.Listener.Close() } func (l *listener) isClosed() bool { select { - case <-l.ctx.Done(): + case <-l.Done(): return true default: return false @@ -266,7 +240,7 @@ func (l *listener) listen() { // handle is a goroutine work function that handles the handshake. // it's here only so that accepting new connections can happen quickly. handle := func(maconn manet.Conn) { - c, err := newSingleConn(l.ctx, l.local, nil, l.peers, maconn) + c, err := newSingleConn(l.Context(), l.local, nil, l.peers, maconn) if err != nil { log.Error("Error accepting connection: %v", err) } else { @@ -316,22 +290,9 @@ func (l *listener) Peerstore() peer.Peerstore { return l.peers } -// Close closes the listener. -// Any blocked Accept operations will be unblocked and return errors -func (l *listener) Close() error { - if l.isClosed() { - return errors.New("listener already closed") - } - - l.cancel() - return nil -} - // Listen listens on the particular multiaddr, with given peer and peerstore. func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) { - ctx, cancel := context.WithCancel(ctx) - ml, err := manet.Listen(addr) if err != nil { return nil, err @@ -341,8 +302,6 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer chansize := 10 l := &listener{ - ctx: ctx, - cancel: cancel, Listener: ml, maddr: addr, peers: peers, @@ -351,8 +310,9 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer chansize: chansize, } + l.ContextCloser = NewContextCloser(ctx, l.close) + go l.listen() - go l.waitToClose() return l, nil } -- GitLab