Commit 6419f7ce authored by Jeromy's avatar Jeromy Committed by hannahhoward

add statistics for network messages sent/recvd

parent 55a5c2b6
......@@ -38,6 +38,8 @@ type BitSwapNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
Stats() NetworkStats
Routing
}
......@@ -68,3 +70,11 @@ type Routing interface {
// Provide provides the key to the network
Provide(context.Context, cid.Cid) error
}
// NetworkStats is a container for statistics about the bitswap network
// the numbers inside are specific to bitswap, and not any other protocols
// using the same underlying network.
type NetworkStats struct {
MessagesSent uint64
MessagesRecvd uint64
}
......@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
bsmsg "github.com/ipfs/go-bitswap/message"
......@@ -48,6 +49,8 @@ type impl struct {
// inbound messages from the network are forwarded to the receiver
receiver Receiver
stats NetworkStats
}
type streamMessageSender struct {
......@@ -130,6 +133,8 @@ func (bsnet *impl) SendMessage(
s.Reset()
return err
}
atomic.AddUint64(&bsnet.stats.MessagesSent, 1)
// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
go inet.AwaitEOF(s)
return s.Close()
......@@ -210,6 +215,7 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
ctx := context.Background()
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiver.ReceiveMessage(ctx, p, received)
atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
}
}
......@@ -217,6 +223,13 @@ func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager {
return bsnet.host.ConnManager()
}
func (bsnet *impl) Stats() NetworkStats {
return NetworkStats{
MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent),
}
}
type netNotifiee impl
func (nn *netNotifiee) impl() *impl {
......
......@@ -7,15 +7,16 @@ import (
)
type Stat struct {
ProvideBufLen int
Wantlist []cid.Cid
Peers []string
BlocksReceived uint64
DataReceived uint64
BlocksSent uint64
DataSent uint64
DupBlksReceived uint64
DupDataReceived uint64
ProvideBufLen int
Wantlist []cid.Cid
Peers []string
BlocksReceived uint64
DataReceived uint64
BlocksSent uint64
DataSent uint64
DupBlksReceived uint64
DupDataReceived uint64
MessagesReceived uint64
}
func (bs *Bitswap) Stat() (*Stat, error) {
......@@ -30,6 +31,7 @@ func (bs *Bitswap) Stat() (*Stat, error) {
st.BlocksSent = c.blocksSent
st.DataSent = c.dataSent
st.DataReceived = c.dataRecvd
st.MessagesReceived = c.messagesRecvd
bs.counterLk.Unlock()
peers := bs.engine.Peers()
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
bsmsg "github.com/ipfs/go-bitswap/message"
......@@ -48,7 +49,7 @@ type message struct {
// order* with their delays respected as much as sending them in order allows
// for
type receiverQueue struct {
receiver bsnet.Receiver
receiver *networkClient
queue []*message
active bool
lk sync.Mutex
......@@ -104,30 +105,30 @@ func (n *network) SendMessage(
return nil
}
func (n *network) deliver(
r bsnet.Receiver, from peer.ID, message bsmsg.BitSwapMessage) error {
if message == nil || from == "" {
return errors.New("invalid input")
}
n.delay.Wait()
r.ReceiveMessage(context.TODO(), from, message)
return nil
}
type networkClient struct {
local peer.ID
bsnet.Receiver
network *network
routing routing.IpfsRouting
stats bsnet.NetworkStats
}
func (nc *networkClient) SendMessage(
ctx context.Context,
to peer.ID,
message bsmsg.BitSwapMessage) error {
return nc.network.SendMessage(ctx, nc.local, to, message)
if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil {
return err
}
atomic.AddUint64(&nc.stats.MessagesSent, 1)
return nil
}
func (nc *networkClient) Stats() bsnet.NetworkStats {
return bsnet.NetworkStats{
MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd),
MessagesSent: atomic.LoadUint64(&nc.stats.MessagesSent),
}
}
// FindProvidersAsync returns a channel of providers for the given key
......@@ -157,14 +158,14 @@ func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager {
}
type messagePasser struct {
net *network
net *networkClient
target peer.ID
local peer.ID
ctx context.Context
}
func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error {
return mp.net.SendMessage(ctx, mp.local, mp.target, m)
return mp.net.SendMessage(ctx, mp.target, m)
}
func (mp *messagePasser) Close() error {
......@@ -177,7 +178,7 @@ func (mp *messagePasser) Reset() error {
func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) {
return &messagePasser{
net: n.network,
net: n,
target: p,
local: n.local,
ctx: ctx,
......@@ -241,6 +242,7 @@ func (rq *receiverQueue) process() {
rq.lk.Unlock()
time.Sleep(time.Until(m.shouldSend))
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
}
}
......
......@@ -81,7 +81,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
return i.blockstoreDelay.Set(t)
}
// session creates a test bitswap session.
// session creates a test bitswap instance.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// sessions. To safeguard, use the SessionGenerator to generate sessions. It's
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment