Commit 8065b61c authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Added ContextCloser abstraction

parent 7a7bf8d8
package conn
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// Wait is a readable channel to block on until it receives a signal.
type Wait <-chan Signal
// Signal is an empty channel
type Signal struct{}
// CloseFunc is a function used to close a ContextCloser
type CloseFunc func() error
// ContextCloser is an interface for services able to be opened and closed.
type ContextCloser interface {
Context() context.Context
// Close is a method to call when you with to stop this ContextCloser
Close() error
// Done is a method to wait upon, like context.Context.Done
Done() Wait
}
// contextCloser is an OpenCloser with a cancellable context
type contextCloser struct {
ctx context.Context
cancel context.CancelFunc
// called to close
closeFunc CloseFunc
// closed is released once the close function is done.
closed chan Signal
}
// NewContextCloser constructs and returns a ContextCloser. It will call
// cf CloseFunc before its Done() Wait signals fire.
func NewContextCloser(ctx context.Context, cf CloseFunc) ContextCloser {
ctx, cancel := context.WithCancel(ctx)
c := &contextCloser{
ctx: ctx,
cancel: cancel,
closeFunc: cf,
closed: make(chan Signal),
}
go c.closeOnContextDone()
return c
}
func (c *contextCloser) Context() context.Context {
return c.ctx
}
func (c *contextCloser) Done() Wait {
return c.closed
}
func (c *contextCloser) Close() error {
select {
case <-c.Done():
panic("closed twice")
default:
}
c.cancel() // release anyone waiting on the context
err := c.closeFunc() // actually run the close logic
close(c.closed) // relase everyone waiting on Done
return err
}
func (c *contextCloser) closeOnContextDone() {
<-c.ctx.Done()
select {
case <-c.Done():
return // already closed
default:
}
c.Close()
}
......@@ -41,35 +41,30 @@ type singleConn struct {
remote *peer.Peer
maconn manet.Conn
// context + cancel
ctx context.Context
cancel context.CancelFunc
secure *spipe.SecurePipe
insecure *msgioPipe
ContextCloser
}
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote *peer.Peer,
peers peer.Peerstore, maconn manet.Conn) (Conn, error) {
ctx, cancel := context.WithCancel(ctx)
conn := &singleConn{
local: local,
remote: remote,
maconn: maconn,
ctx: ctx,
cancel: cancel,
insecure: newMsgioPipe(10),
}
conn.ContextCloser = NewContextCloser(ctx, conn.close)
log.Info("newSingleConn: %v to %v", local, remote)
// setup the various io goroutines
go conn.insecure.outgoing.WriteTo(maconn)
go conn.insecure.incoming.ReadFrom(maconn, MaxMessageSize)
go conn.waitToClose()
// perform secure handshake before returning this connection.
if err := conn.secureHandshake(peers); err != nil {
......@@ -93,7 +88,7 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error {
}
// spipe performs the secure handshake, which takes multiple RTT
sp, err := spipe.NewSecurePipe(c.ctx, 10, c.local, peers, insecure)
sp, err := spipe.NewSecurePipe(c.Context(), 10, c.local, peers, insecure)
if err != nil {
return err
}
......@@ -114,42 +109,20 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error {
return nil
}
// waitToClose waits on the given context's Done before closing Conn.
func (c *singleConn) waitToClose() {
select {
case <-c.ctx.Done():
}
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) close() error {
log.Debug("%s closing Conn with %s", c.local, c.remote)
// close underlying connection
c.maconn.Close()
err := c.maconn.Close()
// closing channels
c.insecure.outgoing.Close()
if c.secure != nil { // may never have gotten here.
c.secure.Close()
}
}
// isClosed returns whether this Conn is open or closed.
func (c *singleConn) isClosed() bool {
select {
case <-c.ctx.Done():
return true
default:
return false
}
}
// Close closes the connection, and associated channels.
func (c *singleConn) Close() error {
log.Debug("%s closing Conn with %s", c.local, c.remote)
if c.isClosed() {
return fmt.Errorf("connection already closed")
}
// cancel context.
c.cancel()
return nil
return err
}
// LocalPeer is the Peer on this side
......@@ -235,23 +208,24 @@ type listener struct {
// Peerstore is the set of peers we know about locally
peers peer.Peerstore
// ctx + cancel func
ctx context.Context
cancel context.CancelFunc
// embedded ContextCloser
ContextCloser
}
// waitToClose is needed to hand
func (l *listener) waitToClose() {
select {
case <-l.ctx.Done():
}
// disambiguate
func (l *listener) Close() error {
return l.ContextCloser.Close()
}
l.Listener.Close()
// close called by ContextCloser.Close
func (l *listener) close() error {
log.Info("listener closing: %s %s", l.local, l.maddr)
return l.Listener.Close()
}
func (l *listener) isClosed() bool {
select {
case <-l.ctx.Done():
case <-l.Done():
return true
default:
return false
......@@ -266,7 +240,7 @@ func (l *listener) listen() {
// handle is a goroutine work function that handles the handshake.
// it's here only so that accepting new connections can happen quickly.
handle := func(maconn manet.Conn) {
c, err := newSingleConn(l.ctx, l.local, nil, l.peers, maconn)
c, err := newSingleConn(l.Context(), l.local, nil, l.peers, maconn)
if err != nil {
log.Error("Error accepting connection: %v", err)
} else {
......@@ -316,22 +290,9 @@ func (l *listener) Peerstore() peer.Peerstore {
return l.peers
}
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors
func (l *listener) Close() error {
if l.isClosed() {
return errors.New("listener already closed")
}
l.cancel()
return nil
}
// Listen listens on the particular multiaddr, with given peer and peerstore.
func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer.Peerstore) (Listener, error) {
ctx, cancel := context.WithCancel(ctx)
ml, err := manet.Listen(addr)
if err != nil {
return nil, err
......@@ -341,8 +302,6 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer
chansize := 10
l := &listener{
ctx: ctx,
cancel: cancel,
Listener: ml,
maddr: addr,
peers: peers,
......@@ -351,8 +310,9 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer
chansize: chansize,
}
l.ContextCloser = NewContextCloser(ctx, l.close)
go l.listen()
go l.waitToClose()
return l, nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment