Unverified Commit 4ce7de96 authored by dirkmc's avatar dirkmc Committed by GitHub

refactor: add reverse index to peerWantManager to speed up cancels (#364)

* refactor: add reverse index to peerWantManager to speed up cancels

* refactor: in peerWantManager use ForEach instead of allocating lists
parent 824f7264
......@@ -20,6 +20,9 @@ type Gauge interface {
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
peerWants map[peer.ID]*peerWant
// Reverse index mapping wants to the peers that sent them. This is used
// to speed up cancels
wantPeers map[cid.Cid]map[peer.ID]struct{}
// Keeps track of the number of active want-blocks
wantBlockGauge Gauge
}
......@@ -34,6 +37,7 @@ type peerWant struct {
func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
return &peerWantManager{
peerWants: make(map[peer.ID]*peerWant),
wantPeers: make(map[cid.Cid]map[peer.ID]struct{}),
wantBlockGauge: wantBlockGauge,
}
}
......@@ -55,10 +59,19 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
return
}
// Decrement the gauge by the number of pending want-blocks to the peer
for range pws.wantBlocks.Keys() {
pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Decrement the gauge by the number of pending want-blocks to the peer
pwm.wantBlockGauge.Dec()
}
// Clean up want-blocks from the reverse index
pwm.reverseIndexRemove(c, p)
return nil
})
// Clean up want-haves from the reverse index
pws.wantHaves.ForEach(func(c cid.Cid) error {
pwm.reverseIndexRemove(c, p)
return nil
})
delete(pwm.peerWants, p)
}
......@@ -77,6 +90,9 @@ func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[p
// Record that the CID has been sent as a want-have
pws.wantHaves.Add(c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
if _, ok := res[p]; !ok {
res[p] = make([]cid.Cid, 0, 1)
......@@ -114,6 +130,9 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
// Record that the CID was sent as a want-block
pws.wantBlocks.Add(c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
resWantBlks = append(resWantBlks, c)
......@@ -132,6 +151,9 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
// Record that the CID was sent as a want-have
pws.wantHaves.Add(c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
resWantHvs = append(resWantHvs, c)
}
......@@ -146,10 +168,17 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
res := make(map[peer.ID][]cid.Cid)
// Iterate over all known peers
for p, pws := range pwm.peerWants {
// Iterate over all requested cancels
for _, c := range cancelKs {
// Iterate over all requested cancels
for _, c := range cancelKs {
// Iterate over peers that have sent a corresponding want
for p := range pwm.wantPeers[c] {
pws, ok := pwm.peerWants[p]
if !ok {
// Should never happen but check just in case
log.Errorf("peerWantManager reverse index missing peer %s for key %s", p, c)
continue
}
isWantBlock := pws.wantBlocks.Has(c)
isWantHave := pws.wantHaves.Has(c)
......@@ -169,6 +198,9 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
res[p] = make([]cid.Cid, 0, 1)
}
res[p] = append(res[p], c)
// Update the reverse index
pwm.reverseIndexRemove(c, p)
}
}
}
......@@ -176,6 +208,26 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
return res
}
// Add the peer to the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
peers, ok := pwm.wantPeers[c]
if !ok {
peers = make(map[peer.ID]struct{}, 1)
pwm.wantPeers[c] = peers
}
peers[p] = struct{}{}
}
// Remove the peer from the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
if peers, ok := pwm.wantPeers[c]; ok {
delete(peers, p)
if len(peers) == 0 {
delete(pwm.wantPeers, c)
}
}
}
// GetWantBlocks returns the set of all want-blocks sent to all peers
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
res := cid.NewSet()
......@@ -183,10 +235,11 @@ func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
// Iterate over all known peers
for _, pws := range pwm.peerWants {
// Iterate over all want-blocks
for _, c := range pws.wantBlocks.Keys() {
pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
}
return nil
})
}
return res.Keys()
......@@ -199,10 +252,11 @@ func (pwm *peerWantManager) getWantHaves() []cid.Cid {
// Iterate over all known peers
for _, pws := range pwm.peerWants {
// Iterate over all want-haves
for _, c := range pws.wantHaves.Keys() {
pws.wantHaves.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
}
return nil
})
}
return res.Keys()
......@@ -215,16 +269,18 @@ func (pwm *peerWantManager) getWants() []cid.Cid {
// Iterate over all known peers
for _, pws := range pwm.peerWants {
// Iterate over all want-blocks
for _, c := range pws.wantBlocks.Keys() {
pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
}
return nil
})
// Iterate over all want-haves
for _, c := range pws.wantHaves.Keys() {
pws.wantHaves.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
}
return nil
})
}
return res.Keys()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment