From cdade26faf91fc210386bf8b99dda069944d36fc Mon Sep 17 00:00:00 2001 From: Cole Brown Date: Fri, 15 Jun 2018 19:07:37 -0400 Subject: [PATCH] Add support for Stat() to Swarm Stream and Conn --- swarm.go | 3 ++- swarm_conn.go | 16 +++++++++++++--- swarm_dial.go | 3 ++- swarm_listen.go | 3 ++- swarm_stream.go | 10 ++++++++++ 5 files changed, 29 insertions(+), 6 deletions(-) diff --git a/swarm.go b/swarm.go index 7d795ab..78d8e12 100644 --- a/swarm.go +++ b/swarm.go @@ -165,7 +165,7 @@ func (s *Swarm) Process() goprocess.Process { return s.proc } -func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) { +func (s *Swarm) addConn(tc transport.Conn, stat inet.Stat) (*Conn, error) { // The underlying transport (or the dialer) *should* filter it's own // connections but we should double check anyways. raddr := tc.RemoteMultiaddr() @@ -197,6 +197,7 @@ func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) { c := &Conn{ conn: tc, swarm: s, + stat: stat, } c.streams.m = make(map[*Stream]struct{}) s.conns.m[p] = append(s.conns.m[p], c) diff --git a/swarm_conn.go b/swarm_conn.go index 5b2420c..05e6bfc 100644 --- a/swarm_conn.go +++ b/swarm_conn.go @@ -33,6 +33,8 @@ type Conn struct { sync.Mutex m map[*Stream]struct{} } + + stat inet.Stat } // Close closes this connection. @@ -98,7 +100,8 @@ func (c *Conn) start() { } c.swarm.refs.Add(1) go func() { - s, err := c.addStream(ts) + stat := inet.Stat{Direction: inet.DirInbound} + s, err := c.addStream(ts, stat) // Don't defer this. We don't want to block // swarm shutdown on the connection handler. @@ -158,16 +161,22 @@ func (c *Conn) RemotePublicKey() ic.PubKey { return c.conn.RemotePublicKey() } +// Stat returns metadata pertaining to this connection +func (c *Conn) Stat() inet.Stat { + return c.stat +} + // NewStream returns a new Stream from this connection func (c *Conn) NewStream() (inet.Stream, error) { ts, err := c.conn.OpenStream() if err != nil { return nil, err } - return c.addStream(ts) + stat := inet.Stat{Direction: inet.DirOutbound} + return c.addStream(ts, stat) } -func (c *Conn) addStream(ts smux.Stream) (*Stream, error) { +func (c *Conn) addStream(ts smux.Stream, stat inet.Stat) (*Stream, error) { c.streams.Lock() // Are we still online? if c.streams.m == nil { @@ -180,6 +189,7 @@ func (c *Conn) addStream(ts smux.Stream) (*Stream, error) { s := &Stream{ stream: ts, conn: c, + stat: stat, } c.streams.m[s] = struct{}{} diff --git a/swarm_dial.go b/swarm_dial.go index 2af26ed..246935a 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -325,7 +325,8 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { "localAddr": connC.LocalMultiaddr(), "remoteAddr": connC.RemoteMultiaddr(), } - swarmC, err := s.addConn(connC) + stat := inet.Stat{Direction: inet.DirOutbound} + swarmC, err := s.addConn(connC, stat) if err != nil { logdial["error"] = err.Error() connC.Close() // close the connection. didn't work out :( diff --git a/swarm_listen.go b/swarm_listen.go index df0a97f..cb56b42 100644 --- a/swarm_listen.go +++ b/swarm_listen.go @@ -81,7 +81,8 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { s.refs.Add(1) go func() { defer s.refs.Done() - _, err := s.addConn(c) + stat := inet.Stat{Direction: inet.DirInbound} + _, err := s.addConn(c, stat) if err != nil { // Probably just means that the swarm has been closed. log.Warningf("add conn failed: ", err) diff --git a/swarm_stream.go b/swarm_stream.go index 30b44bd..754dbd5 100644 --- a/swarm_stream.go +++ b/swarm_stream.go @@ -22,6 +22,9 @@ const ( streamReset ) +// Validate Stream conforms to the go-libp2p-net Stream interface +var _ inet.Stream = &Stream{} + // Stream is the stream type used by swarm. In general, you won't use this type // directly. type Stream struct { @@ -36,6 +39,8 @@ type Stream struct { notifyLk sync.Mutex protocol atomic.Value + + stat inet.Stat } func (s *Stream) String() string { @@ -165,3 +170,8 @@ func (s *Stream) SetReadDeadline(t time.Time) error { func (s *Stream) SetWriteDeadline(t time.Time) error { return s.stream.SetWriteDeadline(t) } + +// Stat returns metadata information for this stream. +func (s *Stream) Stat() inet.Stat { + return s.stat +} -- GitLab