Unverified Commit 7ada4e50 authored by Raúl Kripalani's avatar Raúl Kripalani Committed by GitHub

`ID()` method on connections and streams + record opening time (#224)

Co-authored-by: default avatarAarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: default avatarRaúl Kripalani <raul@protocol.ai>
parent 4e44cfc0
module github.com/libp2p/go-libp2p-swarm module github.com/libp2p/go-libp2p-swarm
go 1.12
require ( require (
github.com/ipfs/go-log v1.0.4 github.com/ipfs/go-log v1.0.4
github.com/jbenet/goprocess v0.1.4 github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-addr-util v0.0.2 github.com/libp2p/go-addr-util v0.0.2
github.com/libp2p/go-conn-security-multistream v0.2.0 github.com/libp2p/go-conn-security-multistream v0.2.0
github.com/libp2p/go-libp2p-circuit v0.2.2 github.com/libp2p/go-libp2p-circuit v0.2.3
github.com/libp2p/go-libp2p-core v0.5.6 github.com/libp2p/go-libp2p-core v0.6.0
github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.2.4 github.com/libp2p/go-libp2p-peerstore v0.2.4
github.com/libp2p/go-libp2p-pubsub v0.3.1 github.com/libp2p/go-libp2p-quic-transport v0.5.0
github.com/libp2p/go-libp2p-quic-transport v0.3.7
github.com/libp2p/go-libp2p-secio v0.2.2 github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-testing v0.1.1 github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-libp2p-transport-upgrader v0.3.0 github.com/libp2p/go-libp2p-transport-upgrader v0.3.0
github.com/libp2p/go-libp2p-yamux v0.2.7 github.com/libp2p/go-libp2p-yamux v0.2.8
github.com/libp2p/go-stream-muxer-multistream v0.3.0 github.com/libp2p/go-stream-muxer-multistream v0.3.0
github.com/libp2p/go-tcp-transport v0.2.0 github.com/libp2p/go-tcp-transport v0.2.0
github.com/multiformats/go-multiaddr v0.2.2 github.com/multiformats/go-multiaddr v0.2.2
github.com/multiformats/go-multiaddr-fmt v0.1.0 github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/multiformats/go-multiaddr-net v0.1.5 github.com/multiformats/go-multiaddr-net v0.1.5
github.com/stretchr/testify v1.4.0 github.com/stretchr/testify v1.6.0
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
) )
go 1.12
This diff is collapsed.
...@@ -54,6 +54,9 @@ type Swarm struct { ...@@ -54,6 +54,9 @@ type Swarm struct {
local peer.ID local peer.ID
peers peerstore.Peerstore peers peerstore.Peerstore
nextConnID uint32 // guarded by atomic
nextStreamID uint32 // guarded by atomic
conns struct { conns struct {
sync.RWMutex sync.RWMutex
m map[peer.ID][]*Conn m map[peer.ID][]*Conn
...@@ -197,11 +200,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, ...@@ -197,11 +200,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
} }
} }
stat := network.Stat{Direction: dir} // Wrap and register the connection.
stat := network.Stat{Direction: dir, Opened: time.Now()}
c := &Conn{ c := &Conn{
conn: tc, conn: tc,
swarm: s, swarm: s,
stat: stat, stat: stat,
id: atomic.AddUint32(&s.nextConnID, 1),
} }
// we ONLY check upgraded connections here so we can send them a Disconnect message. // we ONLY check upgraded connections here so we can send them a Disconnect message.
...@@ -234,7 +239,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, ...@@ -234,7 +239,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
return nil, ErrSwarmClosed return nil, ErrSwarmClosed
} }
// Wrap and register the connection.
c.streams.m = make(map[*Stream]struct{}) c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c) s.conns.m[p] = append(s.conns.m[p], c)
......
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time"
ic "github.com/libp2p/go-libp2p-core/crypto" ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-libp2p-core/mux"
...@@ -22,6 +24,7 @@ var ErrConnClosed = errors.New("connection closed") ...@@ -22,6 +24,7 @@ var ErrConnClosed = errors.New("connection closed")
// Conn is the connection type used by swarm. In general, you won't use this // Conn is the connection type used by swarm. In general, you won't use this
// type directly. // type directly.
type Conn struct { type Conn struct {
id uint32
conn transport.CapableConn conn transport.CapableConn
swarm *Swarm swarm *Swarm
...@@ -38,6 +41,11 @@ type Conn struct { ...@@ -38,6 +41,11 @@ type Conn struct {
stat network.Stat stat network.Stat
} }
func (c *Conn) ID() string {
// format: <first 10 chars of peer id>-<global conn ordinal>
return fmt.Sprintf("%s-%d", c.RemotePeer().Pretty()[0:10], c.id)
}
// Close closes this connection. // Close closes this connection.
// //
// Note: This method won't wait for the close notifications to finish as that // Note: This method won't wait for the close notifications to finish as that
...@@ -169,6 +177,7 @@ func (c *Conn) Stat() network.Stat { ...@@ -169,6 +177,7 @@ func (c *Conn) Stat() network.Stat {
// NewStream returns a new Stream from this connection // NewStream returns a new Stream from this connection
func (c *Conn) NewStream() (network.Stream, error) { func (c *Conn) NewStream() (network.Stream, error) {
ts, err := c.conn.OpenStream() ts, err := c.conn.OpenStream()
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -185,11 +194,15 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er ...@@ -185,11 +194,15 @@ func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, er
} }
// Wrap and register the stream. // Wrap and register the stream.
stat := network.Stat{Direction: dir} stat := network.Stat{
Direction: dir,
Opened: time.Now(),
}
s := &Stream{ s := &Stream{
stream: ts, stream: ts,
conn: c, conn: c,
stat: stat, stat: stat,
id: atomic.AddUint32(&c.swarm.nextStreamID, 1),
} }
c.streams.m[s] = struct{}{} c.streams.m[s] = struct{}{}
......
...@@ -28,6 +28,8 @@ var _ network.Stream = &Stream{} ...@@ -28,6 +28,8 @@ var _ network.Stream = &Stream{}
// Stream is the stream type used by swarm. In general, you won't use this type // Stream is the stream type used by swarm. In general, you won't use this type
// directly. // directly.
type Stream struct { type Stream struct {
id uint32
stream mux.MuxedStream stream mux.MuxedStream
conn *Conn conn *Conn
...@@ -43,6 +45,11 @@ type Stream struct { ...@@ -43,6 +45,11 @@ type Stream struct {
stat network.Stat stat network.Stat
} }
func (s *Stream) ID() string {
// format: <first 10 chars of peer id>-<global conn ordinal>-<global stream ordinal>
return fmt.Sprintf("%s-%d", s.conn.ID(), s.id)
}
func (s *Stream) String() string { func (s *Stream) String() string {
return fmt.Sprintf( return fmt.Sprintf(
"<swarm.Stream[%s] %s (%s) <-> %s (%s)>", "<swarm.Stream[%s] %s (%s) <-> %s (%s)>",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment