Unverified Commit 88373cd4 authored by dirkmc's avatar dirkmc Committed by GitHub

Total wants gauge (#402)

* feat: total wants gauge

* fix: in gauges count wants regardless of which peers they're sent to

* fix: want block gauge calculation

* refactor: simplify peermanagerwants
parent 07300422
......@@ -52,9 +52,10 @@ type PeerManager struct {
// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
return &PeerManager{
peerQueues: make(map[peer.ID]PeerQueue),
pwm: newPeerWantManager(wantGauge),
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
createPeerQueue: createPeerQueue,
ctx: ctx,
self: self,
......
......@@ -30,6 +30,8 @@ type peerWantManager struct {
// broadcastWants tracks all the current broadcast wants.
broadcastWants *cid.Set
// Keeps track of the number of active want-haves & want-blocks
wantGauge Gauge
// Keeps track of the number of active want-blocks
wantBlockGauge Gauge
}
......@@ -42,11 +44,12 @@ type peerWant struct {
// New creates a new peerWantManager with a Gauge that keeps track of the
// number of active want-blocks (ie sent but no response received)
func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
return &peerWantManager{
broadcastWants: cid.NewSet(),
peerWants: make(map[peer.ID]*peerWant),
wantPeers: make(map[cid.Cid]map[peer.ID]struct{}),
wantGauge: wantGauge,
wantBlockGauge: wantBlockGauge,
}
}
......@@ -78,17 +81,30 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
return
}
// Clean up want-blocks
_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Decrement the gauge by the number of pending want-blocks to the peer
pwm.wantBlockGauge.Dec()
// Clean up want-blocks from the reverse index
pwm.reverseIndexRemove(c, p)
removedLastPeer := pwm.reverseIndexRemove(c, p)
// Decrement the gauges by the number of pending want-blocks to the peer
if removedLastPeer {
pwm.wantBlockGauge.Dec()
if !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Dec()
}
}
return nil
})
// Clean up want-haves from the reverse index
// Clean up want-haves
_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
pwm.reverseIndexRemove(c, p)
// Clean up want-haves from the reverse index
removedLastPeer := pwm.reverseIndexRemove(c, p)
// Decrement the gauge by the number of pending want-haves to the peer
if removedLastPeer && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Dec()
}
return nil
})
......@@ -105,6 +121,11 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
}
pwm.broadcastWants.Add(c)
unsent = append(unsent, c)
// Increment the total wants gauge
if _, ok := pwm.wantPeers[c]; !ok {
pwm.wantGauge.Inc()
}
}
if len(unsent) == 0 {
......@@ -151,17 +172,22 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
fltWantBlks = append(fltWantBlks, c)
// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)
// Increment the count of want-blocks
pwm.wantBlockGauge.Inc()
// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)
// Increment the want gauges
if isNew {
pwm.wantBlockGauge.Inc()
if !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
}
}
}
......@@ -178,11 +204,16 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
// Record that the CID was sent as a want-have
pws.wantHaves.Add(c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
fltWantHvs = append(fltWantHvs, c)
// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)
// Increment the total wants gauge
if isNew && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
}
}
......@@ -207,6 +238,9 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
}
}
cancelledWantBlocks := cid.NewSet()
cancelledWantHaves := cid.NewSet()
// Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) {
// Start from the broadcast cancels
......@@ -216,13 +250,15 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
for _, c := range cancelKs {
// Check if a want was sent for the key
wantBlock := pws.wantBlocks.Has(c)
if !wantBlock && !pws.wantHaves.Has(c) {
continue
}
wantHave := pws.wantHaves.Has(c)
// Update the want gauge.
// Update the want gauges
if wantBlock {
pwm.wantBlockGauge.Dec()
cancelledWantBlocks.Add(c)
} else if wantHave {
cancelledWantHaves.Add(c)
} else {
continue
}
// Unconditionally remove from the want lists.
......@@ -271,33 +307,54 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
// Decrement the total wants gauge for broadcast wants
if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
pwm.wantGauge.Dec()
}
}
// Decrement the total wants gauge for peer wants
_ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
return nil
})
_ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
pwm.wantBlockGauge.Dec()
return nil
})
// Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer.
for _, c := range cancelKs {
delete(pwm.wantPeers, c)
}
}
// Add the peer to the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
peers, ok := pwm.wantPeers[c]
if !ok {
peers = make(map[peer.ID]struct{}, 10)
pwm.wantPeers[c] = peers
}
peers[p] = struct{}{}
return !ok
}
// Remove the peer from the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
if peers, ok := pwm.wantPeers[c]; ok {
delete(peers, p)
if len(peers) == 0 {
delete(pwm.wantPeers, c)
return true
}
}
return false
}
// GetWantBlocks returns the set of all want-blocks sent to all peers
......
......@@ -56,7 +56,7 @@ func clearSent(pqs map[peer.ID]PeerQueue) {
}
func TestEmpty(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
pwm := newPeerWantManager(&gauge{}, &gauge{})
if len(pwm.getWantBlocks()) > 0 {
t.Fatal("Expected GetWantBlocks() to have length 0")
......@@ -67,7 +67,7 @@ func TestEmpty(t *testing.T) {
}
func TestPWMBroadcastWantHaves(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
pwm := newPeerWantManager(&gauge{}, &gauge{})
peers := testutil.GeneratePeers(3)
cids := testutil.GenerateCids(2)
......@@ -179,7 +179,7 @@ func TestPWMBroadcastWantHaves(t *testing.T) {
}
func TestPWMSendWants(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
pwm := newPeerWantManager(&gauge{}, &gauge{})
peers := testutil.GeneratePeers(2)
p0 := peers[0]
......@@ -259,7 +259,7 @@ func TestPWMSendWants(t *testing.T) {
}
func TestPWMSendCancels(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
pwm := newPeerWantManager(&gauge{}, &gauge{})
peers := testutil.GeneratePeers(2)
p0 := peers[0]
......@@ -338,10 +338,12 @@ func TestPWMSendCancels(t *testing.T) {
func TestStats(t *testing.T) {
g := &gauge{}
pwm := newPeerWantManager(g)
wbg := &gauge{}
pwm := newPeerWantManager(g, wbg)
peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)
......@@ -353,7 +355,10 @@ func TestStats(t *testing.T) {
// Send 2 want-blocks and 2 want-haves to p0
pwm.sendWants(p0, cids, cids2)
if g.count != 2 {
if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 2 {
t.Fatal("Expected 2 want-blocks")
}
......@@ -361,22 +366,73 @@ func TestStats(t *testing.T) {
cids3 := testutil.GenerateCids(2)
pwm.sendWants(p0, append(cids3, cids[0]), []cid.Cid{})
if g.count != 4 {
if g.count != 6 {
t.Fatal("Expected 6 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}
// Broadcast 1 old want-have and 2 new want-haves
cids4 := testutil.GenerateCids(2)
pwm.broadcastWantHaves(append(cids4, cids2[0]))
if g.count != 8 {
t.Fatal("Expected 8 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}
// Add a second peer
pwm.addPeer(pq, p1)
if g.count != 8 {
t.Fatal("Expected 8 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}
// Cancel 1 want-block that was sent to p0
// and 1 want-block that was not sent
cids4 := testutil.GenerateCids(1)
pwm.sendCancels(append(cids4, cids[0]))
cids5 := testutil.GenerateCids(1)
pwm.sendCancels(append(cids5, cids[0]))
if g.count != 3 {
t.Fatal("Expected 3 want-blocks", g.count)
if g.count != 7 {
t.Fatal("Expected 7 wants")
}
if wbg.count != 3 {
t.Fatal("Expected 3 want-blocks")
}
// Remove first peer
pwm.removePeer(p0)
if g.count != 0 {
t.Fatal("Expected all want-blocks to be removed with peer", g.count)
// Should still have 3 broadcast wants
if g.count != 3 {
t.Fatal("Expected 3 wants")
}
if wbg.count != 0 {
t.Fatal("Expected all want-blocks to be removed")
}
// Remove second peer
pwm.removePeer(p1)
// Should still have 3 broadcast wants
if g.count != 3 {
t.Fatal("Expected 3 wants")
}
if wbg.count != 0 {
t.Fatal("Expected 0 want-blocks")
}
// Cancel one remaining broadcast want-have
pwm.sendCancels(cids2[:1])
if g.count != 2 {
t.Fatal("Expected 2 wants")
}
if wbg.count != 0 {
t.Fatal("Expected 0 want-blocks")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment