Commit 8a7f9c86 authored by Jeromy's avatar Jeromy

update multistream deps and fix code to work with new changes

parent 1b9cb346
...@@ -340,9 +340,9 @@ func (n *ps2netNotifee) Disconnected(c *ps.Conn) { ...@@ -340,9 +340,9 @@ func (n *ps2netNotifee) Disconnected(c *ps.Conn) {
} }
func (n *ps2netNotifee) OpenedStream(s *ps.Stream) { func (n *ps2netNotifee) OpenedStream(s *ps.Stream) {
n.not.OpenedStream(n.net, inet.Stream((*Stream)(s))) n.not.OpenedStream(n.net, &Stream{stream: s})
} }
func (n *ps2netNotifee) ClosedStream(s *ps.Stream) { func (n *ps2netNotifee) ClosedStream(s *ps.Stream) {
n.not.ClosedStream(n.net, inet.Stream((*Stream)(s))) n.not.ClosedStream(n.net, &Stream{stream: s})
} }
...@@ -10,6 +10,12 @@ import ( ...@@ -10,6 +10,12 @@ import (
context "golang.org/x/net/context" context "golang.org/x/net/context"
) )
func streamsSame(a, b inet.Stream) bool {
sa := a.(*Stream)
sb := b.(*Stream)
return sa.Stream() == sb.Stream()
}
func TestNotifications(t *testing.T) { func TestNotifications(t *testing.T) {
ctx := context.Background() ctx := context.Background()
swarms := makeSwarms(ctx, t, 5) swarms := makeSwarms(ctx, t, 5)
...@@ -98,7 +104,7 @@ func TestNotifications(t *testing.T) { ...@@ -98,7 +104,7 @@ func TestNotifications(t *testing.T) {
case <-time.After(timeout): case <-time.After(timeout):
t.Fatal("timeout") t.Fatal("timeout")
} }
if s != s2 { if !streamsSame(s, s2) {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
} }
...@@ -108,7 +114,7 @@ func TestNotifications(t *testing.T) { ...@@ -108,7 +114,7 @@ func TestNotifications(t *testing.T) {
case <-time.After(timeout): case <-time.After(timeout):
t.Fatal("timeout") t.Fatal("timeout")
} }
if s != s2 { if !streamsSame(s, s2) {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
} }
} }
......
...@@ -8,11 +8,14 @@ import ( ...@@ -8,11 +8,14 @@ import (
// a Stream is a wrapper around a ps.Stream that exposes a way to get // a Stream is a wrapper around a ps.Stream that exposes a way to get
// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm) // our Conn and Swarm (instead of just the ps.Conn and ps.Swarm)
type Stream ps.Stream type Stream struct {
stream *ps.Stream
protocol string
}
// Stream returns the underlying peerstream.Stream // Stream returns the underlying peerstream.Stream
func (s *Stream) Stream() *ps.Stream { func (s *Stream) Stream() *ps.Stream {
return (*ps.Stream)(s) return s.stream
} }
// Conn returns the Conn associated with this Stream, as an inet.Conn // Conn returns the Conn associated with this Stream, as an inet.Conn
...@@ -22,27 +25,37 @@ func (s *Stream) Conn() inet.Conn { ...@@ -22,27 +25,37 @@ func (s *Stream) Conn() inet.Conn {
// SwarmConn returns the Conn associated with this Stream, as a *Conn // SwarmConn returns the Conn associated with this Stream, as a *Conn
func (s *Stream) SwarmConn() *Conn { func (s *Stream) SwarmConn() *Conn {
return (*Conn)(s.Stream().Conn()) return (*Conn)(s.stream.Conn())
} }
// Read reads bytes from a stream. // Read reads bytes from a stream.
func (s *Stream) Read(p []byte) (n int, err error) { func (s *Stream) Read(p []byte) (n int, err error) {
return s.Stream().Read(p) return s.stream.Read(p)
} }
// Write writes bytes to a stream, flushing for each call. // Write writes bytes to a stream, flushing for each call.
func (s *Stream) Write(p []byte) (n int, err error) { func (s *Stream) Write(p []byte) (n int, err error) {
return s.Stream().Write(p) return s.stream.Write(p)
} }
// Close closes the stream, indicating this side is finished // Close closes the stream, indicating this side is finished
// with the stream. // with the stream.
func (s *Stream) Close() error { func (s *Stream) Close() error {
return s.Stream().Close() return s.stream.Close()
}
func (s *Stream) Protocol() string {
return s.protocol
}
func (s *Stream) SetProtocol(p string) {
s.protocol = p
} }
func wrapStream(pss *ps.Stream) *Stream { func wrapStream(pss *ps.Stream) *Stream {
return (*Stream)(pss) return &Stream{
stream: pss,
}
} }
func wrapStreams(st []*ps.Stream) []*Stream { func wrapStreams(st []*ps.Stream) []*Stream {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment