Unverified Commit f29c774e authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #416 from ipfs/fix/want-gauge

fix want gauge calculation
parents 06129d6f 654e5b4d
......@@ -84,25 +84,28 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
// Clean up want-blocks
_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Clean up want-blocks from the reverse index
removedLastPeer := pwm.reverseIndexRemove(c, p)
pwm.reverseIndexRemove(c, p)
// Decrement the gauges by the number of pending want-blocks to the peer
if removedLastPeer {
peerCounts := pwm.wantPeerCounts(c)
if peerCounts.wantBlock == 0 {
pwm.wantBlockGauge.Dec()
if !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Dec()
}
}
if !peerCounts.wanted() {
pwm.wantGauge.Dec()
}
return nil
})
// Clean up want-haves
_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
// Clean up want-haves from the reverse index
removedLastPeer := pwm.reverseIndexRemove(c, p)
pwm.reverseIndexRemove(c, p)
// Decrement the gauge by the number of pending want-haves to the peer
if removedLastPeer && !pwm.broadcastWants.Has(c) {
peerCounts := pwm.wantPeerCounts(c)
if !peerCounts.wanted() {
pwm.wantGauge.Dec()
}
return nil
......@@ -122,8 +125,9 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
pwm.broadcastWants.Add(c)
unsent = append(unsent, c)
// Increment the total wants gauge
// If no peer has a pending want for the key
if _, ok := pwm.wantPeers[c]; !ok {
// Increment the total wants gauge
pwm.wantGauge.Inc()
}
}
......@@ -168,27 +172,30 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
// Iterate over the requested want-blocks
for _, c := range wantBlocks {
// If the want-block hasn't been sent to the peer
if !pws.wantBlocks.Has(c) {
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)
if pws.wantBlocks.Has(c) {
continue
}
// Add the CID to the results
fltWantBlks = append(fltWantBlks, c)
// Increment the want gauges
peerCounts := pwm.wantPeerCounts(c)
if peerCounts.wantBlock == 0 {
pwm.wantBlockGauge.Inc()
}
if !peerCounts.wanted() {
pwm.wantGauge.Inc()
}
// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)
// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)
// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)
// Increment the want gauges
if isNew {
pwm.wantBlockGauge.Inc()
if !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
}
}
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)
// Add the CID to the results
fltWantBlks = append(fltWantBlks, c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
}
// Iterate over the requested want-haves
......@@ -201,6 +208,12 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
// If the CID has not been sent as a want-block or want-have
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
// Increment the total wants gauge
peerCounts := pwm.wantPeerCounts(c)
if !peerCounts.wanted() {
pwm.wantGauge.Inc()
}
// Record that the CID was sent as a want-have
pws.wantHaves.Add(c)
......@@ -208,12 +221,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
fltWantHvs = append(fltWantHvs, c)
// Update the reverse index
isNew := pwm.reverseIndexAdd(c, p)
// Increment the total wants gauge
if isNew && !pwm.broadcastWants.Has(c) {
pwm.wantGauge.Inc()
}
pwm.reverseIndexAdd(c, p)
}
}
......@@ -228,6 +236,13 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
return
}
// Record how many peers have a pending want-block and want-have for each
// key to be cancelled
peerCounts := make(map[cid.Cid]wantPeerCnts, len(cancelKs))
for _, c := range cancelKs {
peerCounts[c] = pwm.wantPeerCounts(c)
}
// Create a buffer to use for filtering cancels per peer, with the
// broadcast wants at the front of the buffer (broadcast wants are sent to
// all peers)
......@@ -238,9 +253,6 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
}
}
cancelledWantBlocks := cid.NewSet()
cancelledWantHaves := cid.NewSet()
// Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) {
// Start from the broadcast cancels
......@@ -249,15 +261,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
// For each key to be cancelled
for _, c := range cancelKs {
// Check if a want was sent for the key
wantBlock := pws.wantBlocks.Has(c)
wantHave := pws.wantHaves.Has(c)
// Update the want gauges
if wantBlock {
cancelledWantBlocks.Add(c)
} else if wantHave {
cancelledWantHaves.Add(c)
} else {
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
continue
}
......@@ -304,33 +308,70 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
}
}
// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
// Decrement the wants gauges
for _, c := range cancelKs {
peerCnts := peerCounts[c]
// Decrement the total wants gauge for broadcast wants
if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
// If there were any peers that had a pending want-block for the key
if peerCnts.wantBlock > 0 {
// Decrement the want-block gauge
pwm.wantBlockGauge.Dec()
}
// If there was a peer that had a pending want or it was a broadcast want
if peerCnts.wanted() {
// Decrement the total wants gauge
pwm.wantGauge.Dec()
}
}
// Decrement the total wants gauge for peer wants
_ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
return nil
})
_ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
pwm.wantGauge.Dec()
pwm.wantBlockGauge.Dec()
return nil
})
// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
}
// Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer.
// Batch-remove the reverse-index. There's no need to clear this index
// peer-by-peer.
for _, c := range cancelKs {
delete(pwm.wantPeers, c)
}
}
// wantPeerCnts stores the number of peers that have pending wants for a CID
type wantPeerCnts struct {
// number of peers that have a pending want-block for the CID
wantBlock int
// number of peers that have a pending want-have for the CID
wantHave int
// whether the CID is a broadcast want
isBroadcast bool
}
// wanted returns true if any peer wants the CID or it's a broadcast want
func (pwm *wantPeerCnts) wanted() bool {
return pwm.wantBlock > 0 || pwm.wantHave > 0 || pwm.isBroadcast
}
// wantPeerCounts counts how many peers have a pending want-block and want-have
// for the given CID
func (pwm *peerWantManager) wantPeerCounts(c cid.Cid) wantPeerCnts {
blockCount := 0
haveCount := 0
for p := range pwm.wantPeers[c] {
pws, ok := pwm.peerWants[p]
if !ok {
log.Errorf("reverse index has extra peer %s for key %s in peerWantManager", string(p), c)
continue
}
if pws.wantBlocks.Has(c) {
blockCount++
} else if pws.wantHaves.Has(c) {
haveCount++
}
}
return wantPeerCnts{blockCount, haveCount, pwm.broadcastWants.Has(c)}
}
// Add the peer to the list of peers that have sent a want with the cid
......@@ -345,16 +386,13 @@ func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
}
// Remove the peer from the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
if peers, ok := pwm.wantPeers[c]; ok {
delete(peers, p)
if len(peers) == 0 {
delete(pwm.wantPeers, c)
return true
}
}
return false
}
// GetWantBlocks returns the set of all want-blocks sent to all peers
......
......@@ -436,3 +436,81 @@ func TestStats(t *testing.T) {
t.Fatal("Expected 0 want-blocks")
}
}
func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
g := &gauge{}
wbg := &gauge{}
pwm := newPeerWantManager(g, wbg)
peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)
pwm.addPeer(&mockPQ{}, p0)
pwm.addPeer(&mockPQ{}, p1)
// Send 2 want-blocks and 2 want-haves to p0
pwm.sendWants(p0, cids, cids2)
// Send opposite:
// 2 want-haves and 2 want-blocks to p1
pwm.sendWants(p1, cids2, cids)
if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}
// Cancel 1 of each group of cids
pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})
if g.count != 2 {
t.Fatal("Expected 2 wants")
}
if wbg.count != 2 {
t.Fatal("Expected 2 want-blocks")
}
}
func TestStatsRemovePeerOverlappingWantBlockWantHave(t *testing.T) {
g := &gauge{}
wbg := &gauge{}
pwm := newPeerWantManager(g, wbg)
peers := testutil.GeneratePeers(2)
p0 := peers[0]
p1 := peers[1]
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)
pwm.addPeer(&mockPQ{}, p0)
pwm.addPeer(&mockPQ{}, p1)
// Send 2 want-blocks and 2 want-haves to p0
pwm.sendWants(p0, cids, cids2)
// Send opposite:
// 2 want-haves and 2 want-blocks to p1
pwm.sendWants(p1, cids2, cids)
if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 4 {
t.Fatal("Expected 4 want-blocks")
}
// Remove p0
pwm.removePeer(p0)
if g.count != 4 {
t.Fatal("Expected 4 wants")
}
if wbg.count != 2 {
t.Fatal("Expected 2 want-blocks")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment