Commit 8d6fa244 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #5 from libp2p/feat/stream-unity

move protocol methods down into peerstream
parents 368ca206 9fbfd0eb
...@@ -9,9 +9,9 @@ ...@@ -9,9 +9,9 @@
"gxDependencies": [ "gxDependencies": [
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmdXimY9QHaasZmw6hWojWnCJvfgxETjZQfg9g6ZrA9wMX", "hash": "QmdysBu77i3YaagNtMAjiCJdeWWvds18ho5XEB784guQ41",
"name": "go-libp2p-net", "name": "go-libp2p-net",
"version": "1.2.1" "version": "1.5.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -69,21 +69,21 @@ ...@@ -69,21 +69,21 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmRmFKJgjjQhrT1uDyhpS87kE5M9YbMT8RBWam5uk8o4uH", "hash": "QmS9en3mcwW2HRSeRabceJEGVxTZF4vEeFm7JHWQwWsb1U",
"name": "go-peerstream", "name": "go-peerstream",
"version": "1.1.0" "version": "1.4.1"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmWpTXhTkpoCDEm9twJd5Rc9jFwy61emzxneeJzrVMfjGF", "hash": "QmVcNzHewFvmVah1CGqg8NV7nHHsPu19U43YE5b2oqWyBp",
"name": "go-libp2p-metrics", "name": "go-libp2p-metrics",
"version": "1.2.0" "version": "1.5.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmdiBXu57LDzaoovc89jypRSNEhxVZ4zRig2AsXm9VwrsS", "hash": "QmeFCCpGjB663gcLVS7Kz6ZSXyFgx5aJwQu27xkwyWdWiG",
"name": "go-libp2p-conn", "name": "go-libp2p-conn",
"version": "1.2.1" "version": "1.4.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
"time" "time"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
ps "github.com/jbenet/go-peerstream"
pst "github.com/jbenet/go-stream-muxer" pst "github.com/jbenet/go-stream-muxer"
"github.com/jbenet/goprocess" "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context" goprocessctx "github.com/jbenet/goprocess/context"
...@@ -26,6 +25,7 @@ import ( ...@@ -26,6 +25,7 @@ import (
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
transport "github.com/libp2p/go-libp2p-transport" transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter" filter "github.com/libp2p/go-maddr-filter"
ps "github.com/libp2p/go-peerstream"
tcpt "github.com/libp2p/go-tcp-transport" tcpt "github.com/libp2p/go-tcp-transport"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
psmss "github.com/whyrusleeping/go-smux-multistream" psmss "github.com/whyrusleeping/go-smux-multistream"
...@@ -256,7 +256,7 @@ func (s *Swarm) SetConnHandler(handler ConnHandler) { ...@@ -256,7 +256,7 @@ func (s *Swarm) SetConnHandler(handler ConnHandler) {
// See peerstream. // See peerstream.
func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) { func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) {
s.swarm.SetStreamHandler(func(s *ps.Stream) { s.swarm.SetStreamHandler(func(s *ps.Stream) {
handler(wrapStream(s)) handler((*Stream)(s))
}) })
} }
...@@ -273,12 +273,7 @@ func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, erro ...@@ -273,12 +273,7 @@ func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, erro
// TODO: think about passing a context down to NewStreamWithGroup // TODO: think about passing a context down to NewStreamWithGroup
st, err := s.swarm.NewStreamWithGroup(p) st, err := s.swarm.NewStreamWithGroup(p)
return wrapStream(st), err return (*Stream)(st), err
}
// StreamsWithPeer returns all the live Streams to p
func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream {
return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams()))
} }
// ConnectionsToPeer returns all the live connections to p // ConnectionsToPeer returns all the live connections to p
...@@ -387,9 +382,9 @@ func (n *ps2netNotifee) Disconnected(c *ps.Conn) { ...@@ -387,9 +382,9 @@ func (n *ps2netNotifee) Disconnected(c *ps.Conn) {
} }
func (n *ps2netNotifee) OpenedStream(s *ps.Stream) { func (n *ps2netNotifee) OpenedStream(s *ps.Stream) {
n.not.OpenedStream(n.net, &Stream{stream: s}) n.not.OpenedStream(n.net, (*Stream)(s))
} }
func (n *ps2netNotifee) ClosedStream(s *ps.Stream) { func (n *ps2netNotifee) ClosedStream(s *ps.Stream) {
n.not.ClosedStream(n.net, &Stream{stream: s}) n.not.ClosedStream(n.net, (*Stream)(s))
} }
...@@ -4,11 +4,11 @@ import ( ...@@ -4,11 +4,11 @@ import (
"context" "context"
"fmt" "fmt"
ps "github.com/jbenet/go-peerstream"
ic "github.com/libp2p/go-libp2p-crypto" ic "github.com/libp2p/go-libp2p-crypto"
iconn "github.com/libp2p/go-libp2p-interface-conn" iconn "github.com/libp2p/go-libp2p-interface-conn"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
ps "github.com/libp2p/go-peerstream"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
...@@ -77,7 +77,7 @@ func (c *Conn) RemotePublicKey() ic.PubKey { ...@@ -77,7 +77,7 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
// NewSwarmStream returns a new Stream from this connection // NewSwarmStream returns a new Stream from this connection
func (c *Conn) NewSwarmStream() (*Stream, error) { func (c *Conn) NewSwarmStream() (*Stream, error) {
s, err := c.StreamConn().NewStream() s, err := c.StreamConn().NewStream()
return wrapStream(s), err return (*Stream)(s), err
} }
// NewStream returns a new Stream from this connection // NewStream returns a new Stream from this connection
...@@ -91,6 +91,16 @@ func (c *Conn) Close() error { ...@@ -91,6 +91,16 @@ func (c *Conn) Close() error {
return c.StreamConn().Close() return c.StreamConn().Close()
} }
func (c *Conn) GetStreams() ([]inet.Stream, error) {
ss := c.StreamConn().Streams()
out := make([]inet.Stream, len(ss))
for i, s := range ss {
out[i] = (*Stream)(s)
}
return out, nil
}
func wrapConn(psc *ps.Conn) (*Conn, error) { func wrapConn(psc *ps.Conn) (*Conn, error) {
// grab the underlying connection. // grab the underlying connection.
if _, ok := psc.NetConn().(iconn.Conn); !ok { if _, ok := psc.NetConn().(iconn.Conn); !ok {
......
...@@ -4,13 +4,13 @@ import ( ...@@ -4,13 +4,13 @@ import (
"context" "context"
"fmt" "fmt"
ps "github.com/jbenet/go-peerstream"
conn "github.com/libp2p/go-libp2p-conn" conn "github.com/libp2p/go-libp2p-conn"
iconn "github.com/libp2p/go-libp2p-interface-conn" iconn "github.com/libp2p/go-libp2p-interface-conn"
lgbl "github.com/libp2p/go-libp2p-loggables" lgbl "github.com/libp2p/go-libp2p-loggables"
mconn "github.com/libp2p/go-libp2p-metrics/conn" mconn "github.com/libp2p/go-libp2p-metrics/conn"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
transport "github.com/libp2p/go-libp2p-transport" transport "github.com/libp2p/go-libp2p-transport"
ps "github.com/libp2p/go-peerstream"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
......
...@@ -165,6 +165,15 @@ func TestNetworkOpenStream(t *testing.T) { ...@@ -165,6 +165,15 @@ func TestNetworkOpenStream(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
streams, err := nets[0].ConnsToPeer(nets[1].LocalPeer())[0].GetStreams()
if err != nil {
t.Fatal(err)
}
if len(streams) != 1 {
t.Fatal("should only have one stream there")
}
_, err = s.Write([]byte("hello ipfs")) _, err = s.Write([]byte("hello ipfs"))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
......
...@@ -4,19 +4,16 @@ import ( ...@@ -4,19 +4,16 @@ import (
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
protocol "github.com/libp2p/go-libp2p-protocol" protocol "github.com/libp2p/go-libp2p-protocol"
ps "github.com/jbenet/go-peerstream" ps "github.com/libp2p/go-peerstream"
) )
// Stream is a wrapper around a ps.Stream that exposes a way to get // Stream is a wrapper around a ps.Stream that exposes a way to get
// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm) // our Conn and Swarm (instead of just the ps.Conn and ps.Swarm)
type Stream struct { type Stream ps.Stream
stream *ps.Stream
protocol protocol.ID
}
// Stream returns the underlying peerstream.Stream // Stream returns the underlying peerstream.Stream
func (s *Stream) Stream() *ps.Stream { func (s *Stream) Stream() *ps.Stream {
return s.stream return (*ps.Stream)(s)
} }
// Conn returns the Conn associated with this Stream, as an inet.Conn // Conn returns the Conn associated with this Stream, as an inet.Conn
...@@ -26,43 +23,29 @@ func (s *Stream) Conn() inet.Conn { ...@@ -26,43 +23,29 @@ func (s *Stream) Conn() inet.Conn {
// SwarmConn returns the Conn associated with this Stream, as a *Conn // SwarmConn returns the Conn associated with this Stream, as a *Conn
func (s *Stream) SwarmConn() *Conn { func (s *Stream) SwarmConn() *Conn {
return (*Conn)(s.stream.Conn()) return (*Conn)(s.Stream().Conn())
} }
// Read reads bytes from a stream. // Read reads bytes from a stream.
func (s *Stream) Read(p []byte) (n int, err error) { func (s *Stream) Read(p []byte) (n int, err error) {
return s.stream.Read(p) return s.Stream().Read(p)
} }
// Write writes bytes to a stream, flushing for each call. // Write writes bytes to a stream, flushing for each call.
func (s *Stream) Write(p []byte) (n int, err error) { func (s *Stream) Write(p []byte) (n int, err error) {
return s.stream.Write(p) return s.Stream().Write(p)
} }
// Close closes the stream, indicating this side is finished // Close closes the stream, indicating this side is finished
// with the stream. // with the stream.
func (s *Stream) Close() error { func (s *Stream) Close() error {
return s.stream.Close() return s.Stream().Close()
} }
func (s *Stream) Protocol() protocol.ID { func (s *Stream) Protocol() protocol.ID {
return s.protocol return (*ps.Stream)(s).Protocol()
} }
func (s *Stream) SetProtocol(p protocol.ID) { func (s *Stream) SetProtocol(p protocol.ID) {
s.protocol = p (*ps.Stream)(s).SetProtocol(p)
}
func wrapStream(pss *ps.Stream) *Stream {
return &Stream{
stream: pss,
}
}
func wrapStreams(st []*ps.Stream) []*Stream {
out := make([]*Stream, len(st))
for i, s := range st {
out[i] = wrapStream(s)
}
return out
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment