From 88373cd4d30a9e66256ce0fd9d5a7309703f3273 Mon Sep 17 00:00:00 2001
From: dirkmc
Date: Tue, 2 Jun 2020 11:07:42 -0400
Subject: [PATCH] Total wants gauge (#402)

* feat: total wants gauge

* fix: in gauges count wants regardless of which peers they're sent to

* fix: want block gauge calculation

* refactor: simplify peerwantmanager
---
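Note (not part of the commit; kept below the "---" fold, which git-am
ignores): the two gauges are meant to maintain a simple invariant.
wantlist_total counts each wanted CID once, regardless of how many peers it
was sent to and of whether it is a broadcast want, while want_blocks_total
aims to count each CID that has been sent as a want-block to at least one
peer. A minimal, self-contained Go sketch of that accounting rule follows;
the gauge stub mirrors the one in peerwantmanager_test.go, and the
peersWanting counter is illustrative only, not code from this commit.

    package main

    import "fmt"

    // gauge mirrors the stub in peerwantmanager_test.go: it just records
    // the current value of the metric.
    type gauge struct{ count int }

    func (g *gauge) Inc() { g.count++ }
    func (g *gauge) Dec() { g.count-- }

    func main() {
        wantGauge := &gauge{}      // models wantlist_total
        wantBlockGauge := &gauge{} // models want_blocks_total

        // Send a want-block for the same CID to two peers. Only the first
        // send is new for the CID, so each gauge increments exactly once;
        // this is what the isNew result returned by reverseIndexAdd
        // enforces in the patch.
        peersWanting := 0
        for i := 0; i < 2; i++ {
            isNew := peersWanting == 0
            peersWanting++
            if isNew {
                wantBlockGauge.Inc()
                wantGauge.Inc()
            }
        }
        fmt.Println(wantGauge.count, wantBlockGauge.count) // prints: 1 1
    }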
 internal/peermanager/peermanager.go          |  3 +-
 internal/peermanager/peerwantmanager.go      | 99 +++++++++++++++-----
 internal/peermanager/peerwantmanager_test.go | 82 +++++++++++++---
 3 files changed, 149 insertions(+), 35 deletions(-)

diff --git a/internal/peermanager/peermanager.go b/internal/peermanager/peermanager.go
index 0ce7358..4c489dd 100644
--- a/internal/peermanager/peermanager.go
+++ b/internal/peermanager/peermanager.go
@@ -52,9 +52,10 @@ type PeerManager struct {
 // New creates a new PeerManager, given a context and a peerQueueFactory.
 func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
     wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
+    wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
     return &PeerManager{
         peerQueues:      make(map[peer.ID]PeerQueue),
-        pwm:             newPeerWantManager(wantGauge),
+        pwm:             newPeerWantManager(wantGauge, wantBlockGauge),
         createPeerQueue: createPeerQueue,
         ctx:             ctx,
         self:            self,
diff --git a/internal/peermanager/peerwantmanager.go b/internal/peermanager/peerwantmanager.go
index 16d1913..ee81649 100644
--- a/internal/peermanager/peerwantmanager.go
+++ b/internal/peermanager/peerwantmanager.go
@@ -30,6 +30,8 @@ type peerWantManager struct {
     // broadcastWants tracks all the current broadcast wants.
     broadcastWants *cid.Set
 
+    // Keeps track of the number of active want-haves & want-blocks
+    wantGauge Gauge
     // Keeps track of the number of active want-blocks
     wantBlockGauge Gauge
 }
@@ -42,11 +44,12 @@ type peerWant struct {
 
-// New creates a new peerWantManager with a Gauge that keeps track of the
-// number of active want-blocks (ie sent but no response received)
-func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
+// New creates a new peerWantManager with Gauges that keep track of the
+// number of active wants and want-blocks (ie sent but no response received)
+func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
     return &peerWantManager{
         broadcastWants: cid.NewSet(),
         peerWants:      make(map[peer.ID]*peerWant),
         wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
+        wantGauge:      wantGauge,
         wantBlockGauge: wantBlockGauge,
     }
 }
@@ -78,17 +81,30 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
         return
     }
 
+    // Clean up want-blocks
     _ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
-        // Decrement the gauge by the number of pending want-blocks to the peer
-        pwm.wantBlockGauge.Dec()
         // Clean up want-blocks from the reverse index
-        pwm.reverseIndexRemove(c, p)
+        removedLastPeer := pwm.reverseIndexRemove(c, p)
+
+        // Decrement the gauges if this was the last peer with a pending want for the key
+        if removedLastPeer {
+            pwm.wantBlockGauge.Dec()
+            if !pwm.broadcastWants.Has(c) {
+                pwm.wantGauge.Dec()
+            }
+        }
         return nil
     })
 
-    // Clean up want-haves from the reverse index
+    // Clean up want-haves
     _ = pws.wantHaves.ForEach(func(c cid.Cid) error {
-        pwm.reverseIndexRemove(c, p)
+        // Clean up want-haves from the reverse index
+        removedLastPeer := pwm.reverseIndexRemove(c, p)
+
+        // Decrement the want gauge if no other peer wants the key and it is not a broadcast want
+        if removedLastPeer && !pwm.broadcastWants.Has(c) {
+            pwm.wantGauge.Dec()
+        }
         return nil
     })
@@ -105,6 +121,11 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
         }
         pwm.broadcastWants.Add(c)
         unsent = append(unsent, c)
+
+        // Increment the total wants gauge
+        if _, ok := pwm.wantPeers[c]; !ok {
+            pwm.wantGauge.Inc()
+        }
     }
 
     if len(unsent) == 0 {
@@ -151,17 +172,22 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
             // Record that the CID was sent as a want-block
             pws.wantBlocks.Add(c)
 
-            // Update the reverse index
-            pwm.reverseIndexAdd(c, p)
-
             // Add the CID to the results
             fltWantBlks = append(fltWantBlks, c)
 
             // Make sure the CID is no longer recorded as a want-have
             pws.wantHaves.Remove(c)
 
-            // Increment the count of want-blocks
-            pwm.wantBlockGauge.Inc()
+            // Update the reverse index
+            isNew := pwm.reverseIndexAdd(c, p)
+
+            // Increment the want gauges
+            if isNew {
+                pwm.wantBlockGauge.Inc()
+                if !pwm.broadcastWants.Has(c) {
+                    pwm.wantGauge.Inc()
+                }
+            }
         }
     }
@@ -178,11 +204,16 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
             // Record that the CID was sent as a want-have
             pws.wantHaves.Add(c)
 
-            // Update the reverse index
-            pwm.reverseIndexAdd(c, p)
-
             // Add the CID to the results
             fltWantHvs = append(fltWantHvs, c)
+
+            // Update the reverse index
+            isNew := pwm.reverseIndexAdd(c, p)
+
+            // Increment the total wants gauge
+            if isNew && !pwm.broadcastWants.Has(c) {
+                pwm.wantGauge.Inc()
+            }
         }
     }
@@ -207,6 +238,9 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
         }
     }
 
+    cancelledWantBlocks := cid.NewSet()
+    cancelledWantHaves := cid.NewSet()
+
     // Send cancels to a particular peer
     send := func(p peer.ID, pws *peerWant) {
         // Start from the broadcast cancels
@@ -216,13 +250,15 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
         for _, c := range cancelKs {
             // Check if a want was sent for the key
             wantBlock := pws.wantBlocks.Has(c)
-            if !wantBlock && !pws.wantHaves.Has(c) {
-                continue
-            }
+            wantHave := pws.wantHaves.Has(c)
 
-            // Update the want gauge.
+            // Update the want gauges
             if wantBlock {
-                pwm.wantBlockGauge.Dec()
+                cancelledWantBlocks.Add(c)
+            } else if wantHave {
+                cancelledWantHaves.Add(c)
+            } else {
+                continue
             }
 
             // Unconditionally remove from the want lists.
@@ -271,33 +307,54 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
     // Remove cancelled broadcast wants
     for _, c := range broadcastCancels {
         pwm.broadcastWants.Remove(c)
+
+        // Decrement the total wants gauge for broadcast wants
+        if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
+            pwm.wantGauge.Dec()
+        }
     }
 
+    // Decrement the total wants gauge for peer wants
+    _ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
+        pwm.wantGauge.Dec()
+        return nil
+    })
+    _ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
+        pwm.wantGauge.Dec()
+        pwm.wantBlockGauge.Dec()
+        return nil
+    })
+
     // Finally, batch-remove the reverse-index. There's no need to
     // clear this index peer-by-peer.
     for _, c := range cancelKs {
         delete(pwm.wantPeers, c)
     }
+
 }
 
 // Add the peer to the list of peers that have sent a want with the cid
-func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
+func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
     peers, ok := pwm.wantPeers[c]
     if !ok {
         peers = make(map[peer.ID]struct{}, 10)
         pwm.wantPeers[c] = peers
     }
     peers[p] = struct{}{}
+    return !ok
 }
 
 // Remove the peer from the list of peers that have sent a want with the cid
-func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
+func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
     if peers, ok := pwm.wantPeers[c]; ok {
         delete(peers, p)
         if len(peers) == 0 {
             delete(pwm.wantPeers, c)
+            return true
         }
     }
+
+    return false
 }
 
 // GetWantBlocks returns the set of all want-blocks sent to all peers
diff --git a/internal/peermanager/peerwantmanager_test.go b/internal/peermanager/peerwantmanager_test.go
index 396ea0d..60b7c8e 100644
--- a/internal/peermanager/peerwantmanager_test.go
+++ b/internal/peermanager/peerwantmanager_test.go
@@ -56,7 +56,7 @@ func clearSent(pqs map[peer.ID]PeerQueue) {
 }
 
 func TestEmpty(t *testing.T) {
-    pwm := newPeerWantManager(&gauge{})
+    pwm := newPeerWantManager(&gauge{}, &gauge{})
 
     if len(pwm.getWantBlocks()) > 0 {
         t.Fatal("Expected GetWantBlocks() to have length 0")
@@ -67,7 +67,7 @@ func TestEmpty(t *testing.T) {
 }
 
 func TestPWMBroadcastWantHaves(t *testing.T) {
-    pwm := newPeerWantManager(&gauge{})
+    pwm := newPeerWantManager(&gauge{}, &gauge{})
 
     peers := testutil.GeneratePeers(3)
     cids := testutil.GenerateCids(2)
@@ -179,7 +179,7 @@ func TestPWMBroadcastWantHaves(t *testing.T) {
 }
 
 func TestPWMSendWants(t *testing.T) {
-    pwm := newPeerWantManager(&gauge{})
+    pwm := newPeerWantManager(&gauge{}, &gauge{})
 
     peers := testutil.GeneratePeers(2)
     p0 := peers[0]
@@ -259,7 +259,7 @@ func TestPWMSendWants(t *testing.T) {
 }
 
 func TestPWMSendCancels(t *testing.T) {
-    pwm := newPeerWantManager(&gauge{})
+    pwm := newPeerWantManager(&gauge{}, &gauge{})
 
     peers := testutil.GeneratePeers(2)
     p0 := peers[0]
@@ -338,10 +338,12 @@ func TestPWMSendCancels(t *testing.T) {
 
 func TestStats(t *testing.T) {
     g := &gauge{}
-    pwm := newPeerWantManager(g)
+    wbg := &gauge{}
+    pwm := newPeerWantManager(g, wbg)
 
     peers := testutil.GeneratePeers(2)
     p0 := peers[0]
+    p1 := peers[1]
     cids := testutil.GenerateCids(2)
     cids2 := testutil.GenerateCids(2)
@@ -353,7 +355,10 @@ func TestStats(t *testing.T) {
     // Send 2 want-blocks and 2 want-haves to p0
     pwm.sendWants(p0, cids, cids2)
 
-    if g.count != 2 {
+    if g.count != 4 {
+        t.Fatal("Expected 4 wants")
+    }
+    if wbg.count != 2 {
         t.Fatal("Expected 2 want-blocks")
     }
@@ -361,22 +366,73 @@ func TestStats(t *testing.T) {
     cids3 := testutil.GenerateCids(2)
     pwm.sendWants(p0, append(cids3, cids[0]), []cid.Cid{})
 
-    if g.count != 4 {
+    if g.count != 6 {
+        t.Fatal("Expected 6 wants")
+    }
+    if wbg.count != 4 {
+        t.Fatal("Expected 4 want-blocks")
+    }
+
+    // Broadcast 1 old want-have and 2 new want-haves
+    cids4 := testutil.GenerateCids(2)
+    pwm.broadcastWantHaves(append(cids4, cids2[0]))
+    if g.count != 8 {
+        t.Fatal("Expected 8 wants")
+    }
+    if wbg.count != 4 {
+        t.Fatal("Expected 4 want-blocks")
+    }
+
+    // Add a second peer
+    pwm.addPeer(pq, p1)
+
+    if g.count != 8 {
+        t.Fatal("Expected 8 wants")
+    }
+    if wbg.count != 4 {
         t.Fatal("Expected 4 want-blocks")
     }
 
     // Cancel 1 want-block that was sent to p0
     // and 1 want-block that was not sent
-    cids4 := testutil.GenerateCids(1)
-    pwm.sendCancels(append(cids4, cids[0]))
+    cids5 := testutil.GenerateCids(1)
+    pwm.sendCancels(append(cids5, cids[0]))
 
-    if g.count != 3 {
-        t.Fatal("Expected 3 want-blocks", g.count)
+    if g.count != 7 {
+        t.Fatal("Expected 7 wants")
+    }
+    if wbg.count != 3 {
+        t.Fatal("Expected 3 want-blocks")
     }
 
+    // Remove first peer
     pwm.removePeer(p0)
 
-    if g.count != 0 {
-        t.Fatal("Expected all want-blocks to be removed with peer", g.count)
+    // Should still have 3 broadcast wants
+    if g.count != 3 {
+        t.Fatal("Expected 3 wants")
+    }
+    if wbg.count != 0 {
+        t.Fatal("Expected all want-blocks to be removed")
+    }
+
+    // Remove second peer
+    pwm.removePeer(p1)
+
+    // Should still have 3 broadcast wants
+    if g.count != 3 {
+        t.Fatal("Expected 3 wants")
+    }
+    if wbg.count != 0 {
+        t.Fatal("Expected 0 want-blocks")
+    }
+
+    // Cancel one remaining broadcast want-have
+    pwm.sendCancels(cids2[:1])
+    if g.count != 2 {
+        t.Fatal("Expected 2 wants")
+    }
+    if wbg.count != 0 {
+        t.Fatal("Expected 0 want-blocks")
     }
 }
-- 
GitLab
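Post-script (not part of the patch): newPeerWantManager's signature changed
from one Gauge to two, so any out-of-tree caller has to be updated along
with the call sites patched above. A minimal sketch of the migration,
reusing the gauge stub from peerwantmanager_test.go (variable names are
illustrative):

    // before: pwm := newPeerWantManager(&gauge{})
    wantGauge := &gauge{}      // backs the wantlist_total metric
    wantBlockGauge := &gauge{} // backs the want_blocks_total metric
    pwm := newPeerWantManager(wantGauge, wantBlockGauge)

In production the gauges come from go-metrics-interface, as in the
peermanager.go hunk above, e.g. metrics.NewCtx(ctx, "want_blocks_total",
"Number of want-blocks in wantlist.").Gauge().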