Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-tcp-transport
Commits
256a6c3d
Commit
256a6c3d
authored
Mar 27, 2021
by
Marten Seemann
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
expose some Prometheus metrics
parent
2b0b384b
Changes
5
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
587 additions
and
89 deletions
+587
-89
go.mod
go.mod
+6
-0
go.sum
go.sum
+360
-87
metrics.go
metrics.go
+176
-0
tcp.go
tcp.go
+6
-2
tcp_conn.go
tcp_conn.go
+39
-0
No files found.
go.mod
View file @
256a6c3d
...
@@ -6,10 +6,16 @@ require (
...
@@ -6,10 +6,16 @@ require (
github.com/libp2p/go-libp2p-mplex
v0.2.3
github.com/libp2p/go-libp2p-mplex
v0.2.3
github.com/libp2p/go-libp2p-testing
v0.1.1
github.com/libp2p/go-libp2p-testing
v0.1.1
github.com/libp2p/go-libp2p-transport-upgrader
v0.2.0
github.com/libp2p/go-libp2p-transport-upgrader
v0.2.0
github.com/libp2p/go-netroute
v0.1.5 // indirect
github.com/libp2p/go-reuseport
v0.0.2
github.com/libp2p/go-reuseport
v0.0.2
github.com/libp2p/go-reuseport-transport
v0.0.4
github.com/libp2p/go-reuseport-transport
v0.0.4
github.com/mikioh/tcp
v0.0.0-20190314235350-803a9b46060c
github.com/mikioh/tcpinfo
v0.0.0-20190314235526-30a79bb1804b
github.com/mikioh/tcpopt
v0.0.0-20190314235656-172688c1accc // indirect
github.com/multiformats/go-multiaddr
v0.3.1
github.com/multiformats/go-multiaddr
v0.3.1
github.com/multiformats/go-multiaddr-fmt
v0.1.0
github.com/multiformats/go-multiaddr-fmt
v0.1.0
github.com/multiformats/go-multihash
v0.0.15 // indirect
github.com/prometheus/client_golang
v1.10.0
)
)
go 1.12
go 1.12
go.sum
View file @
256a6c3d
This diff is collapsed.
Click to expand it.
metrics.go
0 → 100644
View file @
256a6c3d
package
tcp
import
(
"strings"
"sync"
"time"
"github.com/mikioh/tcp"
"github.com/mikioh/tcpinfo"
manet
"github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
)
var
(
newConns
*
prometheus
.
CounterVec
closedConns
*
prometheus
.
CounterVec
)
var
collector
*
aggregatingCollector
func
init
()
{
collector
=
newAggregatingCollector
()
prometheus
.
MustRegister
(
collector
)
const
direction
=
"direction"
newConns
=
prometheus
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Name
:
"tcp_connections_new_total"
,
Help
:
"TCP new connections"
,
},
[]
string
{
direction
},
)
prometheus
.
MustRegister
(
newConns
)
closedConns
=
prometheus
.
NewCounterVec
(
prometheus
.
CounterOpts
{
Name
:
"tcp_connections_closed_total"
,
Help
:
"TCP connections closed"
,
},
[]
string
{
direction
},
)
prometheus
.
MustRegister
(
closedConns
)
}
type
aggregatingCollector
struct
{
mutex
sync
.
Mutex
highestID
uint64
conns
map
[
uint64
]
/* id */
*
tracingConn
rtts
prometheus
.
Histogram
connDurations
prometheus
.
Histogram
}
var
_
prometheus
.
Collector
=
&
aggregatingCollector
{}
func
newAggregatingCollector
()
*
aggregatingCollector
{
return
&
aggregatingCollector
{
conns
:
make
(
map
[
uint64
]
*
tracingConn
),
rtts
:
prometheus
.
NewHistogram
(
prometheus
.
HistogramOpts
{
Name
:
"tcp_rtt"
,
Help
:
"TCP round trip time"
,
Buckets
:
prometheus
.
ExponentialBuckets
(
0.001
,
1.25
,
40
),
// 1ms to ~6000ms
}),
connDurations
:
prometheus
.
NewHistogram
(
prometheus
.
HistogramOpts
{
Name
:
"tcp_connection_duration"
,
Help
:
"TCP Connection Duration"
,
Buckets
:
prometheus
.
ExponentialBuckets
(
1
,
1.5
,
40
),
// 1s to ~12 weeks
}),
}
}
func
(
c
*
aggregatingCollector
)
AddConn
(
t
*
tracingConn
)
uint64
{
c
.
mutex
.
Lock
()
defer
c
.
mutex
.
Unlock
()
c
.
highestID
++
c
.
conns
[
c
.
highestID
]
=
t
return
c
.
highestID
}
func
(
c
*
aggregatingCollector
)
removeConn
(
id
uint64
)
{
delete
(
c
.
conns
,
id
)
}
func
(
c
*
aggregatingCollector
)
Describe
(
descs
chan
<-
*
prometheus
.
Desc
)
{
descs
<-
c
.
rtts
.
Desc
()
descs
<-
c
.
connDurations
.
Desc
()
}
func
(
c
*
aggregatingCollector
)
Collect
(
metrics
chan
<-
prometheus
.
Metric
)
{
now
:=
time
.
Now
()
c
.
mutex
.
Lock
()
for
_
,
conn
:=
range
c
.
conns
{
info
,
err
:=
conn
.
getTCPInfo
()
if
err
!=
nil
{
if
strings
.
Contains
(
err
.
Error
(),
"use of closed network connection"
)
{
c
.
closedConn
(
conn
)
continue
}
log
.
Errorf
(
"Failed to get TCP info: %s"
,
err
)
continue
}
c
.
rtts
.
Observe
(
info
.
RTT
.
Seconds
())
c
.
connDurations
.
Observe
(
now
.
Sub
(
conn
.
startTime
)
.
Seconds
())
if
info
.
State
==
tcpinfo
.
Closed
{
c
.
closedConn
(
conn
)
}
}
c
.
mutex
.
Unlock
()
metrics
<-
c
.
rtts
metrics
<-
c
.
connDurations
}
func
(
c
*
aggregatingCollector
)
closedConn
(
conn
*
tracingConn
)
{
collector
.
removeConn
(
conn
.
id
)
closedConns
.
WithLabelValues
(
conn
.
getDirection
())
.
Inc
()
}
type
tracingConn
struct
{
id
uint64
startTime
time
.
Time
isClient
bool
manet
.
Conn
tcpConn
*
tcp
.
Conn
}
func
newTracingConn
(
c
manet
.
Conn
,
isClient
bool
)
(
*
tracingConn
,
error
)
{
conn
,
err
:=
newTCPConn
(
c
)
if
err
!=
nil
{
return
nil
,
err
}
tc
:=
&
tracingConn
{
startTime
:
time
.
Now
(),
isClient
:
isClient
,
Conn
:
c
,
tcpConn
:
conn
,
}
tc
.
id
=
collector
.
AddConn
(
tc
)
newConns
.
WithLabelValues
(
tc
.
getDirection
())
.
Inc
()
return
tc
,
nil
}
func
(
c
*
tracingConn
)
getDirection
()
string
{
if
c
.
isClient
{
return
"outgoing"
}
return
"incoming"
}
func
(
c
*
tracingConn
)
Close
()
error
{
return
c
.
Conn
.
Close
()
}
func
(
c
*
tracingConn
)
getTCPInfo
()
(
*
tcpinfo
.
Info
,
error
)
{
var
o
tcpinfo
.
Info
var
b
[
256
]
byte
i
,
err
:=
c
.
tcpConn
.
Option
(
o
.
Level
(),
o
.
Name
(),
b
[
:
])
if
err
!=
nil
{
return
nil
,
err
}
info
:=
i
.
(
*
tcpinfo
.
Info
)
return
info
,
nil
}
type
tracingListener
struct
{
manet
.
Listener
}
func
(
l
*
tracingListener
)
Accept
()
(
manet
.
Conn
,
error
)
{
conn
,
err
:=
l
.
Listener
.
Accept
()
if
err
!=
nil
{
return
nil
,
err
}
return
newTracingConn
(
conn
,
false
)
}
tcp.go
View file @
256a6c3d
...
@@ -132,7 +132,11 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID)
...
@@ -132,7 +132,11 @@ func (t *TcpTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID)
// This means we can immediately reuse the 5-tuple and reconnect.
// This means we can immediately reuse the 5-tuple and reconnect.
tryLinger
(
conn
,
0
)
tryLinger
(
conn
,
0
)
tryKeepAlive
(
conn
,
true
)
tryKeepAlive
(
conn
,
true
)
return
t
.
Upgrader
.
UpgradeOutbound
(
ctx
,
t
,
conn
,
p
)
c
,
err
:=
newTracingConn
(
conn
,
true
)
if
err
!=
nil
{
return
nil
,
err
}
return
t
.
Upgrader
.
UpgradeOutbound
(
ctx
,
t
,
c
,
p
)
}
}
// UseReuseport returns true if reuseport is enabled and available.
// UseReuseport returns true if reuseport is enabled and available.
...
@@ -153,7 +157,7 @@ func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
...
@@ -153,7 +157,7 @@ func (t *TcpTransport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
err
return
nil
,
err
}
}
list
=
&
tcpListener
{
list
,
0
}
list
=
&
tracingListener
{
&
tcpListener
{
list
,
0
}
}
return
t
.
Upgrader
.
UpgradeListener
(
t
,
list
),
nil
return
t
.
Upgrader
.
UpgradeListener
(
t
,
list
),
nil
}
}
...
...
tcp_conn.go
0 → 100644
View file @
256a6c3d
package
tcp
import
(
"errors"
"net"
"syscall"
"unsafe"
"github.com/mikioh/tcp"
)
// This is only needed because mikioh/tcp doesn't accept wrapped connections.
// See https://github.com/mikioh/tcp/pull/2.
type
tcpConn
struct
{
net
.
Conn
c
syscall
.
RawConn
}
// newTCPConn returns a new end point.
func
newTCPConn
(
c
net
.
Conn
)
(
*
tcp
.
Conn
,
error
)
{
type
tcpConnI
interface
{
SyscallConn
()
(
syscall
.
RawConn
,
error
)
SetLinger
(
int
)
error
}
var
_
tcpConnI
=
&
net
.
TCPConn
{}
cc
:=
&
tcpConn
{
Conn
:
c
}
switch
c
:=
c
.
(
type
)
{
case
tcpConnI
:
var
err
error
cc
.
c
,
err
=
c
.
SyscallConn
()
if
err
!=
nil
{
return
nil
,
err
}
return
(
*
tcp
.
Conn
)(
unsafe
.
Pointer
(
cc
)),
nil
default
:
return
nil
,
errors
.
New
(
"unknown connection type"
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment