From d6afc690e1c63a66c91193d5acc60b8731311bbd Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 1 Sep 2020 17:29:54 -0700 Subject: [PATCH] add CloseRead/CloseWrite on streams (#166) * add CloseRead/CloseWrite on streams This changes the behavior of `Close` to behave as one would expect: it closes the stream. The new methods, CloseWrite/CloseRead allow for closing the stream in a single direction. Note: This _does not_ implement CancelWrite/CancelRead as our stream muxer _protocols_ don't support that. fixes #9 * remove stream util helpers FullClose and AwaitEOF were introduced to work around the fact that calling Close on a stream only closed the write half. All users must adapt their code to the new interfaces, so this change is intentionally breaking. --- helpers/stream.go | 56 --------------- helpers/stream_test.go | 151 ----------------------------------------- mux/mux.go | 24 ++++++- 3 files changed, 22 insertions(+), 209 deletions(-) delete mode 100644 helpers/stream.go delete mode 100644 helpers/stream_test.go diff --git a/helpers/stream.go b/helpers/stream.go deleted file mode 100644 index 0b4c1f3..0000000 --- a/helpers/stream.go +++ /dev/null @@ -1,56 +0,0 @@ -package helpers - -import ( - "errors" - "io" - "time" - - "github.com/libp2p/go-libp2p-core/network" -) - -// EOFTimeout is the maximum amount of time to wait to successfully observe an -// EOF on the stream. Defaults to 60 seconds. -var EOFTimeout = time.Second * 60 - -// ErrExpectedEOF is returned when we read data while expecting an EOF. -var ErrExpectedEOF = errors.New("read data when expecting EOF") - -// FullClose closes the stream and waits to read an EOF from the other side. -// -// * If it reads any data *before* the EOF, it resets the stream. -// * If it doesn't read an EOF within EOFTimeout, it resets the stream. -// -// You'll likely want to invoke this as `go FullClose(stream)` to close the -// stream in the background. -func FullClose(s network.Stream) error { - if err := s.Close(); err != nil { - s.Reset() - return err - } - return AwaitEOF(s) -} - -// AwaitEOF waits for an EOF on the given stream, returning an error if that -// fails. It waits at most EOFTimeout (defaults to 1 minute) after which it -// resets the stream. -func AwaitEOF(s network.Stream) error { - // So we don't wait forever - s.SetDeadline(time.Now().Add(EOFTimeout)) - - // We *have* to observe the EOF. Otherwise, we leak the stream. - // Now, technically, we should do this *before* - // returning from SendMessage as the message - // hasn't really been sent yet until we see the - // EOF but we don't actually *know* what - // protocol the other side is speaking. - n, err := s.Read([]byte{0}) - if n > 0 || err == nil { - s.Reset() - return ErrExpectedEOF - } - if err != io.EOF { - s.Reset() - return err - } - return nil -} diff --git a/helpers/stream_test.go b/helpers/stream_test.go deleted file mode 100644 index d267f1b..0000000 --- a/helpers/stream_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package helpers_test - -import ( - "errors" - "io" - "testing" - "time" - - "github.com/libp2p/go-libp2p-core/helpers" - network "github.com/libp2p/go-libp2p-core/network" -) - -var errCloseFailed = errors.New("close failed") -var errWriteFailed = errors.New("write failed") -var errReadFailed = errors.New("read failed") - -type stream struct { - network.Stream - - data []byte - - failRead, failWrite, failClose bool - - reset bool -} - -func (s *stream) Reset() error { - s.reset = true - return nil -} - -func (s *stream) Close() error { - if s.failClose { - return errCloseFailed - } - return nil -} - -func (s *stream) SetDeadline(t time.Time) error { - s.SetReadDeadline(t) - s.SetWriteDeadline(t) - return nil -} - -func (s *stream) SetReadDeadline(t time.Time) error { - return nil -} - -func (s *stream) SetWriteDeadline(t time.Time) error { - return nil -} - -func (s *stream) Write(b []byte) (int, error) { - if s.failWrite { - return 0, errWriteFailed - } - return len(b), nil -} - -func (s *stream) Read(b []byte) (int, error) { - var err error - if s.failRead { - err = errReadFailed - } - if len(s.data) == 0 { - if err == nil { - err = io.EOF - } - return 0, err - } - n := copy(b, s.data) - s.data = s.data[n:] - return n, err -} - -func TestNormal(t *testing.T) { - var s stream - if err := helpers.FullClose(&s); err != nil { - t.Fatal(err) - } - if s.reset { - t.Fatal("stream should not have been reset") - } -} - -func TestFailRead(t *testing.T) { - var s stream - s.failRead = true - if helpers.FullClose(&s) != errReadFailed { - t.Fatal("expected read to fail with:", errReadFailed) - } - if !s.reset { - t.Fatal("expected stream to be reset") - } -} - -func TestFailClose(t *testing.T) { - var s stream - s.failClose = true - if helpers.FullClose(&s) != errCloseFailed { - t.Fatal("expected close to fail with:", errCloseFailed) - } - if !s.reset { - t.Fatal("expected stream to be reset") - } -} - -func TestFailWrite(t *testing.T) { - var s stream - s.failWrite = true - if err := helpers.FullClose(&s); err != nil { - t.Fatal(err) - } - if s.reset { - t.Fatal("stream should not have been reset") - } -} - -func TestReadDataOne(t *testing.T) { - var s stream - s.data = []byte{0} - if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF { - t.Fatal("expected:", helpers.ErrExpectedEOF) - } - if !s.reset { - t.Fatal("stream have been reset") - } -} - -func TestReadDataMany(t *testing.T) { - var s stream - s.data = []byte{0, 1, 2, 3} - if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF { - t.Fatal("expected:", helpers.ErrExpectedEOF) - } - if !s.reset { - t.Fatal("stream have been reset") - } -} - -func TestReadDataError(t *testing.T) { - var s stream - s.data = []byte{0, 1, 2, 3} - s.failRead = true - if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF { - t.Fatal("expected:", helpers.ErrExpectedEOF) - } - if !s.reset { - t.Fatal("stream have been reset") - } -} diff --git a/mux/mux.go b/mux/mux.go index 39f2e51..736da16 100644 --- a/mux/mux.go +++ b/mux/mux.go @@ -19,10 +19,30 @@ type MuxedStream interface { io.Reader io.Writer - // Close closes the stream for writing. Reading will still work (that - // is, the remote side can still write). + // Close closes the stream. + // + // * Any buffered data for writing will be flushed. + // * Future reads will fail. + // * Any in-progress reads/writes will be interrupted. + // + // Close may be asynchronous and _does not_ guarantee receipt of the + // data. io.Closer + // CloseWrite closes the stream for writing but leaves it open for + // reading. + // + // CloseWrite does not free the stream, users must still call Close or + // Reset. + CloseWrite() error + + // CloseRead closes the stream for writing but leaves it open for + // reading. + // + // CloseRead does not free the stream, users must still call Close or + // Reset. + CloseRead() error + // Reset closes both ends of the stream. Use this to tell the remote // side to hang up and go away. Reset() error -- GitLab