Commit cf9bd9b6 authored by Adrian Lanzafame, committed by Matt Joiner

Add opencensus basic metrics (#317)

Add initial set of metrics exposed via OpenCensus.
parent 12c95104
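
This commit only defines and records the metrics; exposing them is left to the embedding application, which must register the views and attach a stats exporter. A minimal sketch of that wiring, assuming the OpenCensus Prometheus exporter (the exporter choice, namespace, and HTTP setup below are illustrative, not part of this change):

package main

import (
	"net/http"

	"contrib.go.opencensus.io/exporter/prometheus" // assumed exporter; any view exporter works
	dhtmetrics "github.com/libp2p/go-libp2p-kad-dht/metrics"
	"go.opencensus.io/stats/view"
)

func main() {
	// Register the DHT views so recorded measurements are aggregated.
	err := view.Register(
		dhtmetrics.ReceivedMessagesView,
		dhtmetrics.SentMessagesView,
		// ...and the rest of the views defined in the metrics package.
	)
	if err != nil {
		panic(err)
	}

	// Expose the aggregated data for Prometheus to scrape.
	pe, err := prometheus.NewExporter(prometheus.Options{Namespace: "dht"})
	if err != nil {
		panic(err)
	}
	view.RegisterExporter(pe)
	http.Handle("/metrics", pe)
	http.ListenAndServe(":8888", nil)
}
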
@@ -4,11 +4,14 @@ import (
 	"bytes"
 	"context"
 	"errors"
+	"fmt"
 	"sync"
 	"time"
+	"go.opencensus.io/tag"
 	"golang.org/x/xerrors"
+	"github.com/libp2p/go-libp2p-kad-dht/metrics"
 	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
 	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
 	providers "github.com/libp2p/go-libp2p-kad-dht/providers"
@@ -136,7 +139,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
 		cmgr.UntagPeer(p, "kbucket")
 	}
-	return &IpfsDHT{
+	dht := &IpfsDHT{
 		datastore: dstore,
 		self:      h.ID(),
 		peerstore: h.Peerstore(),
@@ -148,6 +151,10 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
 		routingTable: rt,
 		protocols:    protocols,
 	}
+
+	dht.ctx = dht.newContextWithLocalTags(ctx)
+
+	return dht
 }
 
 // putValueToPeer stores the given key/value pair at the peer 'p'
@@ -413,3 +420,19 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
 	}
 	return nil
 }
+
+// newContextWithLocalTags returns a new context.Context with the InstanceID and
+// PeerID keys populated. It will also take any extra tags that need adding to
+// the context as tag.Mutators.
+func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
+	extraTags = append(
+		extraTags,
+		tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
+		tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
+	)
+	ctx, _ = tag.New(
+		ctx,
+		extraTags...,
+	) // ignoring error as it is unrelated to the actual function of this code.
+	return ctx
+}
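
The tags attached here travel with the context: any measurement recorded against dht.ctx, or a context derived from it, is reported in rows keyed by the peer and instance IDs. A small self-contained illustration (the key and measure names are hypothetical, not from this commit):

package main

import (
	"context"

	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
)

var (
	// Hypothetical key and measure, for illustration only.
	exampleKey, _ = tag.NewKey("example_key")
	exampleCount  = stats.Int64("example/count", "illustrative counter", stats.UnitDimensionless)
)

func main() {
	// tag.New returns a derived context carrying example_key=value.
	ctx, _ := tag.New(context.Background(), tag.Upsert(exampleKey, "value"))

	// Every Record call made with ctx (or contexts derived from it)
	// produces rows tagged with example_key=value in any registered view.
	stats.Record(ctx, exampleCount.M(1))
}
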
@@ -10,9 +10,12 @@ import (
 	ggio "github.com/gogo/protobuf/io"
 	ctxio "github.com/jbenet/go-context/io"
+	"github.com/libp2p/go-libp2p-kad-dht/metrics"
 	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
 	inet "github.com/libp2p/go-libp2p-net"
 	peer "github.com/libp2p/go-libp2p-peer"
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
 )
 
 var dhtReadMessageTimeout = time.Minute
@@ -54,7 +57,8 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
 // Returns true on orderly completion of writes (so we can Close the stream).
 func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
-	ctx := dht.Context()
+	ctx := dht.ctx
 	cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
 	cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
 	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
@@ -72,18 +76,37 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
 			if err.Error() != "stream reset" {
 				logger.Debugf("error reading message: %#v", err)
 			}
+			stats.RecordWithTags(
+				ctx,
+				[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
+				metrics.ReceivedMessageErrors.M(1),
+			)
 			return false
 		case nil:
 		}
+
+		startTime := time.Now()
+		ctx, _ = tag.New(
+			ctx,
+			tag.Upsert(metrics.KeyMessageType, req.GetType().String()),
+		)
+		stats.Record(
+			ctx,
+			metrics.ReceivedMessages.M(1),
+			metrics.ReceivedBytes.M(int64(req.Size())),
+		)
+
 		handler := dht.handlerForMsgType(req.GetType())
 		if handler == nil {
+			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
 			logger.Warningf("can't handle received message of type %v", req.GetType())
 			return false
 		}
 
 		resp, err := handler(ctx, mPeer, &req)
 		if err != nil {
+			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
 			logger.Debugf("error handling message: %v", err)
 			return false
 		}
@@ -100,19 +123,25 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
 			err = w.Flush()
 		}
 		if err != nil {
+			stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
 			logger.Debugf("error writing response: %v", err)
 			return false
 		}
+
+		elapsedTime := time.Since(startTime)
+		latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
+		stats.Record(ctx, metrics.InboundRequestLatency.M(latencyMillis))
 	}
 }
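
Two recording styles appear in this handler, and they are equivalent in effect: stats.RecordWithTags applies its tag mutators to a single measurement, while tag.New bakes the tags into a derived context so every later stats.Record call picks them up. A sketch, where k is any tag.Key and m any *stats.Int64Measure (assumed declared):

// One-shot: the mutators apply to this single measurement only.
stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(k, "v")}, m.M(1))

// Persistent: the derived context carries the tag for every later Record.
tctx, _ := tag.New(ctx, tag.Upsert(k, "v"))
stats.Record(tctx, m.M(1))
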
 // sendRequest sends out a request, but also makes sure to
 // measure the RTT for latency measurements.
 func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
+	ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))
 	ms, err := dht.messageSenderForPeer(ctx, p)
 	if err != nil {
+		stats.Record(ctx, metrics.SentRequestErrors.M(1))
 		return nil, err
 	}
@@ -120,12 +149,21 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
 	rpmes, err := ms.SendRequest(ctx, pmes)
 	if err != nil {
+		stats.Record(ctx, metrics.SentRequestErrors.M(1))
 		return nil, err
 	}
 
 	// update the peer (on valid msgs only)
 	dht.updateFromMessage(ctx, p, rpmes)
 
+	stats.Record(
+		ctx,
+		metrics.SentRequests.M(1),
+		metrics.SentBytes.M(int64(pmes.Size())),
+		metrics.OutboundRequestLatency.M(
+			float64(time.Since(start))/float64(time.Millisecond),
+		),
+	)
 	dht.peerstore.RecordLatency(p, time.Since(start))
 	logger.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
 	return rpmes, nil
@@ -133,14 +171,24 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
 
 // sendMessage sends out a message
 func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
+	ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))
 	ms, err := dht.messageSenderForPeer(ctx, p)
 	if err != nil {
+		stats.Record(ctx, metrics.SentMessageErrors.M(1))
 		return err
 	}
 
 	if err := ms.SendMessage(ctx, pmes); err != nil {
+		stats.Record(ctx, metrics.SentMessageErrors.M(1))
 		return err
 	}
+
+	stats.Record(
+		ctx,
+		metrics.SentMessages.M(1),
+		metrics.SentBytes.M(int64(pmes.Size())),
+	)
 	logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
 	return nil
 }
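
A quick way to sanity-check this instrumentation is to register one of the views, drive a measurement, and read the aggregated rows back with view.RetrieveData. A test sketch (not part of this commit; the tag value is arbitrary):

package dht_test

import (
	"context"
	"testing"

	"github.com/libp2p/go-libp2p-kad-dht/metrics"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

func TestSentMessagesView(t *testing.T) {
	if err := view.Register(metrics.SentMessagesView); err != nil {
		t.Fatal(err)
	}

	ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.KeyMessageType, "PING"))
	stats.Record(ctx, metrics.SentMessages.M(1))

	// The view's name defaults to the measure name when not set explicitly.
	rows, err := view.RetrieveData(metrics.SentMessagesView.Name)
	if err != nil {
		t.Fatal(err)
	}
	for _, row := range rows {
		// Each row pairs tag values (message_type=PING, ...) with a Count aggregation.
		t.Logf("%v = %v", row.Tags, row.Data)
	}
}
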
@@ -260,11 +308,10 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
 			if retry {
 				logger.Info("error writing message, bailing: ", err)
 				return err
-			} else {
-				logger.Info("error writing message, trying again: ", err)
-				retry = true
-				continue
 			}
+			logger.Info("error writing message, trying again: ", err)
+			retry = true
+			continue
 		}
 
 		logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
@@ -296,11 +343,10 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
 			if retry {
 				logger.Info("error writing message, bailing: ", err)
 				return nil, err
-			} else {
-				logger.Info("error writing message, trying again: ", err)
-				retry = true
-				continue
 			}
+			logger.Info("error writing message, trying again: ", err)
+			retry = true
+			continue
 		}
 
 		mes := new(pb.Message)
@@ -311,11 +357,10 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
 			if retry {
 				logger.Info("error reading message, bailing: ", err)
 				return nil, err
-			} else {
-				logger.Info("error reading message, trying again: ", err)
-				retry = true
-				continue
 			}
+			logger.Info("error reading message, trying again: ", err)
+			retry = true
+			continue
 		}
 
 		logger.Event(ctx, "dhtSentMessage", ms.dht.self, ms.p, pmes)
This diff is collapsed.
package metrics

import (
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/tag"
)

var (
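	// Bucket upper bounds for the histogram views below: byte sizes from
	// 1 KiB up to 4 GiB, and latencies from 0.01 ms up to 100 s.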
	defaultBytesDistribution        = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
	defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
)

// Keys
var (
	KeyMessageType, _ = tag.NewKey("message_type")
	KeyPeerID, _      = tag.NewKey("peer_id")

	// KeyInstanceID identifies a dht instance by the pointer address.
	// Useful for differentiating between different dhts that have the same peer id.
	KeyInstanceID, _ = tag.NewKey("instance_id")
)

// UpsertMessageType is a convenience function that upserts the message type
// of a pb.Message into the KeyMessageType tag.
func UpsertMessageType(m *pb.Message) tag.Mutator {
	return tag.Upsert(KeyMessageType, m.Type.String())
}

// Measures
var (
	ReceivedMessages       = stats.Int64("libp2p.io/dht/kad/received_messages", "Total number of messages received per RPC", stats.UnitDimensionless)
	ReceivedMessageErrors  = stats.Int64("libp2p.io/dht/kad/received_message_errors", "Total number of errors for messages received per RPC", stats.UnitDimensionless)
	ReceivedBytes          = stats.Int64("libp2p.io/dht/kad/received_bytes", "Total received bytes per RPC", stats.UnitBytes)
	InboundRequestLatency  = stats.Float64("libp2p.io/dht/kad/inbound_request_latency", "Latency per RPC", stats.UnitMilliseconds)
	OutboundRequestLatency = stats.Float64("libp2p.io/dht/kad/outbound_request_latency", "Latency per RPC", stats.UnitMilliseconds)
	SentMessages           = stats.Int64("libp2p.io/dht/kad/sent_messages", "Total number of messages sent per RPC", stats.UnitDimensionless)
	SentMessageErrors      = stats.Int64("libp2p.io/dht/kad/sent_message_errors", "Total number of errors for messages sent per RPC", stats.UnitDimensionless)
	SentRequests           = stats.Int64("libp2p.io/dht/kad/sent_requests", "Total number of requests sent per RPC", stats.UnitDimensionless)
	SentRequestErrors      = stats.Int64("libp2p.io/dht/kad/sent_request_errors", "Total number of errors for requests sent per RPC", stats.UnitDimensionless)
	SentBytes              = stats.Int64("libp2p.io/dht/kad/sent_bytes", "Total sent bytes per RPC", stats.UnitBytes)
)

// Views
var (
	ReceivedMessagesView = &view.View{
		Measure:     ReceivedMessages,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: view.Count(),
	}
	ReceivedMessageErrorsView = &view.View{
		Measure:     ReceivedMessageErrors,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: view.Count(),
	}
	ReceivedBytesView = &view.View{
		Measure:     ReceivedBytes,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: defaultBytesDistribution,
	}
	InboundRequestLatencyView = &view.View{
		Measure:     InboundRequestLatency,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: defaultMillisecondsDistribution,
	}
	OutboundRequestLatencyView = &view.View{
		Measure:     OutboundRequestLatency,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: defaultMillisecondsDistribution,
	}
	SentMessagesView = &view.View{
		Measure:     SentMessages,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: view.Count(),
	}
	SentMessageErrorsView = &view.View{
		Measure:     SentMessageErrors,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: view.Count(),
	}
	SentRequestsView = &view.View{
		Measure:     SentRequests,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: view.Count(),
	}
	SentRequestErrorsView = &view.View{
		Measure:     SentRequestErrors,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: view.Count(),
	}
	SentBytesView = &view.View{
		Measure:     SentBytes,
		TagKeys:     []tag.Key{KeyMessageType, KeyPeerID, KeyInstanceID},
		Aggregation: defaultBytesDistribution,
	}
)
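
The package defines these views but registers nothing on import; registration is left to callers (see the wiring sketch after the commit message above). If many call sites need the full set, a convenience slice such as the following (hypothetical, not part of this commit) keeps registration to one line, e.g. view.Register(AllViews...):

// Hypothetical convenience slice; not defined by this commit.
var AllViews = []*view.View{
	ReceivedMessagesView,
	ReceivedMessageErrorsView,
	ReceivedBytesView,
	InboundRequestLatencyView,
	OutboundRequestLatencyView,
	SentMessagesView,
	SentMessageErrorsView,
	SentRequestsView,
	SentRequestErrorsView,
	SentBytesView,
}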