diff --git a/helpers/stream.go b/helpers/stream.go deleted file mode 100644 index 0b4c1f3e008ec79e3c9e6065f383c2c23857d39e..0000000000000000000000000000000000000000 --- a/helpers/stream.go +++ /dev/null @@ -1,56 +0,0 @@ -package helpers - -import ( - "errors" - "io" - "time" - - "github.com/libp2p/go-libp2p-core/network" -) - -// EOFTimeout is the maximum amount of time to wait to successfully observe an -// EOF on the stream. Defaults to 60 seconds. -var EOFTimeout = time.Second * 60 - -// ErrExpectedEOF is returned when we read data while expecting an EOF. -var ErrExpectedEOF = errors.New("read data when expecting EOF") - -// FullClose closes the stream and waits to read an EOF from the other side. -// -// * If it reads any data *before* the EOF, it resets the stream. -// * If it doesn't read an EOF within EOFTimeout, it resets the stream. -// -// You'll likely want to invoke this as `go FullClose(stream)` to close the -// stream in the background. -func FullClose(s network.Stream) error { - if err := s.Close(); err != nil { - s.Reset() - return err - } - return AwaitEOF(s) -} - -// AwaitEOF waits for an EOF on the given stream, returning an error if that -// fails. It waits at most EOFTimeout (defaults to 1 minute) after which it -// resets the stream. -func AwaitEOF(s network.Stream) error { - // So we don't wait forever - s.SetDeadline(time.Now().Add(EOFTimeout)) - - // We *have* to observe the EOF. Otherwise, we leak the stream. - // Now, technically, we should do this *before* - // returning from SendMessage as the message - // hasn't really been sent yet until we see the - // EOF but we don't actually *know* what - // protocol the other side is speaking. - n, err := s.Read([]byte{0}) - if n > 0 || err == nil { - s.Reset() - return ErrExpectedEOF - } - if err != io.EOF { - s.Reset() - return err - } - return nil -} diff --git a/helpers/stream_test.go b/helpers/stream_test.go deleted file mode 100644 index d267f1ba3d6cb4871d08bead032d34699edcba02..0000000000000000000000000000000000000000 --- a/helpers/stream_test.go +++ /dev/null @@ -1,151 +0,0 @@ -package helpers_test - -import ( - "errors" - "io" - "testing" - "time" - - "github.com/libp2p/go-libp2p-core/helpers" - network "github.com/libp2p/go-libp2p-core/network" -) - -var errCloseFailed = errors.New("close failed") -var errWriteFailed = errors.New("write failed") -var errReadFailed = errors.New("read failed") - -type stream struct { - network.Stream - - data []byte - - failRead, failWrite, failClose bool - - reset bool -} - -func (s *stream) Reset() error { - s.reset = true - return nil -} - -func (s *stream) Close() error { - if s.failClose { - return errCloseFailed - } - return nil -} - -func (s *stream) SetDeadline(t time.Time) error { - s.SetReadDeadline(t) - s.SetWriteDeadline(t) - return nil -} - -func (s *stream) SetReadDeadline(t time.Time) error { - return nil -} - -func (s *stream) SetWriteDeadline(t time.Time) error { - return nil -} - -func (s *stream) Write(b []byte) (int, error) { - if s.failWrite { - return 0, errWriteFailed - } - return len(b), nil -} - -func (s *stream) Read(b []byte) (int, error) { - var err error - if s.failRead { - err = errReadFailed - } - if len(s.data) == 0 { - if err == nil { - err = io.EOF - } - return 0, err - } - n := copy(b, s.data) - s.data = s.data[n:] - return n, err -} - -func TestNormal(t *testing.T) { - var s stream - if err := helpers.FullClose(&s); err != nil { - t.Fatal(err) - } - if s.reset { - t.Fatal("stream should not have been reset") - } -} - -func TestFailRead(t *testing.T) { - var s stream - s.failRead = true - if helpers.FullClose(&s) != errReadFailed { - t.Fatal("expected read to fail with:", errReadFailed) - } - if !s.reset { - t.Fatal("expected stream to be reset") - } -} - -func TestFailClose(t *testing.T) { - var s stream - s.failClose = true - if helpers.FullClose(&s) != errCloseFailed { - t.Fatal("expected close to fail with:", errCloseFailed) - } - if !s.reset { - t.Fatal("expected stream to be reset") - } -} - -func TestFailWrite(t *testing.T) { - var s stream - s.failWrite = true - if err := helpers.FullClose(&s); err != nil { - t.Fatal(err) - } - if s.reset { - t.Fatal("stream should not have been reset") - } -} - -func TestReadDataOne(t *testing.T) { - var s stream - s.data = []byte{0} - if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF { - t.Fatal("expected:", helpers.ErrExpectedEOF) - } - if !s.reset { - t.Fatal("stream have been reset") - } -} - -func TestReadDataMany(t *testing.T) { - var s stream - s.data = []byte{0, 1, 2, 3} - if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF { - t.Fatal("expected:", helpers.ErrExpectedEOF) - } - if !s.reset { - t.Fatal("stream have been reset") - } -} - -func TestReadDataError(t *testing.T) { - var s stream - s.data = []byte{0, 1, 2, 3} - s.failRead = true - if err := helpers.FullClose(&s); err != helpers.ErrExpectedEOF { - t.Fatal("expected:", helpers.ErrExpectedEOF) - } - if !s.reset { - t.Fatal("stream have been reset") - } -} diff --git a/mux/mux.go b/mux/mux.go index 39f2e51c1b899ba7c241836992f54a4877719f59..736da160e500ede2d0601f743f2e5d839a85e012 100644 --- a/mux/mux.go +++ b/mux/mux.go @@ -19,10 +19,30 @@ type MuxedStream interface { io.Reader io.Writer - // Close closes the stream for writing. Reading will still work (that - // is, the remote side can still write). + // Close closes the stream. + // + // * Any buffered data for writing will be flushed. + // * Future reads will fail. + // * Any in-progress reads/writes will be interrupted. + // + // Close may be asynchronous and _does not_ guarantee receipt of the + // data. io.Closer + // CloseWrite closes the stream for writing but leaves it open for + // reading. + // + // CloseWrite does not free the stream, users must still call Close or + // Reset. + CloseWrite() error + + // CloseRead closes the stream for writing but leaves it open for + // reading. + // + // CloseRead does not free the stream, users must still call Close or + // Reset. + CloseRead() error + // Reset closes both ends of the stream. Use this to tell the remote // side to hang up and go away. Reset() error