Unverified commit 01b9825f, authored by vyzo, committed by GitHub

Merge pull request #227 from libp2p/feat/tracing

tracing support
parents 28a87b31 7065297a
@@ -31,6 +31,7 @@ func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, err
type FloodSubRouter struct {
p *PubSub
protocols []protocol.ID
tracer *pubsubTracer
}
func (fs *FloodSubRouter) Protocols() []protocol.ID {
@@ -39,11 +40,16 @@ func (fs *FloodSubRouter) Protocols() []protocol.ID {
func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p
fs.tracer = p.tracer
}
func (fs *FloodSubRouter) AddPeer(peer.ID, protocol.ID) {}
func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
fs.tracer.AddPeer(p, proto)
}
func (fs *FloodSubRouter) RemovePeer(peer.ID) {}
func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
}
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
@@ -91,13 +97,19 @@ func (fs *FloodSubRouter) Publish(from peer.ID, msg *pb.Message) {
select {
case mch <- out:
fs.tracer.SendRPC(out, pid)
default:
log.Infof("dropping message to peer %s: queue full", pid)
fs.tracer.DropRPC(out, pid)
// Drop it. The peer is too slow.
}
}
}
func (fs *FloodSubRouter) Join(topic string) {}
func (fs *FloodSubRouter) Join(topic string) {
fs.tracer.Join(topic)
}
func (fs *FloodSubRouter) Leave(topic string) {}
func (fs *FloodSubRouter) Leave(topic string) {
fs.tracer.Leave(topic)
}
@@ -65,6 +65,7 @@ type GossipSubRouter struct {
gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages
mcache *MessageCache
tracer *pubsubTracer
}
func (gs *GossipSubRouter) Protocols() []protocol.ID {
@@ -73,16 +74,19 @@ func (gs *GossipSubRouter) Protocols() []protocol.ID {
func (gs *GossipSubRouter) Attach(p *PubSub) {
gs.p = p
gs.tracer = p.tracer
go gs.heartbeatTimer()
}
func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
log.Debugf("PEERUP: Add new peer %s using %s", p, proto)
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
}
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
gs.tracer.RemovePeer(p)
delete(gs.peers, p)
for _, peers := range gs.mesh {
delete(peers, p)
@@ -208,6 +212,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
prune = append(prune, topic)
} else {
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
}
@@ -231,6 +236,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
peers, ok := gs.mesh[topic]
if ok {
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
}
@@ -294,6 +300,7 @@ func (gs *GossipSubRouter) Join(topic string) {
}
log.Debugf("JOIN %s", topic)
gs.tracer.Join(topic)
gmap, ok = gs.fanout[topic]
if ok {
@@ -319,6 +326,7 @@ func (gs *GossipSubRouter) Join(topic string) {
for p := range gmap {
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
gs.sendGraft(p, topic)
gs.tagPeer(p, topic)
}
@@ -331,11 +339,13 @@ func (gs *GossipSubRouter) Leave(topic string) {
}
log.Debugf("LEAVE %s", topic)
gs.tracer.Leave(topic)
delete(gs.mesh, topic)
for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.untagPeer(p, topic)
}
@@ -384,8 +394,10 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) {
select {
case mch <- out:
gs.tracer.SendRPC(out, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
gs.tracer.DropRPC(out, p)
// push control messages that need to be retried
ctl := out.GetControl()
if ctl != nil {
@@ -443,6 +455,7 @@ func (gs *GossipSubRouter) heartbeat() {
for _, p := range plst {
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
topics := tograft[p]
@@ -458,6 +471,7 @@ func (gs *GossipSubRouter) heartbeat() {
for _, p := range plst[:idontneed] {
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
topics := toprune[p]
This diff is collapsed.
syntax = "proto2";
package pubsub.pb;
message TraceEvent {
optional Type type = 1;
optional bytes peerID = 2;
optional int64 timestamp = 3;
optional PublishMessage publishMessage = 4;
optional RejectMessage rejectMessage = 5;
optional DuplicateMessage duplicateMessage = 6;
optional DeliverMessage deliverMessage = 7;
optional AddPeer addPeer = 8;
optional RemovePeer removePeer = 9;
optional RecvRPC recvRPC = 10;
optional SendRPC sendRPC = 11;
optional DropRPC dropRPC = 12;
optional Join join = 13;
optional Leave leave = 14;
optional Graft graft = 15;
optional Prune prune = 16;
enum Type {
PUBLISH_MESSAGE = 0;
REJECT_MESSAGE = 1;
DUPLICATE_MESSAGE = 2;
DELIVER_MESSAGE = 3;
ADD_PEER = 4;
REMOVE_PEER = 5;
RECV_RPC = 6;
SEND_RPC = 7;
DROP_RPC = 8;
JOIN = 9;
LEAVE = 10;
GRAFT = 11;
PRUNE = 12;
}
message PublishMessage {
optional bytes messageID = 1;
repeated string topics = 2;
}
message RejectMessage {
optional bytes messageID = 1;
optional bytes receivedFrom = 2;
optional string reason = 3;
}
message DuplicateMessage {
optional bytes messageID = 1;
optional bytes receivedFrom = 2;
}
message DeliverMessage {
optional bytes messageID = 1;
}
message AddPeer {
optional bytes peerID = 1;
optional string proto = 2;
}
message RemovePeer {
optional bytes peerID = 1;
}
message RecvRPC {
optional bytes receivedFrom = 1;
optional RPCMeta meta = 2;
}
message SendRPC {
optional bytes sendTo = 1;
optional RPCMeta meta = 2;
}
message DropRPC {
optional bytes sendTo = 1;
optional RPCMeta meta = 2;
}
message Join {
optional string topic = 1;
}
message Leave {
optional string topic = 1;
}
message Graft {
optional bytes peerID = 1;
optional string topic = 2;
}
message Prune {
optional bytes peerID = 1;
optional string topic = 2;
}
message RPCMeta {
repeated MessageMeta messages = 1;
repeated SubMeta subscription = 2;
optional ControlMeta control = 3;
}
message MessageMeta {
optional bytes messageID = 1;
repeated string topics = 2;
}
message SubMeta {
optional bool subscribe = 1;
optional string topic = 2;
}
message ControlMeta {
repeated ControlIHaveMeta ihave = 1;
repeated ControlIWantMeta iwant = 2;
repeated ControlGraftMeta graft = 3;
repeated ControlPruneMeta prune = 4;
}
message ControlIHaveMeta {
optional string topic = 1;
repeated bytes messageIDs = 2;
}
message ControlIWantMeta {
repeated bytes messageIDs = 1;
}
message ControlGraftMeta {
optional string topic = 1;
}
message ControlPruneMeta {
optional string topic = 1;
}
}
message TraceEventBatch {
repeated TraceEvent batch = 1;
}
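Every event shares the envelope above: a type tag, the local peerID, a timestamp, and exactly one populated sub-message, so a consumer switches on the type and reads the matching field. A hypothetical Go sketch using the generated gogo/protobuf getters (describe is illustrative, not part of this change; it assumes the pb and peer imports used throughout this diff):

// describe renders a decoded trace event as a short string; a sketch only.
func describe(evt *pb.TraceEvent) string {
	switch evt.GetType() {
	case pb.TraceEvent_JOIN:
		return "join " + evt.GetJoin().GetTopic()
	case pb.TraceEvent_GRAFT:
		g := evt.GetGraft()
		return "graft " + peer.ID(g.GetPeerID()).Pretty() + " in " + g.GetTopic()
	case pb.TraceEvent_PRUNE:
		pr := evt.GetPrune()
		return "prune " + peer.ID(pr.GetPeerID()).Pretty() + " in " + pr.GetTopic()
	default:
		return evt.GetType().String()
	}
}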
@@ -46,6 +46,8 @@ type PubSub struct {
disc *discover
tracer *pubsubTracer
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
@@ -321,6 +323,14 @@ func WithDiscovery(d discovery.Discovery, opts ...DiscoverOpt) Option {
}
}
// WithEventTracer provides a tracer for the pubsub system
func WithEventTracer(tracer EventTracer) Option {
return func(p *PubSub) error {
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID()}
return nil
}
}
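Wiring a tracer in at construction time might look like the following sketch (NewGossipSub and NewJSONTracer both appear in this change; the file path is illustrative, and ctx and host are assumed to be in scope):

tracer, err := NewJSONTracer("/tmp/trace.json") // illustrative path
if err != nil {
	return err
}
ps, err := NewGossipSub(ctx, host, WithEventTracer(tracer))
if err != nil {
	return err
}
_ = ps // use the traced pubsub instance as usual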
// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
defer func() {
@@ -436,6 +446,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleIncomingRPC(rpc)
case msg := <-p.publish:
p.tracer.PublishMessage(msg)
p.pushMsg(msg)
case msg := <-p.sendMsg:
@@ -571,8 +582,10 @@ func (p *PubSub) announce(topic string, sub bool) {
for pid, peer := range p.peers {
select {
case peer <- out:
p.tracer.SendRPC(out, pid)
default:
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
p.tracer.DropRPC(out, pid)
go p.announceRetry(pid, topic, sub)
}
}
@@ -608,8 +621,10 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
out := rpcWithSubs(subopt)
select {
case peer <- out:
p.tracer.SendRPC(out, pid)
default:
log.Infof("Can't send announce message to peer %s: queue full; scheduling retry", pid)
p.tracer.DropRPC(out, pid)
go p.announceRetry(pid, topic, sub)
}
}
@@ -671,6 +686,8 @@ func (p *PubSub) notifyLeave(topic string, pid peer.ID) {
}
func (p *PubSub) handleIncomingRPC(rpc *RPC) {
p.tracer.RecvRPC(rpc)
for _, subopt := range rpc.GetSubscriptions() {
t := subopt.GetTopicid()
if subopt.GetSubscribe() {
@@ -724,24 +741,28 @@ func (p *PubSub) pushMsg(msg *Message) {
// reject messages from blacklisted peers
if p.blacklist.Contains(src) {
log.Warningf("dropping message from blacklisted peer %s", src)
p.tracer.RejectMessage(msg, "blacklisted peer")
return
}
// even if they are forwarded by good peers
if p.blacklist.Contains(msg.GetFrom()) {
log.Warningf("dropping message from blacklisted source %s", src)
p.tracer.RejectMessage(msg, "blacklisted source")
return
}
// reject unsigned messages when strict before we even process the id
if p.signStrict && msg.Signature == nil {
log.Debugf("dropping unsigned message from %s", src)
p.tracer.RejectMessage(msg, "missing signature")
return
}
// have we already seen and validated this message?
id := msgID(msg.Message)
if p.seenMessage(id) {
p.tracer.DuplicateMessage(msg)
return
}
@@ -755,6 +776,7 @@ func (p *PubSub) pushMsg(msg *Message) {
}
func (p *PubSub) publishMessage(msg *Message) {
p.tracer.DeliverMessage(msg)
p.notifySubs(msg)
p.rt.Publish(msg.ReceivedFrom, msg.Message)
}
@@ -29,8 +29,9 @@ func NewRandomSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
// RandomSubRouter is a router that implements a random propagation strategy.
// For each message, it selects RandomSubD peers and forwards the message to them.
type RandomSubRouter struct {
p *PubSub
peers map[peer.ID]protocol.ID
p *PubSub
peers map[peer.ID]protocol.ID
tracer *pubsubTracer
}
func (rs *RandomSubRouter) Protocols() []protocol.ID {
@@ -39,13 +40,16 @@ func (rs *RandomSubRouter) Protocols() []protocol.ID {
func (rs *RandomSubRouter) Attach(p *PubSub) {
rs.p = p
rs.tracer = p.tracer
}
func (rs *RandomSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
rs.tracer.AddPeer(p, proto)
rs.peers[p] = proto
}
func (rs *RandomSubRouter) RemovePeer(p peer.ID) {
rs.tracer.RemovePeer(p)
delete(rs.peers, p)
}
@@ -132,12 +136,18 @@ func (rs *RandomSubRouter) Publish(from peer.ID, msg *pb.Message) {
select {
case mch <- out:
rs.tracer.SendRPC(out, p)
default:
log.Infof("dropping message to peer %s: queue full", p)
rs.tracer.DropRPC(out, p)
}
}
}
func (rs *RandomSubRouter) Join(topic string) {}
func (rs *RandomSubRouter) Join(topic string) {
rs.tracer.Join(topic)
}
func (rs *RandomSubRouter) Leave(topic string) {}
func (rs *RandomSubRouter) Leave(topic string) {
rs.tracer.Leave(topic)
}
package pubsub
import (
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// Generic event tracer interface
type EventTracer interface {
Trace(evt *pb.TraceEvent)
}
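Anything with a Trace method can be plugged in. As a purely hypothetical illustration, an in-process tracer that tallies events by type (countingTracer is not part of this change; it additionally assumes the sync import):

// countingTracer is a hypothetical EventTracer that counts events by type.
// Trace can be called from multiple goroutines, hence the mutex.
type countingTracer struct {
	mx     sync.Mutex
	counts map[pb.TraceEvent_Type]int
}

func (t *countingTracer) Trace(evt *pb.TraceEvent) {
	t.mx.Lock()
	defer t.mx.Unlock()
	if t.counts == nil {
		t.counts = make(map[pb.TraceEvent_Type]int)
	}
	t.counts[evt.GetType()]++
}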
// pubsubTracer wraps an EventTracer and stamps each event with the local peer ID
type pubsubTracer struct {
tracer EventTracer
pid peer.ID
}
func (t *pubsubTracer) PublishMessage(msg *Message) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_PUBLISH_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
PublishMessage: &pb.TraceEvent_PublishMessage{
MessageID: []byte(msgID(msg.Message)),
Topics: msg.Message.TopicIDs,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) RejectMessage(msg *Message, reason string) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_REJECT_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
RejectMessage: &pb.TraceEvent_RejectMessage{
MessageID: []byte(msgID(msg.Message)),
ReceivedFrom: []byte(msg.ReceivedFrom),
Reason: &reason,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) DuplicateMessage(msg *Message) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DUPLICATE_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
DuplicateMessage: &pb.TraceEvent_DuplicateMessage{
MessageID: []byte(msgID(msg.Message)),
ReceivedFrom: []byte(msg.ReceivedFrom),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) DeliverMessage(msg *Message) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DELIVER_MESSAGE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
DeliverMessage: &pb.TraceEvent_DeliverMessage{
MessageID: []byte(msgID(msg.Message)),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
if t == nil {
return
}
protoStr := string(proto)
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_ADD_PEER.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
AddPeer: &pb.TraceEvent_AddPeer{
PeerID: []byte(p),
Proto: &protoStr,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) RemovePeer(p peer.ID) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_REMOVE_PEER.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
RemovePeer: &pb.TraceEvent_RemovePeer{
PeerID: []byte(p),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) RecvRPC(rpc *RPC) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_RECV_RPC.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
RecvRPC: &pb.TraceEvent_RecvRPC{
ReceivedFrom: []byte(rpc.from),
Meta: traceRPCMeta(rpc),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_SEND_RPC.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
SendRPC: &pb.TraceEvent_SendRPC{
SendTo: []byte(p),
Meta: traceRPCMeta(rpc),
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_DROP_RPC.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
DropRPC: &pb.TraceEvent_DropRPC{
SendTo: []byte(p),
Meta: traceRPCMeta(rpc),
},
}
t.tracer.Trace(evt)
}
func traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
rpcMeta := new(pb.TraceEvent_RPCMeta)
var msgs []*pb.TraceEvent_MessageMeta
for _, m := range rpc.Publish {
msgs = append(msgs, &pb.TraceEvent_MessageMeta{
MessageID: []byte(msgID(m)),
Topics: m.TopicIDs,
})
}
rpcMeta.Messages = msgs
var subs []*pb.TraceEvent_SubMeta
for _, sub := range rpc.Subscriptions {
subs = append(subs, &pb.TraceEvent_SubMeta{
Subscribe: sub.Subscribe,
Topic: sub.Topicid,
})
}
rpcMeta.Subscription = subs
if rpc.Control != nil {
var ihave []*pb.TraceEvent_ControlIHaveMeta
for _, ctl := range rpc.Control.Ihave {
var mids [][]byte
for _, mid := range ctl.MessageIDs {
mids = append(mids, []byte(mid))
}
ihave = append(ihave, &pb.TraceEvent_ControlIHaveMeta{
Topic: ctl.TopicID,
MessageIDs: mids,
})
}
var iwant []*pb.TraceEvent_ControlIWantMeta
for _, ctl := range rpc.Control.Iwant {
var mids [][]byte
for _, mid := range ctl.MessageIDs {
mids = append(mids, []byte(mid))
}
iwant = append(iwant, &pb.TraceEvent_ControlIWantMeta{
MessageIDs: mids,
})
}
var graft []*pb.TraceEvent_ControlGraftMeta
for _, ctl := range rpc.Control.Graft {
graft = append(graft, &pb.TraceEvent_ControlGraftMeta{
Topic: ctl.TopicID,
})
}
var prune []*pb.TraceEvent_ControlPruneMeta
for _, ctl := range rpc.Control.Prune {
prune = append(prune, &pb.TraceEvent_ControlPruneMeta{
Topic: ctl.TopicID,
})
}
rpcMeta.Control = &pb.TraceEvent_ControlMeta{
Ihave: ihave,
Iwant: iwant,
Graft: graft,
Prune: prune,
}
}
return rpcMeta
}
func (t *pubsubTracer) Join(topic string) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_JOIN.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Join: &pb.TraceEvent_Join{
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) Leave(topic string) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_LEAVE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Leave: &pb.TraceEvent_Leave{
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) Graft(p peer.ID, topic string) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_GRAFT.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Graft: &pb.TraceEvent_Graft{
PeerID: []byte(p),
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
func (t *pubsubTracer) Prune(p peer.ID, topic string) {
if t == nil {
return
}
now := time.Now().UnixNano()
evt := &pb.TraceEvent{
Type: pb.TraceEvent_PRUNE.Enum(),
PeerID: []byte(t.pid),
Timestamp: &now,
Prune: &pb.TraceEvent_Prune{
PeerID: []byte(p),
Topic: &topic,
},
}
t.tracer.Trace(evt)
}
package pubsub
import (
"compress/gzip"
"context"
"encoding/json"
"io"
"os"
"sync"
"time"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
ggio "github.com/gogo/protobuf/io"
)
var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
var MinTraceBatchSize = 16
type basicTracer struct {
ch chan struct{}
mx sync.Mutex
buf []*pb.TraceEvent
lossy bool
closed bool
}
func (t *basicTracer) Trace(evt *pb.TraceEvent) {
t.mx.Lock()
if t.closed {
t.mx.Unlock()
return
}
if t.lossy && len(t.buf) > TraceBufferSize {
log.Warningf("trace buffer overflow; dropping trace event")
} else {
t.buf = append(t.buf, evt)
}
t.mx.Unlock()
select {
case t.ch <- struct{}{}:
default:
}
}
func (t *basicTracer) Close() {
t.mx.Lock()
defer t.mx.Unlock()
if !t.closed {
t.closed = true
close(t.ch)
}
}
// JSONTracer is a tracer that writes events to a file, encoded in ndjson.
type JSONTracer struct {
basicTracer
w io.WriteCloser
}
// NewJSONTracer creates a new JSONTracer writing traces to file.
func NewJSONTracer(file string) (*JSONTracer, error) {
return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
func (t *JSONTracer) doWrite() {
var buf []*pb.TraceEvent
enc := json.NewEncoder(t.w)
for {
_, ok := <-t.ch
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := enc.Encode(evt)
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
var _ EventTracer = (*JSONTracer)(nil)
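Because the output is ndjson, one event per line, the file can be read back with a plain encoding/json decoder over the generated struct. A sketch, reusing the illustrative path from above:

f, err := os.Open("/tmp/trace.json") // illustrative path
if err != nil {
	return err
}
defer f.Close()
dec := json.NewDecoder(f)
for {
	var evt pb.TraceEvent
	if err := dec.Decode(&evt); err != nil {
		break // io.EOF once the file is exhausted
	}
	// inspect evt here, e.g. evt.GetType()
}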
// PBTracer is a tracer that writes events to a file, as delimited protobufs.
type PBTracer struct {
basicTracer
w io.WriteCloser
}
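// NewPBTracer creates a new PBTracer writing traces to file.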
func NewPBTracer(file string) (*PBTracer, error) {
return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
tr := &PBTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
func (t *PBTracer) doWrite() {
var buf []*pb.TraceEvent
w := ggio.NewDelimitedWriter(t.w)
for {
_, ok := <-t.ch
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := w.WriteMsg(evt)
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
var _ EventTracer = (*PBTracer)(nil)
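The protobuf output is a stream of varint-delimited TraceEvent messages, so it reads back with the matching gogo delimited reader; a sketch with an illustrative path and size limit:

f, err := os.Open("/tmp/trace.pb") // illustrative path
if err != nil {
	return err
}
defer f.Close()
r := ggio.NewDelimitedReader(f, 1<<20) // max message size; illustrative
for {
	var evt pb.TraceEvent
	if err := r.ReadMsg(&evt); err != nil {
		break // io.EOF at end of stream
	}
	// process evt here
}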
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
// RemoteTracer is a tracer that sends trace events to a remote peer
type RemoteTracer struct {
basicTracer
ctx context.Context
host host.Host
peer peer.ID
}
// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) {
tr := &RemoteTracer{ctx: ctx, host: host, peer: pi.ID, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
go tr.doWrite()
return tr, nil
}
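The collector on the remote side is not part of this change, but doWrite below implies its shape: it registers a handler for RemoteTracerProtoID and decodes gzip-compressed, varint-delimited TraceEventBatch messages. A hypothetical receiving handler:

// Hypothetical collector-side handler; error handling abbreviated.
host.SetStreamHandler(RemoteTracerProtoID, func(s network.Stream) {
	defer s.Close()
	gzipR, err := gzip.NewReader(s)
	if err != nil {
		s.Reset()
		return
	}
	r := ggio.NewDelimitedReader(gzipR, 1<<22) // max batch size; illustrative
	for {
		var batch pb.TraceEventBatch
		if err := r.ReadMsg(&batch); err != nil {
			return // io.EOF on clean close
		}
		for _, evt := range batch.Batch {
			_ = evt // hand each event off to storage/analysis
		}
	}
})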
func (t *RemoteTracer) doWrite() {
var buf []*pb.TraceEvent
s, err := t.openStream()
if err != nil {
log.Errorf("error opening remote tracer stream: %s", err.Error())
return
}
var batch pb.TraceEventBatch
gzipW := gzip.NewWriter(s)
w := ggio.NewDelimitedWriter(gzipW)
for {
_, ok := <-t.ch
// deadline for batch accumulation
deadline := time.Now().Add(time.Second)
t.mx.Lock()
for len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) {
t.mx.Unlock()
time.Sleep(100 * time.Millisecond)
t.mx.Lock()
}
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
if len(buf) == 0 {
goto end
}
batch.Batch = buf
err = w.WriteMsg(&batch)
if err != nil {
log.Errorf("error writing trace event batch: %s", err)
goto end
}
err = gzipW.Flush()
if err != nil {
log.Errorf("error flushin gzip stream: %s", err)
goto end
}
end:
// nil out the buffer to gc consumed events
for i := range buf {
buf[i] = nil
}
if !ok {
if err != nil {
s.Reset()
} else {
gzipW.Close()
helpers.FullClose(s)
}
return
}
if err != nil {
s.Reset()
s, err = t.openStream()
if err != nil {
log.Errorf("error opening remote tracer stream: %s", err.Error())
return
}
gzipW.Reset(s)
}
}
}
func (t *RemoteTracer) openStream() (network.Stream, error) {
for {
ctx, cancel := context.WithTimeout(t.ctx, time.Minute)
s, err := t.host.NewStream(ctx, t.peer, RemoteTracerProtoID)
cancel()
if err != nil {
if t.ctx.Err() != nil {
return nil, err
}
// wait a minute and try again, to account for transient server downtime
select {
case <-time.After(time.Minute):
continue
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
return s, nil
}
}
var _ EventTracer = (*RemoteTracer)(nil)
@@ -31,6 +31,8 @@ type ValidatorOpt func(addVal *addValReq) error
type validation struct {
p *PubSub
tracer *pubsubTracer
// topicVals tracks per topic validators
topicVals map[string]*topicVal
@@ -90,6 +92,7 @@ func newValidation() *validation {
// workers
func (v *validation) Start(p *PubSub) {
v.p = p
v.tracer = p.tracer
for i := 0; i < v.validateWorkers; i++ {
go v.validateWorker()
}
@@ -148,6 +151,7 @@ func (v *validation) Push(src peer.ID, msg *Message) bool {
case v.validateQ <- &validateReq{vals, src, msg}:
default:
log.Warningf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation throttled")
}
return false
}
@@ -190,6 +194,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
if msg.Signature != nil {
if !v.validateSignature(msg) {
log.Warningf("message signature validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "invalid signature")
return
}
}
@@ -198,6 +203,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
// and avoid invoking user validators more than once
id := msgID(msg.Message)
if !v.p.markSeen(id) {
v.tracer.DuplicateMessage(msg)
return
}
@@ -214,6 +220,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
for _, val := range inline {
if !val.validateMsg(v.p.ctx, src, msg) {
log.Debugf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation failed")
return
}
}
@@ -228,6 +235,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message) {
}()
default:
log.Warningf("message validation throttled; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation throttled")
}
return
}
@@ -249,6 +257,7 @@ func (v *validation) validateSignature(msg *Message) bool {
func (v *validation) doValidateTopic(vals []*topicVal, src peer.ID, msg *Message) {
if !v.validateTopic(vals, src, msg) {
log.Warningf("message validation failed; dropping message from %s", src)
v.tracer.RejectMessage(msg, "validation failed")
return
}
@@ -286,6 +295,7 @@ loop:
}
if throttle {
v.tracer.RejectMessage(msg, "validation throttled")
return false
}
@@ -310,6 +320,7 @@ func (v *validation) validateSingleTopic(val *topicVal, src peer.ID, msg *Messag
default:
log.Debugf("validation throttled for topic %s", val.topic)
v.tracer.RejectMessage(msg, "validation throttled")
return false
}
}