Commit cce1f8a1 authored by vyzo's avatar vyzo

extended peer score inspection

parent ae55bf96
......@@ -78,6 +78,7 @@ type peerScore struct {
// debugging inspection
inspect PeerScoreInspectFn
inspectEx ExtendedPeerScoreInspectFn
inspectPeriod time.Duration
}
......@@ -114,12 +115,33 @@ const (
)
type PeerScoreInspectFn func(map[peer.ID]float64)
type ExtendedPeerScoreInspectFn func(map[peer.ID]*PeerScoreSnapshot)
type PeerScoreSnapshot struct {
Score float64
Topics map[string]*TopicScoreSnapshot
AppSpecificScore float64
IPColocationFactor int
BehaviourPenalty float64
}
type TopicScoreSnapshot struct {
TimeInMesh time.Duration
FirstMessageDeliveries float64
MeshMessageDeliveries float64
InvalidMessageDeliveries float64
}
// WithPeerScoreInspect is a gossipsub router option that enables peer score debugging.
// When this option is enabled, the supplied function will be invoked periodically to allow
// the application to inspec or dump the scores for connected peers.
// the application to inspect or dump the scores for connected peers.
// The supplied function can have one of two signatures:
// - PeerScoreInspectFn, which takes a map of peer IDs to score.
// - ExtendedPeerScoreInspectFn, which takes a map of peer IDs to
// PeerScoreSnapshots and allows inspection of individual score
// components for debugging peer scoring.
// This option must be passed _after_ the WithPeerScore option.
func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Option {
func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
......@@ -130,7 +152,19 @@ func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Opti
return fmt.Errorf("peer scoring is not enabled")
}
gs.score.inspect = inspect
switch i := inspect.(type) {
case PeerScoreInspectFn:
gs.score.inspect = i
case func(map[peer.ID]float64):
gs.score.inspect = PeerScoreInspectFn(i)
case ExtendedPeerScoreInspectFn:
gs.score.inspectEx = i
case func(map[peer.ID]*PeerScoreSnapshot):
gs.score.inspectEx = ExtendedPeerScoreInspectFn(i)
default:
return fmt.Errorf("unknown peer score insector type: %v", inspect)
}
gs.score.inspectPeriod = period
return nil
......@@ -290,7 +324,7 @@ func (ps *peerScore) background(ctx context.Context) {
defer gcDeliveryRecords.Stop()
var inspectScores <-chan time.Time
if ps.inspect != nil {
if ps.inspect != nil || ps.inspectEx != nil {
ticker := time.NewTicker(ps.inspectPeriod)
defer ticker.Stop()
// also dump at exit for one final sample
......@@ -320,6 +354,14 @@ func (ps *peerScore) background(ctx context.Context) {
// inspectScores dumps all tracked scores into the inspect function.
func (ps *peerScore) inspectScores() {
if ps.inspect != nil {
ps.inspectScoresSimple()
} else if ps.inspectEx != nil {
ps.inspectScoresExtended()
}
}
func (ps *peerScore) inspectScoresSimple() {
ps.Lock()
scores := make(map[peer.ID]float64, len(ps.peerStats))
for p := range ps.peerStats {
......@@ -334,6 +376,45 @@ func (ps *peerScore) inspectScores() {
go ps.inspect(scores)
}
func (ps *peerScore) inspectScoresExtended() {
ps.Lock()
scores := make(map[peer.ID]*PeerScoreSnapshot, len(ps.peerStats))
for p, pstats := range ps.peerStats {
pss := new(PeerScoreSnapshot)
pss.Score = ps.score(p)
if len(pstats.topics) > 0 {
pss.Topics = make(map[string]*TopicScoreSnapshot, len(pstats.topics))
for t, ts := range pstats.topics {
pss.Topics[t] = &TopicScoreSnapshot{
FirstMessageDeliveries: ts.firstMessageDeliveries,
MeshMessageDeliveries: ts.meshMessageDeliveries,
InvalidMessageDeliveries: ts.invalidMessageDeliveries,
}
if ts.inMesh {
pss.Topics[t].TimeInMesh = ts.meshTime
}
}
}
pss.AppSpecificScore = ps.params.AppSpecificScore(p)
for _, ip := range pstats.ips {
_, whitelisted := ps.params.IPColocationFactorWhitelist[ip]
if whitelisted {
continue
}
peersInIP := len(ps.peerIPs[ip])
if peersInIP > ps.params.IPColocationFactorThreshold {
surpluss := peersInIP - ps.params.IPColocationFactorThreshold
pss.IPColocationFactor += surpluss
}
}
pss.BehaviourPenalty = pstats.behaviourPenalty
scores[p] = pss
}
ps.Unlock()
go ps.inspectEx(scores)
}
// refreshScores decays scores, and purges score records for disconnected peers,
// once their expiry has elapsed.
func (ps *peerScore) refreshScores() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment