Commit 47129f71 authored by Dirk McCormick's avatar Dirk McCormick

fix: want gauge calculation

parent 25318da3
......@@ -84,25 +84,28 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
// Clean up want-blocks
_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Clean up want-blocks from the reverse index
removedLastPeer := pwm.reverseIndexRemove(c, p)
pwm.reverseIndexRemove(c, p)
// Decrement the gauges by the number of pending want-blocks to the peer
if removedLastPeer {
peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
if peersWantingBlock == 0 {
pwm.wantBlockGauge.Dec()
if !pwm.broadcastWants.Has(c) {
if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Dec()
}
}
return nil
})
// Clean up want-haves
_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
// Clean up want-haves from the reverse index
removedLastPeer := pwm.reverseIndexRemove(c, p)
pwm.reverseIndexRemove(c, p)
// Decrement the gauge by the number of pending want-haves to the peer
if removedLastPeer && !pwm.broadcastWants.Has(c) {
peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
if peersWantingBlock == 0 && peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Dec()
}
return nil
......@@ -122,8 +125,9 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
pwm.broadcastWants.Add(c)
unsent = append(unsent, c)
// Increment the total wants gauge
// If no peer has a pending want for the key
if _, ok := pwm.wantPeers[c]; !ok {
// Increment the total wants gauge
pwm.wantGauge.Inc()
}
}
......@@ -168,27 +172,30 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
// Iterate over the requested want-blocks
for _, c := range wantBlocks {
// If the want-block hasn't been sent to the peer
if !pws.wantBlocks.Has(c) {
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)
// Add the CID to the results
fltWantBlks = append(fltWantBlks, c)
// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)
if pws.wantBlocks.Has(c) {
continue
}
// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)
// Increment the want gauges
if isNew {
pwm.wantBlockGauge.Inc()
if !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
// Increment the want gauges
peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
if peersWantingBlock == 0 {
pwm.wantBlockGauge.Inc()
if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
}
// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)
// Add the CID to the results
fltWantBlks = append(fltWantBlks, c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
}
// Iterate over the requested want-haves
......@@ -201,6 +208,12 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
// If the CID has not been sent as a want-block or want-have
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
// Increment the total wants gauge
peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) && peersWantingBlock == 0 {
pwm.wantGauge.Inc()
}
// Record that the CID was sent as a want-have
pws.wantHaves.Add(c)
......@@ -208,12 +221,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
fltWantHvs = append(fltWantHvs, c)
// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)
// Increment the total wants gauge
if isNew && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
pwm.reverseIndexAdd(c, p)
}
}
......@@ -228,6 +236,14 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
return
}
// Record how many peers have a pending want-block and want-have for each
// key to be cancelled
peersWantingBefore := make(map[cid.Cid][]int, len(cancelKs))
for _, c := range cancelKs {
blks, haves := pwm.peersWanting(c)
peersWantingBefore[c] = []int{blks, haves}
}
// Create a buffer to use for filtering cancels per peer, with the
// broadcast wants at the front of the buffer (broadcast wants are sent to
// all peers)
......@@ -238,9 +254,6 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
}
}
cancelledWantBlocks := cid.NewSet()
cancelledWantHaves := cid.NewSet()
// Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) {
// Start from the broadcast cancels
......@@ -249,15 +262,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
// For each key to be cancelled
for _, c := range cancelKs {
// Check if a want was sent for the key
wantBlock := pws.wantBlocks.Has(c)
wantHave := pws.wantHaves.Has(c)
// Update the want gauges
if wantBlock {
cancelledWantBlocks.Add(c)
} else if wantHave {
cancelledWantHaves.Add(c)
} else {
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
continue
}
......@@ -304,33 +309,56 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
}
}
// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
// Decrement the wants gauges
for _, c := range cancelKs {
before := peersWantingBefore[c]
peersWantingBlockBefore := before[0]
peersWantingHaveBefore := before[1]
// Decrement the total wants gauge for broadcast wants
if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
// If there were any peers that had a pending want-block for the key
if peersWantingBlockBefore > 0 {
// Decrement the want-block gauge
pwm.wantBlockGauge.Dec()
}
// If there was a peer that had a pending want or it was a broadcast want
if peersWantingBlockBefore > 0 || peersWantingHaveBefore > 0 || pwm.broadcastWants.Has(c) {
// Decrement the total wants gauge
pwm.wantGauge.Dec()
}
}
// Decrement the total wants gauge for peer wants
_ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
return nil
})
_ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
pwm.wantBlockGauge.Dec()
return nil
})
// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
}
// Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer.
// Batch-remove the reverse-index. There's no need to clear this index
// peer-by-peer.
for _, c := range cancelKs {
delete(pwm.wantPeers, c)
}
}
// peersWanting counts how many peers have a pending want-block and want-have
// for the given CID
func (pwm *peerWantManager) peersWanting(c cid.Cid) (int, int) {
blockCount := 0
haveCount := 0
for p := range pwm.wantPeers[c] {
pws, ok := pwm.peerWants[p]
if !ok {
continue
}
if pws.wantBlocks.Has(c) {
blockCount++
} else if pws.wantHaves.Has(c) {
haveCount++
}
}
return blockCount, haveCount
}
// Add the peer to the list of peers that have sent a want with the cid
......@@ -345,16 +373,13 @@ func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
}
// Remove the peer from the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
if peers, ok := pwm.wantPeers[c]; ok {
delete(peers, p)
if len(peers) == 0 {
delete(pwm.wantPeers, c)
return true
}
}
return false
}
// GetWantBlocks returns the set of all want-blocks sent to all peers
......
......@@ -436,3 +436,81 @@ func TestStats(t *testing.T) {
t.Fatal("Expected 0 want-blocks")
}
}
func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
g := &gauge{}
wbg := &gauge{}
pwm := newPeerWantManager(g, wbg)
peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)
pwm.addPeer(&mockPQ{}, p0)
pwm.addPeer(&mockPQ{}, p1)
// Send 2 want-blocks and 2 want-haves to p0
pwm.sendWants(p0, cids, cids2)
// Send opposite:
// 2 want-haves and 2 want-blocks to p1
pwm.sendWants(p1, cids2, cids)
if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}
// Cancel 1 of each group of cids
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})
if g.count != 2 {
t.Fatal("Expected 2 wants")
}
if wbg.count != 2 {
t.Fatal("Expected 2 want-blocks")
}
}
func TestStatsRemovePeerOverlappingWantBlockWantHave(t *testing.T) {
g := &gauge{}
wbg := &gauge{}
pwm := newPeerWantManager(g, wbg)
peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)
pwm.addPeer(&mockPQ{}, p0)
pwm.addPeer(&mockPQ{}, p1)
// Send 2 want-blocks and 2 want-haves to p0
pwm.sendWants(p0, cids, cids2)
// Send opposite:
// 2 want-haves and 2 want-blocks to p1
pwm.sendWants(p1, cids2, cids)
if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}
// Remove p0
pwm.removePeer(p0)
if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 2 {
t.Fatal("Expected 2 want-blocks")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment