Commit 8aed79cd authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

fixed data races

parent 08af98d4
...@@ -34,34 +34,29 @@ type params struct { ...@@ -34,34 +34,29 @@ type params struct {
// NewSecurePipe constructs a pipe with channels of a given buffer size. // NewSecurePipe constructs a pipe with channels of a given buffer size.
func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer, func NewSecurePipe(ctx context.Context, bufsize int, local *peer.Peer,
peers peer.Peerstore) (*SecurePipe, error) { peers peer.Peerstore, insecure Duplex) (*SecurePipe, error) {
ctx, cancel := context.WithCancel(ctx)
sp := &SecurePipe{ sp := &SecurePipe{
Duplex: Duplex{ Duplex: Duplex{
In: make(chan []byte, bufsize), In: make(chan []byte, bufsize),
Out: make(chan []byte, bufsize), Out: make(chan []byte, bufsize),
}, },
local: local, local: local,
peers: peers, peers: peers,
} insecure: insecure,
return sp, nil
}
// Wrap creates a secure connection on top of an insecure duplex channel. ctx: ctx,
func (s *SecurePipe) Wrap(ctx context.Context, insecure Duplex) error { cancel: cancel,
if s.ctx != nil {
return errors.New("Pipe in use")
} }
s.insecure = insecure if err := sp.handshake(); err != nil {
s.ctx, s.cancel = context.WithCancel(ctx) sp.Close()
return nil, err
if err := s.handshake(); err != nil {
s.cancel()
return err
} }
return nil return sp, nil
} }
// LocalPeer retrieves the local peer. // LocalPeer retrieves the local peer.
...@@ -76,11 +71,12 @@ func (s *SecurePipe) RemotePeer() *peer.Peer { ...@@ -76,11 +71,12 @@ func (s *SecurePipe) RemotePeer() *peer.Peer {
// Close closes the secure pipe // Close closes the secure pipe
func (s *SecurePipe) Close() error { func (s *SecurePipe) Close() error {
if s.cancel == nil { select {
return errors.New("pipe already closed") case <-s.ctx.Done():
return errors.New("already closed")
default:
} }
s.cancel() s.cancel()
s.cancel = nil
return nil return nil
} }
...@@ -45,6 +45,7 @@ type singleConn struct { ...@@ -45,6 +45,7 @@ type singleConn struct {
// context + cancel // context + cancel
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
closed chan struct{}
secure *spipe.SecurePipe secure *spipe.SecurePipe
insecure *msgioPipe insecure *msgioPipe
...@@ -66,6 +67,7 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer, ...@@ -66,6 +67,7 @@ func newSingleConn(ctx context.Context, local, remote *peer.Peer,
maconn: maconn, maconn: maconn,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
closed: make(chan struct{}),
insecure: newMsgioPipe(10), insecure: newMsgioPipe(10),
msgpipe: msg.NewPipe(10), msgpipe: msg.NewPipe(10),
} }
...@@ -92,20 +94,16 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error { ...@@ -92,20 +94,16 @@ func (c *singleConn) secureHandshake(peers peer.Peerstore) error {
return errors.New("Conn is already secured or being secured.") return errors.New("Conn is already secured or being secured.")
} }
var err error
c.secure, err = spipe.NewSecurePipe(c.ctx, 10, c.local, peers)
if err != nil {
return err
}
// setup a Duplex pipe for spipe // setup a Duplex pipe for spipe
insecure := spipe.Duplex{ insecure := spipe.Duplex{
In: c.insecure.incoming.MsgChan, In: c.insecure.incoming.MsgChan,
Out: c.insecure.outgoing.MsgChan, Out: c.insecure.outgoing.MsgChan,
} }
// Wrap actually performs the secure handshake, which takes multiple RTT // spipe performs the secure handshake, which takes multiple RTT
if err := c.secure.Wrap(c.ctx, insecure); err != nil { var err error
c.secure, err = spipe.NewSecurePipe(c.ctx, 10, c.local, peers, insecure)
if err != nil {
return err return err
} }
...@@ -166,29 +164,33 @@ func (c *singleConn) waitToClose(ctx context.Context) { ...@@ -166,29 +164,33 @@ func (c *singleConn) waitToClose(ctx context.Context) {
// close underlying connection // close underlying connection
c.maconn.Close() c.maconn.Close()
c.maconn = nil
// closing channels // closing channels
c.insecure.outgoing.Close() c.insecure.outgoing.Close()
c.secure.Close() c.secure.Close()
close(c.msgpipe.Incoming) close(c.msgpipe.Incoming)
close(c.closed)
} }
// IsOpen returns whether this Conn is open or closed. // isClosed returns whether this Conn is open or closed.
func (c *singleConn) isOpen() bool { func (c *singleConn) isClosed() bool {
return c.maconn != nil select {
case <-c.closed:
return true
default:
return false
}
} }
// Close closes the connection, and associated channels. // Close closes the connection, and associated channels.
func (c *singleConn) Close() error { func (c *singleConn) Close() error {
log.Debug("%s closing Conn with %s", c.local, c.remote) log.Debug("%s closing Conn with %s", c.local, c.remote)
if !c.isOpen() { if c.isClosed() {
return fmt.Errorf("Already closed") // already closed return fmt.Errorf("connection already closed")
} }
// cancel context. // cancel context.
c.cancel() c.cancel()
c.cancel = nil
return nil return nil
} }
...@@ -278,6 +280,7 @@ type listener struct { ...@@ -278,6 +280,7 @@ type listener struct {
// ctx + cancel func // ctx + cancel func
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
closed chan struct{}
} }
// waitToClose is needed to hand // waitToClose is needed to hand
...@@ -286,8 +289,17 @@ func (l *listener) waitToClose() { ...@@ -286,8 +289,17 @@ func (l *listener) waitToClose() {
case <-l.ctx.Done(): case <-l.ctx.Done():
} }
l.cancel = nil
l.Listener.Close() l.Listener.Close()
close(l.closed)
}
func (l *listener) isClosed() bool {
select {
case <-l.closed:
return true
default:
return false
}
} }
func (l *listener) listen() { func (l *listener) listen() {
...@@ -312,7 +324,7 @@ func (l *listener) listen() { ...@@ -312,7 +324,7 @@ func (l *listener) listen() {
if err != nil { if err != nil {
// if cancel is nil we're closed. // if cancel is nil we're closed.
if l.cancel == nil { if l.isClosed() {
return // done. return // done.
} }
...@@ -351,7 +363,12 @@ func (l *listener) Peerstore() peer.Peerstore { ...@@ -351,7 +363,12 @@ func (l *listener) Peerstore() peer.Peerstore {
// Close closes the listener. // Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors // Any blocked Accept operations will be unblocked and return errors
func (l *listener) Close() error { func (l *listener) Close() error {
if l.isClosed() {
return errors.New("listener already closed")
}
l.cancel() l.cancel()
<-l.closed
return nil return nil
} }
...@@ -371,6 +388,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer ...@@ -371,6 +388,7 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local *peer.Peer, peers peer
l := &listener{ l := &listener{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
closed: make(chan struct{}),
Listener: ml, Listener: ml,
maddr: addr, maddr: addr,
peers: peers, peers: peers,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment