Unverified Commit d6afc690 authored by Steven Allen's avatar Steven Allen Committed by GitHub

add CloseRead/CloseWrite on streams (#166)

* add CloseRead/CloseWrite on streams

This changes the behavior of `Close` to behave as one would expect: it closes
the stream. The new methods, CloseWrite/CloseRead allow for closing the stream in
a single direction.

Note: This _does not_ implement CancelWrite/CancelRead as our stream muxer
_protocols_ don't support that.

fixes #9

* remove stream util helpers

FullClose and AwaitEOF were introduced to work around the fact that calling
Close on a stream only closed the write half. All users must adapt their code
to the new interfaces, so this change is intentionally breaking.
parent b77fd280
package helpers
import (
"errors"
"io"
"time"
"github.com/libp2p/go-libp2p-core/network"
)
// EOFTimeout is the maximum amount of time to wait to successfully observe an
// EOF on the stream. Defaults to 60 seconds.
var EOFTimeout = time.Second * 60
// ErrExpectedEOF is returned when we read data while expecting an EOF.
var ErrExpectedEOF = errors.New("read data when expecting EOF")
// FullClose closes the stream and waits to read an EOF from the other side.
//
// * If it reads any data *before* the EOF, it resets the stream.
// * If it doesn't read an EOF within EOFTimeout, it resets the stream.
//
// You'll likely want to invoke this as `go FullClose(stream)` to close the
// stream in the background.
func FullClose(s network.Stream) error {
if err := s.Close(); err != nil {
s.Reset()
return err
}
return AwaitEOF(s)
}
// AwaitEOF waits for an EOF on the given stream, returning an error if that
// fails. It waits at most EOFTimeout (defaults to 1 minute) after which it
// resets the stream.
func AwaitEOF(s network.Stream) error {
// So we don't wait forever
s.SetDeadline(time.Now().Add(EOFTimeout))
// We *have* to observe the EOF. Otherwise, we leak the stream.
// Now, technically, we should do this *before*
// returning from SendMessage as the message
// hasn't really been sent yet until we see the
// EOF but we don't actually *know* what
// protocol the other side is speaking.
n, err := s.Read([]byte{0})
if n > 0 || err == nil {
s.Reset()
return ErrExpectedEOF
}
if err != io.EOF {
s.Reset()
return err
}
return nil
}
package helpers_test
import (
"errors"
"io"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/helpers"
network "github.com/libp2p/go-libp2p-core/network"
)
var errCloseFailed = errors.New("close failed")
var errWriteFailed = errors.New("write failed")
var errReadFailed = errors.New("read failed")
type stream struct {
network.Stream
data []byte
failRead, failWrite, failClose bool
reset bool
}
func (s *stream) Reset() error {
s.reset = true
return nil
}
func (s *stream) Close() error {
if s.failClose {
return errCloseFailed
}
return nil
}
func (s *stream) SetDeadline(t time.Time) error {
s.SetReadDeadline(t)
s.SetWriteDeadline(t)
return nil
}
func (s *stream) SetReadDeadline(t time.Time) error {
return nil
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return nil
}
func (s *stream) Write(b []byte) (int, error) {
if s.failWrite {
return 0, errWriteFailed
}
return len(b), nil
}
func (s *stream) Read(b []byte) (int, error) {
var err error
if s.failRead {
err = errReadFailed
}
if len(s.data) == 0 {
if err == nil {
err = io.EOF
}
return 0, err
}
n := copy(b, s.data)
s.data = s.data[n:]
return n, err
}
func TestNormal(t *testing.T) {
var s stream
if err := helpers.FullClose(&s); err != nil {
t.Fatal(err)
}
if s.reset {
t.Fatal("stream should not have been reset")
}
}
func TestFailRead(t *testing.T) {
var s stream
s.failRead = true
if helpers.FullClose(&s) != errReadFailed {
t.Fatal("expected read to fail with:", errReadFailed)
}
if !s.reset {
t.Fatal("expected stream to be reset")
}
}
func TestFailClose(t *testing.T) {
var s stream
s.failClose = true
if helpers.FullClose(&s) != errCloseFailed {
t.Fatal("expected close to fail with:", errCloseFailed)
}
if !s.reset {
t.Fatal("expected stream to be reset")
}
}
func TestFailWrite(t *testing.T) {
var s stream
s.failWrite = true
if err := helpers.FullClose(&s); err != nil {
t.Fatal(err)
}
if s.reset {
t.Fatal("stream should not have been reset")
}
}
func TestReadDataOne(t *testing.T) {
var s stream
s.data = []byte{0}
if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF {
t.Fatal("expected:", helpers.ErrExpectedEOF)
}
if !s.reset {
t.Fatal("stream have been reset")
}
}
func TestReadDataMany(t *testing.T) {
var s stream
s.data = []byte{0, 1, 2, 3}
if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF {
t.Fatal("expected:", helpers.ErrExpectedEOF)
}
if !s.reset {
t.Fatal("stream have been reset")
}
}
func TestReadDataError(t *testing.T) {
var s stream
s.data = []byte{0, 1, 2, 3}
s.failRead = true
if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF {
t.Fatal("expected:", helpers.ErrExpectedEOF)
}
if !s.reset {
t.Fatal("stream have been reset")
}
}
......@@ -19,10 +19,30 @@ type MuxedStream interface {
io.Reader
io.Writer
// Close closes the stream for writing. Reading will still work (that
// is, the remote side can still write).
// Close closes the stream.
//
// * Any buffered data for writing will be flushed.
// * Future reads will fail.
// * Any in-progress reads/writes will be interrupted.
//
// Close may be asynchronous and _does not_ guarantee receipt of the
// data.
io.Closer
// CloseWrite closes the stream for writing but leaves it open for
// reading.
//
// CloseWrite does not free the stream, users must still call Close or
// Reset.
CloseWrite() error
// CloseRead closes the stream for writing but leaves it open for
// reading.
//
// CloseRead does not free the stream, users must still call Close or
// Reset.
CloseRead() error
// Reset closes both ends of the stream. Use this to tell the remote
// side to hang up and go away.
Reset() error
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment