Unverified Commit 367fc7d3 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #8 from libp2p/fix/7

fixes a stream deadlock multiple ways
parents 66397218 62648031
...@@ -77,22 +77,22 @@ func (s *Stream) Read(b []byte) (n int, err error) { ...@@ -77,22 +77,22 @@ func (s *Stream) Read(b []byte) (n int, err error) {
defer asyncNotify(s.recvNotifyCh) defer asyncNotify(s.recvNotifyCh)
START: START:
s.stateLock.Lock() s.stateLock.Lock()
switch s.state { state := s.state
s.stateLock.Unlock()
switch state {
case streamRemoteClose: case streamRemoteClose:
fallthrough fallthrough
case streamClosed: case streamClosed:
s.recvLock.Lock() s.recvLock.Lock()
if s.recvBuf.Len() == 0 { empty := s.recvBuf.Len() == 0
s.recvLock.Unlock() s.recvLock.Unlock()
s.stateLock.Unlock() if empty {
return 0, io.EOF return 0, io.EOF
} }
s.recvLock.Unlock()
case streamReset: case streamReset:
s.stateLock.Unlock()
return 0, ErrConnectionReset return 0, ErrConnectionReset
} }
s.stateLock.Unlock()
// If there is no data available, block // If there is no data available, block
s.recvLock.Lock() s.recvLock.Lock()
...@@ -143,17 +143,17 @@ func (s *Stream) write(b []byte) (n int, err error) { ...@@ -143,17 +143,17 @@ func (s *Stream) write(b []byte) (n int, err error) {
START: START:
s.stateLock.Lock() s.stateLock.Lock()
switch s.state { state := s.state
s.stateLock.Unlock()
switch state {
case streamLocalClose: case streamLocalClose:
fallthrough fallthrough
case streamClosed: case streamClosed:
s.stateLock.Unlock()
return 0, ErrStreamClosed return 0, ErrStreamClosed
case streamReset: case streamReset:
s.stateLock.Unlock()
return 0, ErrConnectionReset return 0, ErrConnectionReset
} }
s.stateLock.Unlock()
// If there is no data available, block // If there is no data available, block
window := atomic.LoadUint32(&s.sendWindow) window := atomic.LoadUint32(&s.sendWindow)
...@@ -208,14 +208,14 @@ func (s *Stream) sendFlags() uint16 { ...@@ -208,14 +208,14 @@ func (s *Stream) sendFlags() uint16 {
// sendWindowUpdate potentially sends a window update enabling // sendWindowUpdate potentially sends a window update enabling
// further writes to take place. Must be invoked with the lock. // further writes to take place. Must be invoked with the lock.
func (s *Stream) sendWindowUpdate() error { func (s *Stream) sendWindowUpdate() error {
// Determine the flags if any
flags := s.sendFlags()
// Determine the delta update // Determine the delta update
max := s.session.config.MaxStreamWindowSize max := s.session.config.MaxStreamWindowSize
s.recvLock.Lock() s.recvLock.Lock()
delta := (max - uint32(s.recvBuf.Len())) - s.recvWindow delta := (max - uint32(s.recvBuf.Len())) - s.recvWindow
// Determine the flags if any
flags := s.sendFlags()
// Check if we can omit the update // Check if we can omit the update
if delta < (max/2) && flags == 0 { if delta < (max/2) && flags == 0 {
s.recvLock.Unlock() s.recvLock.Unlock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment