diff --git a/internal/peermanager/peerwantmanager.go b/internal/peermanager/peerwantmanager.go index 08914bbcae7e1a359b8f9ff207c26d04fed8ab25..1928966ca18bf88651922fc4f86b1a1a924070a9 100644 --- a/internal/peermanager/peerwantmanager.go +++ b/internal/peermanager/peerwantmanager.go @@ -20,6 +20,9 @@ type Gauge interface { // sent to each peer, so that the PeerManager doesn't send duplicates. type peerWantManager struct { peerWants map[peer.ID]*peerWant + // Reverse index mapping wants to the peers that sent them. This is used + // to speed up cancels + wantPeers map[cid.Cid]map[peer.ID]struct{} // Keeps track of the number of active want-blocks wantBlockGauge Gauge } @@ -34,6 +37,7 @@ type peerWant struct { func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager { return &peerWantManager{ peerWants: make(map[peer.ID]*peerWant), + wantPeers: make(map[cid.Cid]map[peer.ID]struct{}), wantBlockGauge: wantBlockGauge, } } @@ -55,10 +59,19 @@ func (pwm *peerWantManager) removePeer(p peer.ID) { return } - // Decrement the gauge by the number of pending want-blocks to the peer - for range pws.wantBlocks.Keys() { + pws.wantBlocks.ForEach(func(c cid.Cid) error { + // Decrement the gauge by the number of pending want-blocks to the peer pwm.wantBlockGauge.Dec() - } + // Clean up want-blocks from the reverse index + pwm.reverseIndexRemove(c, p) + return nil + }) + + // Clean up want-haves from the reverse index + pws.wantHaves.ForEach(func(c cid.Cid) error { + pwm.reverseIndexRemove(c, p) + return nil + }) delete(pwm.peerWants, p) } @@ -77,6 +90,9 @@ func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[p // Record that the CID has been sent as a want-have pws.wantHaves.Add(c) + // Update the reverse index + pwm.reverseIndexAdd(c, p) + // Add the CID to the results if _, ok := res[p]; !ok { res[p] = make([]cid.Cid, 0, 1) @@ -114,6 +130,9 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa // Record that the CID was sent as a want-block pws.wantBlocks.Add(c) + // Update the reverse index + pwm.reverseIndexAdd(c, p) + // Add the CID to the results resWantBlks = append(resWantBlks, c) @@ -132,6 +151,9 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa // Record that the CID was sent as a want-have pws.wantHaves.Add(c) + // Update the reverse index + pwm.reverseIndexAdd(c, p) + // Add the CID to the results resWantHvs = append(resWantHvs, c) } @@ -146,10 +168,17 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid { res := make(map[peer.ID][]cid.Cid) - // Iterate over all known peers - for p, pws := range pwm.peerWants { - // Iterate over all requested cancels - for _, c := range cancelKs { + // Iterate over all requested cancels + for _, c := range cancelKs { + // Iterate over peers that have sent a corresponding want + for p := range pwm.wantPeers[c] { + pws, ok := pwm.peerWants[p] + if !ok { + // Should never happen but check just in case + log.Errorf("peerWantManager reverse index missing peer %s for key %s", p, c) + continue + } + isWantBlock := pws.wantBlocks.Has(c) isWantHave := pws.wantHaves.Has(c) @@ -169,6 +198,9 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][ res[p] = make([]cid.Cid, 0, 1) } res[p] = append(res[p], c) + + // Update the reverse index + pwm.reverseIndexRemove(c, p) } } } @@ -176,6 +208,26 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][ return res } +// Add the peer to the list of peers that have sent a want with the cid +func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) { + peers, ok := pwm.wantPeers[c] + if !ok { + peers = make(map[peer.ID]struct{}, 1) + pwm.wantPeers[c] = peers + } + peers[p] = struct{}{} +} + +// Remove the peer from the list of peers that have sent a want with the cid +func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) { + if peers, ok := pwm.wantPeers[c]; ok { + delete(peers, p) + if len(peers) == 0 { + delete(pwm.wantPeers, c) + } + } +} + // GetWantBlocks returns the set of all want-blocks sent to all peers func (pwm *peerWantManager) getWantBlocks() []cid.Cid { res := cid.NewSet() @@ -183,10 +235,11 @@ func (pwm *peerWantManager) getWantBlocks() []cid.Cid { // Iterate over all known peers for _, pws := range pwm.peerWants { // Iterate over all want-blocks - for _, c := range pws.wantBlocks.Keys() { + pws.wantBlocks.ForEach(func(c cid.Cid) error { // Add the CID to the results res.Add(c) - } + return nil + }) } return res.Keys() @@ -199,10 +252,11 @@ func (pwm *peerWantManager) getWantHaves() []cid.Cid { // Iterate over all known peers for _, pws := range pwm.peerWants { // Iterate over all want-haves - for _, c := range pws.wantHaves.Keys() { + pws.wantHaves.ForEach(func(c cid.Cid) error { // Add the CID to the results res.Add(c) - } + return nil + }) } return res.Keys() @@ -215,16 +269,18 @@ func (pwm *peerWantManager) getWants() []cid.Cid { // Iterate over all known peers for _, pws := range pwm.peerWants { // Iterate over all want-blocks - for _, c := range pws.wantBlocks.Keys() { + pws.wantBlocks.ForEach(func(c cid.Cid) error { // Add the CID to the results res.Add(c) - } + return nil + }) // Iterate over all want-haves - for _, c := range pws.wantHaves.Keys() { + pws.wantHaves.ForEach(func(c cid.Cid) error { // Add the CID to the results res.Add(c) - } + return nil + }) } return res.Keys()