diff --git a/internal/peermanager/peermanager.go b/internal/peermanager/peermanager.go
index 04b015bfd201e51bd06b34cef236d4e0a765e962..0ce735846f3e006015b7077f5a5b0c4fb11b4657 100644
--- a/internal/peermanager/peermanager.go
+++ b/internal/peermanager/peermanager.go
@@ -90,9 +90,8 @@ func (pm *PeerManager) Connected(p peer.ID) {
 	pq := pm.getOrCreate(p)
 
 	// Inform the peer want manager that there's a new peer
-	wants := pm.pwm.addPeer(p)
-	// Broadcast any live want-haves to the newly connected peers
-	pq.AddBroadcastWantHaves(wants)
+	pm.pwm.addPeer(pq, p)
+
 	// Inform the sessions that the peer has connected
 	pm.signalAvailability(p, true)
 }
@@ -138,11 +137,7 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()
 
-	for p, ks := range pm.pwm.prepareBroadcastWantHaves(wantHaves) {
-		if pq, ok := pm.peerQueues[p]; ok {
-			pq.AddBroadcastWantHaves(ks)
-		}
-	}
+	pm.pwm.broadcastWantHaves(wantHaves)
 }
 
 // SendWants sends the given want-blocks and want-haves to the given peer.
@@ -151,9 +146,8 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()
 
-	if pq, ok := pm.peerQueues[p]; ok {
-		wblks, whvs := pm.pwm.prepareSendWants(p, wantBlocks, wantHaves)
-		pq.AddWants(wblks, whvs)
+	if _, ok := pm.peerQueues[p]; ok {
+		pm.pwm.sendWants(p, wantBlocks, wantHaves)
 	}
 }
 
@@ -164,11 +158,7 @@ func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()
 	// Send a CANCEL to each peer that has been sent a want-block or want-have
-	for p, ks := range pm.pwm.prepareSendCancels(cancelKs) {
-		if pq, ok := pm.peerQueues[p]; ok {
-			pq.AddCancels(ks)
-		}
-	}
+	pm.pwm.sendCancels(cancelKs)
 }
 
 // CurrentWants returns the list of pending wants (both want-haves and want-blocks).
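For context on the change above: the peerWantManager now holds each peer's PeerQueue and pushes wants to it directly, instead of handing CID lists back for PeerManager to dispatch. The sketch below restates the PeerQueue surface that this relies on; it is inferred from the mock implementations in the tests further down (benchPeerQueue and mockPQ) rather than copied from the interface definition in peermanager.go, so treat it as illustrative.

// Illustrative sketch, inferred from the test mocks (benchPeerQueue / mockPQ):
// the methods a PeerQueue must expose so that peerWantManager can push
// broadcast want-haves, wants and cancels straight to a peer's queue.
// The authoritative definition lives in peermanager.go.
type PeerQueue interface {
	Startup()
	Shutdown()
	AddBroadcastWantHaves([]cid.Cid)
	AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid)
	AddCancels([]cid.Cid)
	ResponseReceived([]cid.Cid)
}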
diff --git a/internal/peermanager/peermanager_test.go b/internal/peermanager/peermanager_test.go
index 5608684665186b015fe31496e0dd3397bb8ca16c..2a4c4c697b9fbb2692a10a8d14b23874573fd0e9 100644
--- a/internal/peermanager/peermanager_test.go
+++ b/internal/peermanager/peermanager_test.go
@@ -2,6 +2,7 @@ package peermanager
 
 import (
 	"context"
+	"math/rand"
 	"testing"
 	"time"
 
@@ -318,3 +319,61 @@ func TestSessionRegistration(t *testing.T) {
 		t.Fatal("Expected no signal callback (session unregistered)")
 	}
 }
+
+type benchPeerQueue struct {
+}
+
+func (*benchPeerQueue) Startup()  {}
+func (*benchPeerQueue) Shutdown() {}
+
+func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid)   {}
+func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
+func (*benchPeerQueue) AddCancels(cs []cid.Cid)               {}
+func (*benchPeerQueue) ResponseReceived(ks []cid.Cid)         {}
+
+// Simplistic benchmark to allow us to stress test the PeerManager
+func BenchmarkPeerManager(b *testing.B) {
+	b.StopTimer()
+
+	ctx := context.Background()
+
+	peerQueueFactory := func(ctx context.Context, p peer.ID) PeerQueue {
+		return &benchPeerQueue{}
+	}
+
+	self := testutil.GeneratePeers(1)[0]
+	peers := testutil.GeneratePeers(500)
+	peerManager := New(ctx, peerQueueFactory, self)
+
+	// Create a bunch of connections
+	connected := 0
+	for i := 0; i < len(peers); i++ {
+		peerManager.Connected(peers[i])
+		connected++
+	}
+
+	var wanted []cid.Cid
+
+	b.StartTimer()
+	for n := 0; n < b.N; n++ {
+		// Pick a random peer
+		i := rand.Intn(connected)
+
+		// Alternately add either a few wants or many broadcast wants
+		r := rand.Intn(8)
+		if r == 0 {
+			wants := testutil.GenerateCids(10)
+			peerManager.SendWants(ctx, peers[i], wants[:2], wants[2:])
+			wanted = append(wanted, wants...)
+		} else if r == 1 {
+			wants := testutil.GenerateCids(30)
+			peerManager.BroadcastWantHaves(ctx, wants)
+			wanted = append(wanted, wants...)
+		} else {
+			limit := len(wanted) / 10
+			cancel := wanted[:limit]
+			wanted = wanted[limit:]
+			peerManager.SendCancels(ctx, cancel)
+		}
+	}
+}
diff --git a/internal/peermanager/peerwantmanager.go b/internal/peermanager/peerwantmanager.go
index 418a646c49897d7dd3df82943a25b22e46e93d93..16d1913788beb9ba35c0bfbcf06657d5281df288 100644
--- a/internal/peermanager/peerwantmanager.go
+++ b/internal/peermanager/peerwantmanager.go
@@ -37,6 +37,7 @@ type peerWantManager struct {
 type peerWant struct {
 	wantBlocks *cid.Set
 	wantHaves  *cid.Set
+	peerQueue  PeerQueue
 }
 
 // New creates a new peerWantManager with a Gauge that keeps track of the
@@ -50,17 +51,24 @@ func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
 	}
 }
 
-// addPeer adds a peer whose wants we need to keep track of. It returns the
-// current list of broadcast wants that should be sent to the peer.
-func (pwm *peerWantManager) addPeer(p peer.ID) []cid.Cid {
-	if _, ok := pwm.peerWants[p]; !ok {
-		pwm.peerWants[p] = &peerWant{
-			wantBlocks: cid.NewSet(),
-			wantHaves:  cid.NewSet(),
-		}
-		return pwm.broadcastWants.Keys()
+// addPeer adds a peer whose wants we need to keep track of. It sends the
+// current list of broadcast wants to the peer.
+func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
+	if _, ok := pwm.peerWants[p]; ok {
+		return
+	}
+
+	pwm.peerWants[p] = &peerWant{
+		wantBlocks: cid.NewSet(),
+		wantHaves:  cid.NewSet(),
+		peerQueue:  peerQueue,
+	}
+
+	// Broadcast any live want-haves to the newly connected peer
+	if pwm.broadcastWants.Len() > 0 {
+		wants := pwm.broadcastWants.Keys()
+		peerQueue.AddBroadcastWantHaves(wants)
 	}
-	return nil
 }
 
 // RemovePeer removes a peer and its associated wants from tracking
@@ -87,55 +95,53 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
 	delete(pwm.peerWants, p)
 }
 
-// PrepareBroadcastWantHaves filters the list of want-haves for each peer,
-// returning a map of peers to the want-haves they have not yet been sent.
-func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[peer.ID][]cid.Cid {
-	res := make(map[peer.ID][]cid.Cid, len(pwm.peerWants))
+// broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
+func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
+	unsent := make([]cid.Cid, 0, len(wantHaves))
 	for _, c := range wantHaves {
 		if pwm.broadcastWants.Has(c) {
 			// Already a broadcast want, skip it.
 			continue
 		}
 		pwm.broadcastWants.Add(c)
+		unsent = append(unsent, c)
+	}
 
-		// Prepare broadcast.
-		wantedBy := pwm.wantPeers[c]
-		for p := range pwm.peerWants {
+	if len(unsent) == 0 {
+		return
+	}
+
+	// Allocate a single buffer to filter broadcast wants for each peer
+	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
+
+	// Send broadcast wants to each peer
+	for _, pws := range pwm.peerWants {
+		peerUnsent := bcstWantsBuffer[:0]
+		for _, c := range unsent {
 			// If we've already sent a want to this peer, skip them.
-			//
-			// This is faster than checking the actual wantlists due
-			// to better locality.
-			if _, ok := wantedBy[p]; ok {
-				continue
+			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
+				peerUnsent = append(peerUnsent, c)
 			}
+		}
 
-			cids, ok := res[p]
-			if !ok {
-				cids = make([]cid.Cid, 0, len(wantHaves))
-			}
-			res[p] = append(cids, c)
+		if len(peerUnsent) > 0 {
+			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
 		}
 	}
-
-	return res
 }
 
-// PrepareSendWants filters the list of want-blocks and want-haves such that
-// it only contains wants that have not already been sent to the peer.
-func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) ([]cid.Cid, []cid.Cid) {
-	resWantBlks := make([]cid.Cid, 0)
-	resWantHvs := make([]cid.Cid, 0)
+// sendWants only sends the peer the want-blocks and want-haves that have not
+// already been sent to it.
+func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
+	fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
+	fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
 
 	// Get the existing want-blocks and want-haves for the peer
 	pws, ok := pwm.peerWants[p]
 	if !ok {
-		// In practice this should never happen:
-		//   - PeerManager calls addPeer() as soon as the peer connects
-		//   - PeerManager calls removePeer() as soon as the peer disconnects
-		//   - All calls to PeerWantManager are locked
-		log.Errorf("prepareSendWants() called with peer %s but peer not found in peerWantManager", string(p))
-		return resWantBlks, resWantHvs
+		// In practice this should never happen
+		log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", string(p))
+		return
 	}
 
 	// Iterate over the requested want-blocks
@@ -149,7 +155,7 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
 			pwm.reverseIndexAdd(c, p)
 
 			// Add the CID to the results
-			resWantBlks = append(resWantBlks, c)
+			fltWantBlks = append(fltWantBlks, c)
 
 			// Make sure the CID is no longer recorded as a want-have
 			pws.wantHaves.Remove(c)
@@ -176,57 +182,46 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
 			pwm.reverseIndexAdd(c, p)
 
 			// Add the CID to the results
-			resWantHvs = append(resWantHvs, c)
+			fltWantHvs = append(fltWantHvs, c)
 		}
 	}
 
-	return resWantBlks, resWantHvs
+	// Send the want-blocks and want-haves to the peer
+	pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
 }
 
-// PrepareSendCancels filters the list of cancels for each peer,
-// returning a map of peers which only contains cancels for wants that have
-// been sent to the peer.
-func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
+// sendCancels sends a cancel to each peer to which a corresponding want was
+// sent.
+func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 	if len(cancelKs) == 0 {
-		return nil
-	}
-
-	// Pre-allocate enough space for all peers that have the first CID.
-	// Chances are these peers are related.
-	expectedResSize := 0
-	firstCancel := cancelKs[0]
-	if pwm.broadcastWants.Has(firstCancel) {
-		expectedResSize = len(pwm.peerWants)
-	} else {
-		expectedResSize = len(pwm.wantPeers[firstCancel])
+		return
 	}
-	res := make(map[peer.ID][]cid.Cid, expectedResSize)
-
-	// Keep the broadcast keys separate. This lets us batch-process them at
-	// the end.
-	broadcastKs := make([]cid.Cid, 0, len(cancelKs))
 
-	// Iterate over all requested cancels
+	// Create a buffer to use for filtering cancels per peer, with the
+	// broadcast wants at the front of the buffer (broadcast wants are sent to
+	// all peers)
+	broadcastCancels := make([]cid.Cid, 0, len(cancelKs))
 	for _, c := range cancelKs {
-		// Handle broadcast wants up-front.
-		isBroadcast := pwm.broadcastWants.Has(c)
-		if isBroadcast {
-			broadcastKs = append(broadcastKs, c)
-			pwm.broadcastWants.Remove(c)
+		if pwm.broadcastWants.Has(c) {
+			broadcastCancels = append(broadcastCancels, c)
 		}
+	}
-
-		// Even if this is a broadcast, we may have sent targeted wants.
-		// Deal with them.
-		for p := range pwm.wantPeers[c] {
-			pws, ok := pwm.peerWants[p]
-			if !ok {
-				// Should never happen but check just in case
-				log.Errorf("peerWantManager reverse index missing peer %s for key %s", p, c)
+	// Send cancels to a particular peer
+	send := func(p peer.ID, pws *peerWant) {
+		// Start from the broadcast cancels
+		toCancel := broadcastCancels
+
+		// For each key to be cancelled
+		for _, c := range cancelKs {
+			// Check if a want was sent for the key
+			wantBlock := pws.wantBlocks.Has(c)
+			if !wantBlock && !pws.wantHaves.Has(c) {
 				continue
 			}
 
 			// Update the want gauge.
-			if pws.wantBlocks.Has(c) {
+			if wantBlock {
 				pwm.wantBlockGauge.Dec()
 			}
 
@@ -235,40 +230,54 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
 			pws.wantHaves.Remove(c)
 
 			// If it's a broadcast want, we've already added it to
-			// the broadcastKs list.
-			if isBroadcast {
-				continue
+			// the peer cancels.
+			if !pwm.broadcastWants.Has(c) {
+				toCancel = append(toCancel, c)
 			}
-
-			// Add the CID to the result for the peer.
-			cids, ok := res[p]
-			if !ok {
-				// Pre-allocate enough for all keys.
-				// Cancels are usually related.
-				cids = make([]cid.Cid, 0, len(cancelKs))
-			}
-			res[p] = append(cids, c)
 		}
 
-		// Finally, batch-remove the reverse-index. There's no need to
-		// clear this index peer-by-peer.
-		delete(pwm.wantPeers, c)
+		// Send cancels to the peer
+		if len(toCancel) > 0 {
+			pws.peerQueue.AddCancels(toCancel)
+		}
 	}
 
-	// If we have any broadcasted CIDs, add them in.
-	//
-	// Doing this at the end can save us a bunch of work and allocations.
-	if len(broadcastKs) > 0 {
-		for p := range pwm.peerWants {
-			if cids, ok := res[p]; ok {
-				res[p] = append(cids, broadcastKs...)
-			} else {
-				res[p] = broadcastKs
+	if len(broadcastCancels) > 0 {
+		// If a broadcast want is being cancelled, send the cancel to all
+		// peers
+		for p, pws := range pwm.peerWants {
+			send(p, pws)
+		}
+	} else {
+		// Only send cancels to peers that received a corresponding want
+		cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
+		for _, c := range cancelKs {
+			for p := range pwm.wantPeers[c] {
+				cancelPeers[p] = struct{}{}
 			}
 		}
+		for p := range cancelPeers {
+			pws, ok := pwm.peerWants[p]
+			if !ok {
+				// Should never happen but check just in case
+				log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
+				continue
+			}
+
+			send(p, pws)
+		}
 	}
 
-	return res
+	// Remove cancelled broadcast wants
+	for _, c := range broadcastCancels {
+		pwm.broadcastWants.Remove(c)
+	}
+
+	// Finally, batch-remove the reverse-index. There's no need to
+	// clear this index peer-by-peer.
+	for _, c := range cancelKs {
+		delete(pwm.wantPeers, c)
+	}
 }
 
 // Add the peer to the list of peers that have sent a want with the cid
diff --git a/internal/peermanager/peerwantmanager_test.go b/internal/peermanager/peerwantmanager_test.go
index 766033e8f457003bd4e18f02fd64975040339677..396ea0d82acac2c838a4939759db5c00c832d9e7 100644
--- a/internal/peermanager/peerwantmanager_test.go
+++ b/internal/peermanager/peerwantmanager_test.go
@@ -4,8 +4,8 @@ import (
 	"testing"
 
 	"github.com/ipfs/go-bitswap/internal/testutil"
-
 	cid "github.com/ipfs/go-cid"
+	peer "github.com/libp2p/go-libp2p-core/peer"
 )
 
 type gauge struct {
@@ -19,6 +19,42 @@ func (g *gauge) Dec() {
 	g.count--
 }
 
+type mockPQ struct {
+	bcst    []cid.Cid
+	wbs     []cid.Cid
+	whs     []cid.Cid
+	cancels []cid.Cid
+}
+
+func (mpq *mockPQ) clear() {
+	mpq.bcst = nil
+	mpq.wbs = nil
+	mpq.whs = nil
+	mpq.cancels = nil
+}
+
+func (mpq *mockPQ) Startup()  {}
+func (mpq *mockPQ) Shutdown() {}
+
+func (mpq *mockPQ) AddBroadcastWantHaves(whs []cid.Cid) {
+	mpq.bcst = append(mpq.bcst, whs...)
+}
+func (mpq *mockPQ) AddWants(wbs []cid.Cid, whs []cid.Cid) {
+	mpq.wbs = append(mpq.wbs, wbs...)
+	mpq.whs = append(mpq.whs, whs...)
+}
+func (mpq *mockPQ) AddCancels(cs []cid.Cid) {
+	mpq.cancels = append(mpq.cancels, cs...)
+}
+func (mpq *mockPQ) ResponseReceived(ks []cid.Cid) {
+}
+
+func clearSent(pqs map[peer.ID]PeerQueue) {
+	for _, pqi := range pqs {
+		pqi.(*mockPQ).clear()
+	}
+}
+
 func TestEmpty(t *testing.T) {
 	pwm := newPeerWantManager(&gauge{})
 
@@ -30,7 +66,7 @@ func TestEmpty(t *testing.T) {
 	}
 }
 
-func TestPrepareBroadcastWantHaves(t *testing.T) {
+func TestPWMBroadcastWantHaves(t *testing.T) {
 	pwm := newPeerWantManager(&gauge{})
 
 	peers := testutil.GeneratePeers(3)
@@ -38,74 +74,87 @@
 	cids2 := testutil.GenerateCids(2)
 	cids3 := testutil.GenerateCids(2)
 
-	if blist := pwm.addPeer(peers[0]); len(blist) > 0 {
-		t.Errorf("expected no broadcast wants")
-	}
-	if blist := pwm.addPeer(peers[1]); len(blist) > 0 {
-		t.Errorf("expected no broadcast wants")
+	peerQueues := make(map[peer.ID]PeerQueue)
+	for _, p := range peers[:2] {
+		pq := &mockPQ{}
+		peerQueues[p] = pq
+		pwm.addPeer(pq, p)
+		if len(pq.bcst) > 0 {
+			t.Errorf("expected no broadcast wants")
+		}
 	}
 
 	// Broadcast 2 cids to 2 peers
-	bcst := pwm.prepareBroadcastWantHaves(cids)
-	if len(bcst) != 2 {
-		t.Fatal("Expected 2 peers")
-	}
-	for p := range bcst {
-		if !testutil.MatchKeysIgnoreOrder(bcst[p], cids) {
+	pwm.broadcastWantHaves(cids)
+	for _, pqi := range peerQueues {
+		pq := pqi.(*mockPQ)
+		if len(pq.bcst) != 2 {
+			t.Fatal("Expected 2 want-haves")
+		}
+		if !testutil.MatchKeysIgnoreOrder(pq.bcst, cids) {
 			t.Fatal("Expected all cids to be broadcast")
 		}
 	}
 
 	// Broadcasting same cids should have no effect
-	bcst2 := pwm.prepareBroadcastWantHaves(cids)
-	if len(bcst2) != 0 {
-		t.Fatal("Expected 0 peers")
+	clearSent(peerQueues)
+	pwm.broadcastWantHaves(cids)
+	for _, pqi := range peerQueues {
+		pq := pqi.(*mockPQ)
+		if len(pq.bcst) != 0 {
+			t.Fatal("Expected 0 want-haves")
+		}
 	}
 
 	// Broadcast 2 other cids
-	bcst3 := pwm.prepareBroadcastWantHaves(cids2)
-	if len(bcst3) != 2 {
-		t.Fatal("Expected 2 peers")
-	}
-	for p := range bcst3 {
-		if !testutil.MatchKeysIgnoreOrder(bcst3[p], cids2) {
+	clearSent(peerQueues)
+	pwm.broadcastWantHaves(cids2)
+	for _, pqi := range peerQueues {
+		pq := pqi.(*mockPQ)
+		if len(pq.bcst) != 2 {
+			t.Fatal("Expected 2 want-haves")
+		}
+		if !testutil.MatchKeysIgnoreOrder(pq.bcst, cids2) {
 			t.Fatal("Expected all new cids to be broadcast")
 		}
 	}
 
 	// Broadcast mix of old and new cids
-	bcst4 := pwm.prepareBroadcastWantHaves(append(cids, cids3...))
-	if len(bcst4) != 2 {
-		t.Fatal("Expected 2 peers")
-	}
-	// Only new cids should be broadcast
-	for p := range bcst4 {
-		if !testutil.MatchKeysIgnoreOrder(bcst4[p], cids3) {
+	clearSent(peerQueues)
+	pwm.broadcastWantHaves(append(cids, cids3...))
+	for _, pqi := range peerQueues {
+		pq := pqi.(*mockPQ)
+		if len(pq.bcst) != 2 {
+			t.Fatal("Expected 2 want-haves")
+		}
+		// Only new cids should be broadcast
+		if !testutil.MatchKeysIgnoreOrder(pq.bcst, cids3) {
 			t.Fatal("Expected all new cids to be broadcast")
 		}
 	}
 
 	// Sending want-block for a cid should prevent broadcast to that peer
+	clearSent(peerQueues)
 	cids4 := testutil.GenerateCids(4)
 	wantBlocks := []cid.Cid{cids4[0], cids4[2]}
-	pwm.prepareSendWants(peers[0], wantBlocks, []cid.Cid{})
-
-	bcst5 := pwm.prepareBroadcastWantHaves(cids4)
-	if len(bcst4) != 2 {
-		t.Fatal("Expected 2 peers")
-	}
-	// Only cids that were not sent as want-block to peer should be broadcast
-	for p := range bcst5 {
-		if p == peers[0] {
-			if !testutil.MatchKeysIgnoreOrder(bcst5[p], []cid.Cid{cids4[1], cids4[3]}) {
-				t.Fatal("Expected unsent cids to be broadcast")
-			}
-		}
-		if p == peers[1] {
-			if !testutil.MatchKeysIgnoreOrder(bcst5[p], cids4) {
-				t.Fatal("Expected all cids to be broadcast")
-			}
-		}
+	p0 := peers[0]
+	p1 := peers[1]
+	pwm.sendWants(p0, wantBlocks, []cid.Cid{})
+
+	pwm.broadcastWantHaves(cids4)
+	pq0 := peerQueues[p0].(*mockPQ)
+	if len(pq0.bcst) != 2 { // only broadcast 2 / 4 want-haves
+		t.Fatal("Expected 2 want-haves")
+	}
+	if !testutil.MatchKeysIgnoreOrder(pq0.bcst, []cid.Cid{cids4[1], cids4[3]}) {
+		t.Fatalf("Expected unsent cids to be broadcast")
+	}
+	pq1 := peerQueues[p1].(*mockPQ)
+	if len(pq1.bcst) != 4 { // broadcast all 4 want-haves
+		t.Fatal("Expected 4 want-haves")
+	}
+	if !testutil.MatchKeysIgnoreOrder(pq1.bcst, cids4) {
+		t.Fatal("Expected all cids to be broadcast")
 	}
 
 	allCids := cids
@@ -114,17 +163,22 @@ func TestPrepareBroadcastWantHaves(t *testing.T) {
 	allCids = append(allCids, cids4...)
 
 	// Add another peer
-	bcst6 := pwm.addPeer(peers[2])
-	if !testutil.MatchKeysIgnoreOrder(bcst6, allCids) {
+	peer2 := peers[2]
+	pq2 := &mockPQ{}
+	peerQueues[peer2] = pq2
+	pwm.addPeer(pq2, peer2)
+	if !testutil.MatchKeysIgnoreOrder(pq2.bcst, allCids) {
 		t.Fatalf("Expected all cids to be broadcast.")
 	}
 
-	if broadcast := pwm.prepareBroadcastWantHaves(allCids); len(broadcast) != 0 {
+	clearSent(peerQueues)
+	pwm.broadcastWantHaves(allCids)
+	if len(pq2.bcst) != 0 {
 		t.Errorf("did not expect to have CIDs to broadcast")
 	}
 }
 
-func TestPrepareSendWants(t *testing.T) {
+func TestPWMSendWants(t *testing.T) {
 	pwm := newPeerWantManager(&gauge{})
 
 	peers := testutil.GeneratePeers(2)
@@ -133,68 +187,78 @@
 	cids := testutil.GenerateCids(2)
 	cids2 := testutil.GenerateCids(2)
 
-	pwm.addPeer(p0)
-	pwm.addPeer(p1)
+	peerQueues := make(map[peer.ID]PeerQueue)
+	for _, p := range peers[:2] {
+		pq := &mockPQ{}
+		peerQueues[p] = pq
+		pwm.addPeer(pq, p)
+	}
+	pq0 := peerQueues[p0].(*mockPQ)
+	pq1 := peerQueues[p1].(*mockPQ)
 
 	// Send 2 want-blocks and 2 want-haves to p0
-	wb, wh := pwm.prepareSendWants(p0, cids, cids2)
-	if !testutil.MatchKeysIgnoreOrder(wb, cids) {
+	clearSent(peerQueues)
+	pwm.sendWants(p0, cids, cids2)
+	if !testutil.MatchKeysIgnoreOrder(pq0.wbs, cids) {
 		t.Fatal("Expected 2 want-blocks")
 	}
-	if !testutil.MatchKeysIgnoreOrder(wh, cids2) {
+	if !testutil.MatchKeysIgnoreOrder(pq0.whs, cids2) {
 		t.Fatal("Expected 2 want-haves")
 	}
 
 	// Send to p0
 	// - 1 old want-block and 2 new want-blocks
 	// - 1 old want-have and 2 new want-haves
+	clearSent(peerQueues)
 	cids3 := testutil.GenerateCids(2)
 	cids4 := testutil.GenerateCids(2)
-	wb2, wh2 := pwm.prepareSendWants(p0, append(cids3, cids[0]), append(cids4, cids2[0]))
-	if !testutil.MatchKeysIgnoreOrder(wb2, cids3) {
+	pwm.sendWants(p0, append(cids3, cids[0]), append(cids4, cids2[0]))
+	if !testutil.MatchKeysIgnoreOrder(pq0.wbs, cids3) {
 		t.Fatal("Expected 2 want-blocks")
 	}
-	if !testutil.MatchKeysIgnoreOrder(wh2, cids4) {
+	if !testutil.MatchKeysIgnoreOrder(pq0.whs, cids4) {
 		t.Fatal("Expected 2 want-haves")
 	}
 
 	// Send to p0 as want-blocks: 1 new want-block, 1 old want-have
+	clearSent(peerQueues)
 	cids5 := testutil.GenerateCids(1)
 	newWantBlockOldWantHave := append(cids5, cids2[0])
-	wb3, wh3 := pwm.prepareSendWants(p0, newWantBlockOldWantHave, []cid.Cid{})
+	pwm.sendWants(p0, newWantBlockOldWantHave, []cid.Cid{})
 	// If a want was sent as a want-have, it should be ok to now send it as a
 	// want-block
-	if !testutil.MatchKeysIgnoreOrder(wb3, newWantBlockOldWantHave) {
+	if !testutil.MatchKeysIgnoreOrder(pq0.wbs, newWantBlockOldWantHave) {
 		t.Fatal("Expected 2 want-blocks")
 	}
-	if len(wh3) != 0 {
+	if len(pq0.whs) != 0 {
 		t.Fatal("Expected 0 want-haves")
 	}
 
 	// Send to p0 as want-haves: 1 new want-have, 1 old want-block
+	clearSent(peerQueues)
 	cids6 := testutil.GenerateCids(1)
 	newWantHaveOldWantBlock := append(cids6, cids[0])
-	wb4, wh4 := pwm.prepareSendWants(p0, []cid.Cid{}, newWantHaveOldWantBlock)
+	pwm.sendWants(p0, []cid.Cid{}, newWantHaveOldWantBlock)
 	// If a want was previously sent as a want-block, it should not be
 	// possible to now send it as a want-have
-	if !testutil.MatchKeysIgnoreOrder(wh4, cids6) {
+	if !testutil.MatchKeysIgnoreOrder(pq0.whs, cids6) {
 		t.Fatal("Expected 1 want-have")
 	}
-	if len(wb4) != 0 {
+	if len(pq0.wbs) != 0 {
 		t.Fatal("Expected 0 want-blocks")
 	}
 
 	// Send 2 want-blocks and 2 want-haves to p1
-	wb5, wh5 := pwm.prepareSendWants(p1, cids, cids2)
-	if !testutil.MatchKeysIgnoreOrder(wb5, cids) {
+	pwm.sendWants(p1, cids, cids2)
+	if !testutil.MatchKeysIgnoreOrder(pq1.wbs, cids) {
 		t.Fatal("Expected 2 want-blocks")
 	}
-	if !testutil.MatchKeysIgnoreOrder(wh5, cids2) {
+	if !testutil.MatchKeysIgnoreOrder(pq1.whs, cids2) {
 		t.Fatal("Expected 2 want-haves")
 	}
 }
 
-func TestPrepareSendCancels(t *testing.T) {
+func TestPWMSendCancels(t *testing.T) {
 	pwm := newPeerWantManager(&gauge{})
 
 	peers := testutil.GeneratePeers(2)
@@ -207,14 +271,20 @@
 	allwb := append(wb1, wb2...)
 	allwh := append(wh1, wh2...)
 
-	pwm.addPeer(p0)
-	pwm.addPeer(p1)
+	peerQueues := make(map[peer.ID]PeerQueue)
+	for _, p := range peers[:2] {
+		pq := &mockPQ{}
+		peerQueues[p] = pq
+		pwm.addPeer(pq, p)
+	}
+	pq0 := peerQueues[p0].(*mockPQ)
+	pq1 := peerQueues[p1].(*mockPQ)
 
 	// Send 2 want-blocks and 2 want-haves to p0
-	pwm.prepareSendWants(p0, wb1, wh1)
+	pwm.sendWants(p0, wb1, wh1)
 	// Send 3 want-blocks and 3 want-haves to p1
 	// (1 overlapping want-block / want-have with p0)
-	pwm.prepareSendWants(p1, append(wb2, wb1[1]), append(wh2, wh1[1]))
+	pwm.sendWants(p1, append(wb2, wb1[1]), append(wh2, wh1[1]))
 
 	if !testutil.MatchKeysIgnoreOrder(pwm.getWantBlocks(), allwb) {
 		t.Fatal("Expected 4 cids to be wanted")
@@ -224,12 +294,13 @@
 	}
 
 	// Cancel 1 want-block and 1 want-have that were sent to p0
-	res := pwm.prepareSendCancels([]cid.Cid{wb1[0], wh1[0]})
+	clearSent(peerQueues)
+	pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
 	// Should cancel the want-block and want-have
-	if len(res) != 1 {
-		t.Fatal("Expected 1 peer")
+	if len(pq1.cancels) != 0 {
+		t.Fatal("Expected no cancels sent to p1")
 	}
-	if !testutil.MatchKeysIgnoreOrder(res[p0], []cid.Cid{wb1[0], wh1[0]}) {
+	if !testutil.MatchKeysIgnoreOrder(pq0.cancels, []cid.Cid{wb1[0], wh1[0]}) {
 		t.Fatal("Expected 2 cids to be cancelled")
 	}
 	if !testutil.MatchKeysIgnoreOrder(pwm.getWantBlocks(), append(wb2, wb1[1])) {
@@ -240,18 +311,21 @@
 	}
 
 	// Cancel everything
+	clearSent(peerQueues)
 	allCids := append(allwb, allwh...)
-	res2 := pwm.prepareSendCancels(allCids)
-	// Should cancel the remaining want-blocks and want-haves
-	if len(res2) != 2 {
-		t.Fatal("Expected 2 peers", len(res2))
-	}
-	if !testutil.MatchKeysIgnoreOrder(res2[p0], []cid.Cid{wb1[1], wh1[1]}) {
+	pwm.sendCancels(allCids)
+	// Should cancel the remaining want-blocks and want-haves for p0
+	if !testutil.MatchKeysIgnoreOrder(pq0.cancels, []cid.Cid{wb1[1], wh1[1]}) {
 		t.Fatal("Expected un-cancelled cids to be cancelled")
 	}
-	remainingP2 := append(wb2, wh2...)
-	remainingP2 = append(remainingP2, wb1[1], wh1[1])
-	if !testutil.MatchKeysIgnoreOrder(res2[p1], remainingP2) {
+
+	// Should cancel the remaining want-blocks and want-haves for p1
+	remainingP1 := append(wb2, wh2...)
+	remainingP1 = append(remainingP1, wb1[1], wh1[1])
+	if len(pq1.cancels) != len(remainingP1) {
+		t.Fatal("mismatch", len(pq1.cancels), len(remainingP1))
+	}
+	if !testutil.MatchKeysIgnoreOrder(pq1.cancels, remainingP1) {
 		t.Fatal("Expected un-cancelled cids to be cancelled")
 	}
 	if len(pwm.getWantBlocks()) != 0 {
@@ -271,10 +345,13 @@ func TestStats(t *testing.T) {
 	cids := testutil.GenerateCids(2)
 	cids2 := testutil.GenerateCids(2)
 
-	pwm.addPeer(p0)
+	peerQueues := make(map[peer.ID]PeerQueue)
+	pq := &mockPQ{}
+	peerQueues[p0] = pq
+	pwm.addPeer(pq, p0)
 
 	// Send 2 want-blocks and 2 want-haves to p0
-	pwm.prepareSendWants(p0, cids, cids2)
+	pwm.sendWants(p0, cids, cids2)
 
 	if g.count != 2 {
 		t.Fatal("Expected 2 want-blocks")
@@ -282,7 +359,7 @@
 
 	// Send 1 old want-block and 2 new want-blocks to p0
 	cids3 := testutil.GenerateCids(2)
-	pwm.prepareSendWants(p0, append(cids3, cids[0]), []cid.Cid{})
+	pwm.sendWants(p0, append(cids3, cids[0]), []cid.Cid{})
 
 	if g.count != 4 {
 		t.Fatal("Expected 4 want-blocks")
@@ -291,7 +368,7 @@
 	// Cancel 1 want-block that was sent to p0
 	// and 1 want-block that was not sent
 	cids4 := testutil.GenerateCids(1)
-	pwm.prepareSendCancels(append(cids4, cids[0]))
+	pwm.sendCancels(append(cids4, cids[0]))
 
 	if g.count != 3 {
 		t.Fatal("Expected 3 want-blocks", g.count)
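The tests above exercise the push-based flow end to end. As a complement, here is a minimal usage sketch of the reworked peerWantManager API, reusing the gauge and mockPQ helpers defined in this test file; it is illustrative only and not part of the patch.

// Minimal usage sketch of the reworked peerWantManager API, reusing the gauge
// and mockPQ test helpers above. Illustrative only; not part of the patch.
func ExamplePeerWantManager() {
	pwm := newPeerWantManager(&gauge{})

	p := testutil.GeneratePeers(1)[0]
	pq := &mockPQ{}
	// Registering the peer also broadcasts any live want-haves to its queue.
	pwm.addPeer(pq, p)

	cids := testutil.GenerateCids(4)
	// Want-blocks and want-haves are pushed straight to pq.AddWants.
	pwm.sendWants(p, cids[:2], cids[2:])
	// Cancels are pushed straight to pq.AddCancels for peers that were sent a want.
	pwm.sendCancels(cids)
}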