Commit cdade26f authored by Cole Brown's avatar Cole Brown Committed by Steven Allen

Add support for Stat() to Swarm Stream and Conn

parent c868818b
......@@ -165,7 +165,7 @@ func (s *Swarm) Process() goprocess.Process {
return s.proc
}
func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) {
func (s *Swarm) addConn(tc transport.Conn, stat inet.Stat) (*Conn, error) {
// The underlying transport (or the dialer) *should* filter it's own
// connections but we should double check anyways.
raddr := tc.RemoteMultiaddr()
......@@ -197,6 +197,7 @@ func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) {
c := &Conn{
conn: tc,
swarm: s,
stat: stat,
}
c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c)
......
......@@ -33,6 +33,8 @@ type Conn struct {
sync.Mutex
m map[*Stream]struct{}
}
stat inet.Stat
}
// Close closes this connection.
......@@ -98,7 +100,8 @@ func (c *Conn) start() {
}
c.swarm.refs.Add(1)
go func() {
s, err := c.addStream(ts)
stat := inet.Stat{Direction: inet.DirInbound}
s, err := c.addStream(ts, stat)
// Don't defer this. We don't want to block
// swarm shutdown on the connection handler.
......@@ -158,16 +161,22 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
return c.conn.RemotePublicKey()
}
// Stat returns metadata pertaining to this connection
func (c *Conn) Stat() inet.Stat {
return c.stat
}
// NewStream returns a new Stream from this connection
func (c *Conn) NewStream() (inet.Stream, error) {
ts, err := c.conn.OpenStream()
if err != nil {
return nil, err
}
return c.addStream(ts)
stat := inet.Stat{Direction: inet.DirOutbound}
return c.addStream(ts, stat)
}
func (c *Conn) addStream(ts smux.Stream) (*Stream, error) {
func (c *Conn) addStream(ts smux.Stream, stat inet.Stat) (*Stream, error) {
c.streams.Lock()
// Are we still online?
if c.streams.m == nil {
......@@ -180,6 +189,7 @@ func (c *Conn) addStream(ts smux.Stream) (*Stream, error) {
s := &Stream{
stream: ts,
conn: c,
stat: stat,
}
c.streams.m[s] = struct{}{}
......
......@@ -325,7 +325,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
"localAddr": connC.LocalMultiaddr(),
"remoteAddr": connC.RemoteMultiaddr(),
}
swarmC, err := s.addConn(connC)
stat := inet.Stat{Direction: inet.DirOutbound}
swarmC, err := s.addConn(connC, stat)
if err != nil {
logdial["error"] = err.Error()
connC.Close() // close the connection. didn't work out :(
......
......@@ -81,7 +81,8 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
s.refs.Add(1)
go func() {
defer s.refs.Done()
_, err := s.addConn(c)
stat := inet.Stat{Direction: inet.DirInbound}
_, err := s.addConn(c, stat)
if err != nil {
// Probably just means that the swarm has been closed.
log.Warningf("add conn failed: ", err)
......
......@@ -22,6 +22,9 @@ const (
streamReset
)
// Validate Stream conforms to the go-libp2p-net Stream interface
var _ inet.Stream = &Stream{}
// Stream is the stream type used by swarm. In general, you won't use this type
// directly.
type Stream struct {
......@@ -36,6 +39,8 @@ type Stream struct {
notifyLk sync.Mutex
protocol atomic.Value
stat inet.Stat
}
func (s *Stream) String() string {
......@@ -165,3 +170,8 @@ func (s *Stream) SetReadDeadline(t time.Time) error {
func (s *Stream) SetWriteDeadline(t time.Time) error {
return s.stream.SetWriteDeadline(t)
}
// Stat returns metadata information for this stream.
func (s *Stream) Stat() inet.Stat {
return s.stat
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment