Commit 4bf39431 authored by Steven Allen's avatar Steven Allen

Merge branch 'feat/stat'

parents 69702873 a13e3ec8
3.0.8: QmPWNZRUybw3nwJH3mpkrwB97YEQmXRkzvyh34rpJiih6Q 3.0.9: QmYSM6PKnCe9YVPNMisfpoBmczzHkA7h5Wrnc36DtdJhGo
...@@ -9,9 +9,9 @@ ...@@ -9,9 +9,9 @@
"gxDependencies": [ "gxDependencies": [
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmX5J1q63BrrDTbpcHifrFbxH3cMZsvaNajy6u3zCpzBXs", "hash": "QmQSbtGXCyNrj34LWL8EgXyNNYDZ8r3SwQcpW5pPxVhLnM",
"name": "go-libp2p-net", "name": "go-libp2p-net",
"version": "3.0.7" "version": "3.0.8"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -33,9 +33,9 @@ ...@@ -33,9 +33,9 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmYr9RHifaqHTFZdAsUPLmiMAi2oNeEqA48AFKxXJAsLpJ", "hash": "QmcDUyb52N62J8ZamGgUWUyWc1MtuCBce7WFA4D9xA6cwF",
"name": "go-libp2p-transport", "name": "go-libp2p-transport",
"version": "3.0.7" "version": "3.0.8"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -63,9 +63,9 @@ ...@@ -63,9 +63,9 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmcK89iqkFV8TqpRUgx1481YZbhjPFnBjqkpBQJfJqmSfm", "hash": "Qmcw9fndogcYwyGs4a5TPDbnZPBLxvtrBZzpvyyVDzxDWT",
"name": "go-tcp-transport", "name": "go-tcp-transport",
"version": "2.0.7" "version": "2.0.8"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -134,21 +134,21 @@ ...@@ -134,21 +134,21 @@
}, },
{ {
"author": "stebalien", "author": "stebalien",
"hash": "Qma6UXLMHjdVFExQZLYqdb5KAesbnoXuthQzovrwRZ64fG", "hash": "QmSbkqfiFmJCdczVQ7mkFZf5FUUNpuP5Ne2LxY2htXGtrZ",
"name": "go-conn-security-multistream", "name": "go-conn-security-multistream",
"version": "0.1.6" "version": "0.1.7"
}, },
{ {
"author": "steb", "author": "steb",
"hash": "QmfNvpHX396fhMeauERV6eFnSJg78rUjhjpFf1JvbjxaYM", "hash": "QmefQrpDSYX6jQRtUyhcASFVBDkoAsDTPXemyxGMzA3phK",
"name": "go-libp2p-transport-upgrader", "name": "go-libp2p-transport-upgrader",
"version": "0.1.7" "version": "0.1.8"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmWKKkNLFRcznF5vDqt2eeRsnQqQhwbjVf8zJ9KC2RXrzN", "hash": "QmWri2HWdxHjWBUermhWy7QWJqN1cV8Gd1QbDiB5m86f1H",
"name": "go-libp2p-secio", "name": "go-libp2p-secio",
"version": "2.0.7" "version": "2.0.8"
} }
], ],
"gxVersion": "0.9.1", "gxVersion": "0.9.1",
...@@ -156,6 +156,6 @@ ...@@ -156,6 +156,6 @@
"license": "MIT", "license": "MIT",
"name": "go-libp2p-swarm", "name": "go-libp2p-swarm",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "3.0.8" "version": "3.0.9"
} }
...@@ -165,7 +165,7 @@ func (s *Swarm) Process() goprocess.Process { ...@@ -165,7 +165,7 @@ func (s *Swarm) Process() goprocess.Process {
return s.proc return s.proc
} }
func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) { func (s *Swarm) addConn(tc transport.Conn, dir inet.Direction) (*Conn, error) {
// The underlying transport (or the dialer) *should* filter it's own // The underlying transport (or the dialer) *should* filter it's own
// connections but we should double check anyways. // connections but we should double check anyways.
raddr := tc.RemoteMultiaddr() raddr := tc.RemoteMultiaddr()
...@@ -194,9 +194,11 @@ func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) { ...@@ -194,9 +194,11 @@ func (s *Swarm) addConn(tc transport.Conn) (*Conn, error) {
} }
// Wrap and register the connection. // Wrap and register the connection.
stat := inet.Stat{Direction: dir}
c := &Conn{ c := &Conn{
conn: tc, conn: tc,
swarm: s, swarm: s,
stat: stat,
} }
c.streams.m = make(map[*Stream]struct{}) c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c) s.conns.m[p] = append(s.conns.m[p], c)
......
...@@ -33,6 +33,8 @@ type Conn struct { ...@@ -33,6 +33,8 @@ type Conn struct {
sync.Mutex sync.Mutex
m map[*Stream]struct{} m map[*Stream]struct{}
} }
stat inet.Stat
} }
// Close closes this connection. // Close closes this connection.
...@@ -98,7 +100,7 @@ func (c *Conn) start() { ...@@ -98,7 +100,7 @@ func (c *Conn) start() {
} }
c.swarm.refs.Add(1) c.swarm.refs.Add(1)
go func() { go func() {
s, err := c.addStream(ts) s, err := c.addStream(ts, inet.DirInbound)
// Don't defer this. We don't want to block // Don't defer this. We don't want to block
// swarm shutdown on the connection handler. // swarm shutdown on the connection handler.
...@@ -158,16 +160,21 @@ func (c *Conn) RemotePublicKey() ic.PubKey { ...@@ -158,16 +160,21 @@ func (c *Conn) RemotePublicKey() ic.PubKey {
return c.conn.RemotePublicKey() return c.conn.RemotePublicKey()
} }
// Stat returns metadata pertaining to this connection
func (c *Conn) Stat() inet.Stat {
return c.stat
}
// NewStream returns a new Stream from this connection // NewStream returns a new Stream from this connection
func (c *Conn) NewStream() (inet.Stream, error) { func (c *Conn) NewStream() (inet.Stream, error) {
ts, err := c.conn.OpenStream() ts, err := c.conn.OpenStream()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return c.addStream(ts) return c.addStream(ts, inet.DirOutbound)
} }
func (c *Conn) addStream(ts smux.Stream) (*Stream, error) { func (c *Conn) addStream(ts smux.Stream, dir inet.Direction) (*Stream, error) {
c.streams.Lock() c.streams.Lock()
// Are we still online? // Are we still online?
if c.streams.m == nil { if c.streams.m == nil {
...@@ -177,9 +184,11 @@ func (c *Conn) addStream(ts smux.Stream) (*Stream, error) { ...@@ -177,9 +184,11 @@ func (c *Conn) addStream(ts smux.Stream) (*Stream, error) {
} }
// Wrap and register the stream. // Wrap and register the stream.
stat := inet.Stat{Direction: dir}
s := &Stream{ s := &Stream{
stream: ts, stream: ts,
conn: c, conn: c,
stat: stat,
} }
c.streams.m[s] = struct{}{} c.streams.m[s] = struct{}{}
......
...@@ -325,7 +325,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -325,7 +325,7 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
"localAddr": connC.LocalMultiaddr(), "localAddr": connC.LocalMultiaddr(),
"remoteAddr": connC.RemoteMultiaddr(), "remoteAddr": connC.RemoteMultiaddr(),
} }
swarmC, err := s.addConn(connC) swarmC, err := s.addConn(connC, inet.DirOutbound)
if err != nil { if err != nil {
logdial["error"] = err.Error() logdial["error"] = err.Error()
connC.Close() // close the connection. didn't work out :( connC.Close() // close the connection. didn't work out :(
......
...@@ -81,7 +81,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { ...@@ -81,7 +81,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
s.refs.Add(1) s.refs.Add(1)
go func() { go func() {
defer s.refs.Done() defer s.refs.Done()
_, err := s.addConn(c) _, err := s.addConn(c, inet.DirInbound)
if err != nil { if err != nil {
// Probably just means that the swarm has been closed. // Probably just means that the swarm has been closed.
log.Warningf("add conn failed: ", err) log.Warningf("add conn failed: ", err)
......
...@@ -22,6 +22,9 @@ const ( ...@@ -22,6 +22,9 @@ const (
streamReset streamReset
) )
// Validate Stream conforms to the go-libp2p-net Stream interface
var _ inet.Stream = &Stream{}
// Stream is the stream type used by swarm. In general, you won't use this type // Stream is the stream type used by swarm. In general, you won't use this type
// directly. // directly.
type Stream struct { type Stream struct {
...@@ -36,6 +39,8 @@ type Stream struct { ...@@ -36,6 +39,8 @@ type Stream struct {
notifyLk sync.Mutex notifyLk sync.Mutex
protocol atomic.Value protocol atomic.Value
stat inet.Stat
} }
func (s *Stream) String() string { func (s *Stream) String() string {
...@@ -165,3 +170,8 @@ func (s *Stream) SetReadDeadline(t time.Time) error { ...@@ -165,3 +170,8 @@ func (s *Stream) SetReadDeadline(t time.Time) error {
func (s *Stream) SetWriteDeadline(t time.Time) error { func (s *Stream) SetWriteDeadline(t time.Time) error {
return s.stream.SetWriteDeadline(t) return s.stream.SetWriteDeadline(t)
} }
// Stat returns metadata information for this stream.
func (s *Stream) Stat() inet.Stat {
return s.stat
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment