Commit 28d17a4c authored by Yusef Napora's avatar Yusef Napora Committed by vyzo

add tagTracer to apply connmgr tags

parent 44828773
......@@ -41,12 +41,16 @@ func checkMessageRouting(t *testing.T, topic string, pubs []*PubSub, subs []*Sub
}
}
func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
func getNetHosts(t *testing.T, ctx context.Context, n int, options ...func() bhost.Option) []host.Host {
var out []host.Host
for i := 0; i < n; i++ {
netw := swarmt.GenSwarm(t, ctx)
h := bhost.NewBlankHost(netw)
opts := make([]bhost.Option, len(options))
for i, optFn := range options {
opts[i] = optFn()
}
h := bhost.NewBlankHost(netw, opts...)
out = append(out, h)
}
......
......@@ -124,7 +124,12 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
fanoutTTL: GossipSubFanoutTTL,
tagTracer: newTagTracer(h.ConnManager()),
}
// use the withInternalTracer option to hook up the tag tracer
opts = append(opts, withInternalTracer(rt.tagTracer))
return NewPubSub(ctx, h, rt, opts...)
}
......@@ -224,6 +229,10 @@ func WithDirectPeers(pis []peer.AddrInfo) Option {
gs.direct = direct
if gs.tagTracer != nil {
gs.tagTracer.direct = direct
}
return nil
}
}
......@@ -254,6 +263,7 @@ type GossipSubRouter struct {
tracer *pubsubTracer
score *peerScore
gossipTracer *gossipTracer
tagTracer *tagTracer
// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
// nodes.
......@@ -347,12 +357,6 @@ func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto
// tag peer if it is a direct peer
_, direct := gs.direct[p]
if direct {
gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000)
}
// track the connection direction
outbound := false
conns := gs.p.host.Network().ConnsToPeer(p)
......@@ -621,7 +625,6 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
}
if len(prune) == 0 {
......@@ -649,7 +652,6 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
// is there a backoff specified by the peer? if so obey it.
backoff := prune.GetBackoff()
if backoff > 0 {
......@@ -889,7 +891,6 @@ func (gs *GossipSubRouter) Join(topic string) {
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
gs.sendGraft(p, topic)
gs.tagPeer(p, topic)
}
}
......@@ -908,7 +909,6 @@ func (gs *GossipSubRouter) Leave(topic string) {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.untagPeer(p, topic)
}
}
......@@ -1168,7 +1168,6 @@ func (gs *GossipSubRouter) heartbeat() {
prunePeer := func(p peer.ID) {
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
gs.addBackoff(p, topic)
topics := toprune[p]
toprune[p] = append(topics, topic)
......@@ -1178,7 +1177,6 @@ func (gs *GossipSubRouter) heartbeat() {
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
topics := tograft[p]
tograft[p] = append(topics, topic)
}
......@@ -1672,20 +1670,6 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
return peers
}
func (gs *GossipSubRouter) tagPeer(p peer.ID, topic string) {
tag := topicTag(topic)
gs.p.host.ConnManager().TagPeer(p, tag, 20)
}
func (gs *GossipSubRouter) untagPeer(p peer.ID, topic string) {
tag := topicTag(topic)
gs.p.host.ConnManager().UntagPeer(p, tag)
}
func topicTag(topic string) string {
return fmt.Sprintf("pubsub:%s", topic)
}
func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
pmap := make(map[peer.ID]struct{})
for _, p := range peers {
......
package pubsub
import (
"context"
"testing"
"time"
bhost "github.com/libp2p/go-libp2p-blankhost"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/peer"
)
// This file has tests for gossipsub's interaction with the libp2p connection manager.
// We tag connections for three reasons:
//
// - direct peers get a `pubsub:direct` tag with a value of GossipSubConnTagValueDirectPeer
// - mesh members get a `pubsub:$topic` tag with a value of GossipSubConnTagValueMeshPeer (default 20)
// - applies for each topic they're a mesh member of
// - anyone who delivers a message first gets a bump to a decaying `pubsub:deliveries:$topic` tag
func TestGossipsubConnTagDirectPeers(t *testing.T) {
// test that direct peers get tagged with GossipSubConnTagValueDirectPeer
t.Skip("coming soon")
}
func TestGossipsubConnTagMeshPeers(t *testing.T) {
// test that mesh peers get tagged with GossipSubConnTagValueMeshPeer
t.Skip("coming soon")
}
func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// set the gossipsub D parameters low, so that not all
// test peers will be in a mesh together
oldGossipSubD := GossipSubD
oldGossipSubDHi := GossipSubDhi
GossipSubD = 4
GossipSubDhi = 4
defer func() {
GossipSubD = oldGossipSubD
GossipSubDhi = oldGossipSubDHi
}()
decayCfg := connmgr.DecayerCfg{
Resolution: time.Second,
}
hosts := getNetHosts(t, ctx, 20, func() bhost.Option {
return bhost.WithConnectionManager(
connmgr.NewConnManager(1, 30, 10*time.Millisecond,
connmgr.DecayerConfig(&decayCfg)))
})
// use flood publishing, so non-mesh peers will still be delivering messages
// to everyone
psubs := getGossipsubs(ctx, hosts,
WithFloodPublish(true))
denseConnect(t, hosts)
// subscribe everyone to the topic
var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe("foobar")
if err != nil {
t.Fatal(err)
}
msgs = append(msgs, subch)
}
// wait a few heartbeats for meshes to form
time.Sleep(2 * time.Second)
// have all the hosts publish messages
}
func getMeshPeers(ps PubSub, topic string) []peer.ID {
gs := ps.rt.(*GossipSubRouter)
peerCh := make(chan peer.ID)
ps.eval <- func() {
peers := gs.mesh[topic]
for pid, _ := range peers {
peerCh <- pid
}
close(peerCh)
}
var out []peer.ID
for pid := range peerCh {
out = append(out, pid)
}
return out
}
......@@ -383,6 +383,18 @@ func WithEventTracer(tracer EventTracer) Option {
}
}
// withInternalTracer adds an internal event tracer to the pubsub system
func withInternalTracer(tracer internalTracer) Option {
return func(p *PubSub) error {
if p.tracer != nil {
p.tracer.internal = append(p.tracer.internal, tracer)
} else {
p.tracer = &pubsubTracer{internal: []internalTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
}
return nil
}
}
// WithMaxMessageSize sets the global maximum message size for pubsub wire
// messages. The default value is 1MiB (DefaultMaxMessageSize).
//
......
package pubsub
import (
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
var (
// GossipSubConnTagValueDirectPeer is the connection manager tag value to
// apply to direct peers. This should be high, as we want to prioritize these
// connections above all others.
GossipSubConnTagValueDirectPeer = 1000
// GossipSubConnTagValueMeshPeer is the connection manager tag value to apply to
// peers in a topic mesh. If a peer is in the mesh for multiple topics, their
// connection will be tagged separately for each.
GossipSubConnTagValueMeshPeer = 20
// GossipSubConnTagBumpMessageDelivery is the amount to add to the connection manager
// tag that tracks message deliveries. Each time a peer is the first to deliver a
// message within a topic, we "bump" a tag by this amount, up to a maximum
// of GossipSubConnTagMessageDeliveryCap.
// Note that the delivery tags decay over time, decreasing by GossipSubConnTagDecayAmount
// at every GossipSubConnTagDecayInterval.
GossipSubConnTagBumpMessageDelivery = 1
// GossipSubConnTagDecayInterval is the decay interval for decaying connection manager tags.
GossipSubConnTagDecayInterval = time.Minute
// GossipSubConnTagDecayAmount is subtracted from decaying tag values at each decay interval.
GossipSubConnTagDecayAmount = 1
// GossipSubConnTagMessageDeliveryCap is the maximum value for the connection manager tags that
// track message deliveries.
GossipSubConnTagMessageDeliveryCap = 15
)
// tagTracer is an internal tracer that applies connection manager tags to peer
// connections based on their behavior.
//
// We tag a peer's connections for the following reasons:
// - Directly connected peers are tagged with GossipSubConnTagValueDirectPeer (default 1000).
// - Mesh peers are tagged with a value of GossipSubConnTagValueMeshPeer (default 20).
// If a peer is in multiple topic meshes, they'll be tagged for each.
// - For each message that we receive, we bump a delivery tag for peer that delivered the message
// first.
// The delivery tags have a maximum value, GossipSubConnTagMessageDeliveryCap, and they decay at
// a rate of GossipSubConnTagDecayAmount / GossipSubConnTagDecayInterval (default 1/minute).
// With the defaults, a peer who stops delivering messages will have their delivery tag decay to zero
// in fifteen minutes.
type tagTracer struct {
sync.Mutex
cmgr connmgr.ConnManager
decayer connmgr.Decayer
decaying map[string]connmgr.DecayingTag
direct map[peer.ID]struct{}
}
func newTagTracer(cmgr connmgr.ConnManager) *tagTracer {
decayer, ok := connmgr.SupportsDecay(cmgr)
if !ok {
log.Warnf("connection manager does not support decaying tags, delivery tags will not be applied")
}
return &tagTracer{
cmgr: cmgr,
decayer: decayer,
decaying: make(map[string]connmgr.DecayingTag),
}
}
func (t *tagTracer) tagPeerIfDirect(p peer.ID) {
if t.direct == nil {
return
}
// tag peer if it is a direct peer
_, direct := t.direct[p]
if direct {
t.cmgr.TagPeer(p, "pubsub:direct", GossipSubConnTagValueDirectPeer)
}
}
func (t *tagTracer) tagMeshPeer(p peer.ID, topic string) {
tag := topicTag(topic)
t.cmgr.TagPeer(p, tag, GossipSubConnTagValueMeshPeer)
}
func (t *tagTracer) untagMeshPeer(p peer.ID, topic string) {
tag := topicTag(topic)
t.cmgr.UntagPeer(p, tag)
}
func topicTag(topic string) string {
return fmt.Sprintf("pubsub:%s", topic)
}
func (t *tagTracer) addDeliveryTag(topic string) {
if t.decayer == nil {
return
}
t.Lock()
defer t.Unlock()
tag, err := t.decayingDeliveryTag(topic)
if err != nil {
log.Warnf("unable to create decaying delivery tag: %s", err)
return
}
t.decaying[topic] = tag
}
func (t *tagTracer) removeDeliveryTag(topic string) {
t.Lock()
defer t.Unlock()
delete(t.decaying, topic)
}
func (t *tagTracer) decayingDeliveryTag(topic string) (connmgr.DecayingTag, error) {
name := fmt.Sprintf("pubsub-deliveries:%s", topic)
// decrement tag value by GossipSubConnTagDecayAmount at each decay interval
decayFn := func(value connmgr.DecayingValue) (after int, rm bool) {
v := value.Value - GossipSubConnTagDecayAmount
return v, v <= 0
}
// bump up to max of GossipSubConnTagMessageDeliveryCap
bumpFn := func(value connmgr.DecayingValue, delta int) (after int) {
val := value.Value + delta
if val > GossipSubConnTagMessageDeliveryCap {
return GossipSubConnTagMessageDeliveryCap
}
return val
}
return t.decayer.RegisterDecayingTag(name, GossipSubConnTagDecayInterval, decayFn, bumpFn)
}
func (t *tagTracer) bumpDeliveryTag(p peer.ID, topic string) error {
tag, ok := t.decaying[topic]
if !ok {
return fmt.Errorf("no decaying tag registered for topic %s", topic)
}
return tag.Bump(p, GossipSubConnTagBumpMessageDelivery)
}
func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
for _, topic := range msg.TopicIDs {
err := t.bumpDeliveryTag(p, topic)
if err != nil {
log.Warnf("error bumping delivery tag: %s", err)
}
}
}
// -- internalTracer interface methods
var _ internalTracer = (*tagTracer)(nil)
func (t *tagTracer) AddPeer(p peer.ID, proto protocol.ID) {
t.tagPeerIfDirect(p)
}
func (t *tagTracer) Join(topic string) {
t.addDeliveryTag(topic)
}
func (t *tagTracer) DeliverMessage(msg *Message) {
// TODO: also give a bump to "near-first" message deliveries
t.bumpTagsForMessage(msg.ReceivedFrom, msg)
}
func (t *tagTracer) Leave(topic string) {
t.removeDeliveryTag(topic)
}
func (t *tagTracer) Graft(p peer.ID, topic string) {
t.tagMeshPeer(p, topic)
}
func (t *tagTracer) Prune(p peer.ID, topic string) {
t.untagMeshPeer(p, topic)
}
func (t *tagTracer) RemovePeer(peer.ID) {}
func (t *tagTracer) ValidateMessage(*Message) {}
func (t *tagTracer) RejectMessage(*Message, string) {}
func (t *tagTracer) DuplicateMessage(*Message) {}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment