Commit 6d9c17eb authored by Dirk McCormick's avatar Dirk McCormick

perf: improve cancel wants perf

parent e4f2791e
...@@ -197,23 +197,27 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { ...@@ -197,23 +197,27 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
return return
} }
// Handle broadcast wants up-front // Create a buffer to use for filtering cancels per peer, with the
broadcastKs := make([]cid.Cid, 0, len(cancelKs)) // broadcast wants at the front of the buffer (broadcast wants are sent to
// all peers)
i := 0
cancelsBuff := make([]cid.Cid, len(cancelKs))
for _, c := range cancelKs { for _, c := range cancelKs {
if pwm.broadcastWants.Has(c) { if pwm.broadcastWants.Has(c) {
broadcastKs = append(broadcastKs, c) cancelsBuff[i] = c
pwm.broadcastWants.Remove(c) i++
} }
} }
broadcastKsCount := i
// Allocate a single buffer to filter the cancels to send to each peer
cancelsBuff := make([]cid.Cid, 0, len(cancelKs))
// Send cancels to a particular peer // Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) { send := func(p peer.ID, pws *peerWant) {
// Include broadcast cancels // Start the index into the buffer after the broadcast wants
peerCancels := append(cancelsBuff[:0], broadcastKs...) i = broadcastKsCount
// For each key to be cancelled
for _, c := range cancelKs { for _, c := range cancelKs {
// Check if a want was sent for the key
wantBlock := pws.wantBlocks.Has(c) wantBlock := pws.wantBlocks.Has(c)
if !wantBlock && !pws.wantHaves.Has(c) { if !wantBlock && !pws.wantHaves.Has(c) {
continue continue
...@@ -231,17 +235,18 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { ...@@ -231,17 +235,18 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
// If it's a broadcast want, we've already added it to // If it's a broadcast want, we've already added it to
// the peer cancels. // the peer cancels.
if !pwm.broadcastWants.Has(c) { if !pwm.broadcastWants.Has(c) {
peerCancels = append(peerCancels, c) cancelsBuff[i] = c
i++
} }
} }
// Send cancels to the peer // Send cancels to the peer
if len(peerCancels) > 0 { if i > 0 {
pws.peerQueue.AddCancels(peerCancels) pws.peerQueue.AddCancels(cancelsBuff[:i])
} }
} }
if len(broadcastKs) > 0 { if broadcastKsCount > 0 {
// If a broadcast want is being cancelled, send the cancel to all // If a broadcast want is being cancelled, send the cancel to all
// peers // peers
for p, pws := range pwm.peerWants { for p, pws := range pwm.peerWants {
...@@ -267,6 +272,11 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { ...@@ -267,6 +272,11 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
} }
} }
// Remove cancelled broadcast wants
for _, c := range cancelsBuff[:broadcastKsCount] {
pwm.broadcastWants.Remove(c)
}
// Finally, batch-remove the reverse-index. There's no need to // Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer. // clear this index peer-by-peer.
for _, c := range cancelKs { for _, c := range cancelKs {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment