Unverified Commit 07300422 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #395 from ipfs/perf/peer-want-mgr

Improve peer manager performance
parents 5ff88cfe 60b07e92
......@@ -90,9 +90,8 @@ func (pm *PeerManager) Connected(p peer.ID) {
pq := pm.getOrCreate(p)
// Inform the peer want manager that there's a new peer
wants := pm.pwm.addPeer(p)
// Broadcast any live want-haves to the newly connected peers
pq.AddBroadcastWantHaves(wants)
pm.pwm.addPeer(pq, p)
// Inform the sessions that the peer has connected
pm.signalAvailability(p, true)
}
......@@ -138,11 +137,7 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C
pm.pqLk.Lock()
defer pm.pqLk.Unlock()
for p, ks := range pm.pwm.prepareBroadcastWantHaves(wantHaves) {
if pq, ok := pm.peerQueues[p]; ok {
pq.AddBroadcastWantHaves(ks)
}
}
pm.pwm.broadcastWantHaves(wantHaves)
}
// SendWants sends the given want-blocks and want-haves to the given peer.
......@@ -151,9 +146,8 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci
pm.pqLk.Lock()
defer pm.pqLk.Unlock()
if pq, ok := pm.peerQueues[p]; ok {
wblks, whvs := pm.pwm.prepareSendWants(p, wantBlocks, wantHaves)
pq.AddWants(wblks, whvs)
if _, ok := pm.peerQueues[p]; ok {
pm.pwm.sendWants(p, wantBlocks, wantHaves)
}
}
......@@ -164,11 +158,7 @@ func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
defer pm.pqLk.Unlock()
// Send a CANCEL to each peer that has been sent a want-block or want-have
for p, ks := range pm.pwm.prepareSendCancels(cancelKs) {
if pq, ok := pm.peerQueues[p]; ok {
pq.AddCancels(ks)
}
}
pm.pwm.sendCancels(cancelKs)
}
// CurrentWants returns the list of pending wants (both want-haves and want-blocks).
......
......@@ -2,6 +2,7 @@ package peermanager
import (
"context"
"math/rand"
"testing"
"time"
......@@ -318,3 +319,61 @@ func TestSessionRegistration(t *testing.T) {
t.Fatal("Expected no signal callback (session unregistered)")
}
}
type benchPeerQueue struct {
}
func (*benchPeerQueue) Startup() {}
func (*benchPeerQueue) Shutdown() {}
func (*benchPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {}
func (*benchPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {}
func (*benchPeerQueue) AddCancels(cs []cid.Cid) {}
func (*benchPeerQueue) ResponseReceived(ks []cid.Cid) {}
// Simplistic benchmark to allow us to stress test
func BenchmarkPeerManager(b *testing.B) {
b.StopTimer()
ctx := context.Background()
peerQueueFactory := func(ctx context.Context, p peer.ID) PeerQueue {
return &benchPeerQueue{}
}
self := testutil.GeneratePeers(1)[0]
peers := testutil.GeneratePeers(500)
peerManager := New(ctx, peerQueueFactory, self)
// Create a bunch of connections
connected := 0
for i := 0; i < len(peers); i++ {
peerManager.Connected(peers[i])
connected++
}
var wanted []cid.Cid
b.StartTimer()
for n := 0; n < b.N; n++ {
// Pick a random peer
i := rand.Intn(connected)
// Alternately add either a few wants or many broadcast wants
r := rand.Intn(8)
if r == 0 {
wants := testutil.GenerateCids(10)
peerManager.SendWants(ctx, peers[i], wants[:2], wants[2:])
wanted = append(wanted, wants...)
} else if r == 1 {
wants := testutil.GenerateCids(30)
peerManager.BroadcastWantHaves(ctx, wants)
wanted = append(wanted, wants...)
} else {
limit := len(wanted) / 10
cancel := wanted[:limit]
wanted = wanted[limit:]
peerManager.SendCancels(ctx, cancel)
}
}
}
......@@ -37,6 +37,7 @@ type peerWantManager struct {
type peerWant struct {
wantBlocks *cid.Set
wantHaves *cid.Set
peerQueue PeerQueue
}
// New creates a new peerWantManager with a Gauge that keeps track of the
......@@ -50,17 +51,24 @@ func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
}
}
// addPeer adds a peer whose wants we need to keep track of. It returns the
// current list of broadcast wants that should be sent to the peer.
func (pwm *peerWantManager) addPeer(p peer.ID) []cid.Cid {
if _, ok := pwm.peerWants[p]; !ok {
pwm.peerWants[p] = &peerWant{
wantBlocks: cid.NewSet(),
wantHaves: cid.NewSet(),
}
return pwm.broadcastWants.Keys()
// addPeer adds a peer whose wants we need to keep track of. It sends the
// current list of broadcast wants to the peer.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
if _, ok := pwm.peerWants[p]; ok {
return
}
pwm.peerWants[p] = &peerWant{
wantBlocks: cid.NewSet(),
wantHaves: cid.NewSet(),
peerQueue: peerQueue,
}
// Broadcast any live want-haves to the newly connected peer
if pwm.broadcastWants.Len() > 0 {
wants := pwm.broadcastWants.Keys()
peerQueue.AddBroadcastWantHaves(wants)
}
return nil
}
// RemovePeer removes a peer and its associated wants from tracking
......@@ -87,55 +95,53 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
delete(pwm.peerWants, p)
}
// PrepareBroadcastWantHaves filters the list of want-haves for each peer,
// returning a map of peers to the want-haves they have not yet been sent.
func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[peer.ID][]cid.Cid {
res := make(map[peer.ID][]cid.Cid, len(pwm.peerWants))
// broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
unsent := make([]cid.Cid, 0, len(wantHaves))
for _, c := range wantHaves {
if pwm.broadcastWants.Has(c) {
// Already a broadcast want, skip it.
continue
}
pwm.broadcastWants.Add(c)
unsent = append(unsent, c)
}
// Prepare broadcast.
wantedBy := pwm.wantPeers[c]
for p := range pwm.peerWants {
if len(unsent) == 0 {
return
}
// Allocate a single buffer to filter broadcast wants for each peer
bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
// Send broadcast wants to each peer
for _, pws := range pwm.peerWants {
peerUnsent := bcstWantsBuffer[:0]
for _, c := range unsent {
// If we've already sent a want to this peer, skip them.
//
// This is faster than checking the actual wantlists due
// to better locality.
if _, ok := wantedBy[p]; ok {
continue
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
peerUnsent = append(peerUnsent, c)
}
}
cids, ok := res[p]
if !ok {
cids = make([]cid.Cid, 0, len(wantHaves))
}
res[p] = append(cids, c)
if len(peerUnsent) > 0 {
pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
}
}
return res
}
// PrepareSendWants filters the list of want-blocks and want-haves such that
// it only contains wants that have not already been sent to the peer.
func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) ([]cid.Cid, []cid.Cid) {
resWantBlks := make([]cid.Cid, 0)
resWantHvs := make([]cid.Cid, 0)
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
// Get the existing want-blocks and want-haves for the peer
pws, ok := pwm.peerWants[p]
if !ok {
// In practice this should never happen:
// - PeerManager calls addPeer() as soon as the peer connects
// - PeerManager calls removePeer() as soon as the peer disconnects
// - All calls to PeerWantManager are locked
log.Errorf("prepareSendWants() called with peer %s but peer not found in peerWantManager", string(p))
return resWantBlks, resWantHvs
// In practice this should never happen
log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", string(p))
return
}
// Iterate over the requested want-blocks
......@@ -149,7 +155,7 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
resWantBlks = append(resWantBlks, c)
fltWantBlks = append(fltWantBlks, c)
// Make sure the CID is no longer recorded as a want-have
pws.wantHaves.Remove(c)
......@@ -176,57 +182,46 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
resWantHvs = append(resWantHvs, c)
fltWantHvs = append(fltWantHvs, c)
}
}
return resWantBlks, resWantHvs
// Send the want-blocks and want-haves to the peer
pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
}
// PrepareSendCancels filters the list of cancels for each peer,
// returning a map of peers which only contains cancels for wants that have
// been sent to the peer.
func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
if len(cancelKs) == 0 {
return nil
}
// Pre-allocate enough space for all peers that have the first CID.
// Chances are these peers are related.
expectedResSize := 0
firstCancel := cancelKs[0]
if pwm.broadcastWants.Has(firstCancel) {
expectedResSize = len(pwm.peerWants)
} else {
expectedResSize = len(pwm.wantPeers[firstCancel])
return
}
res := make(map[peer.ID][]cid.Cid, expectedResSize)
// Keep the broadcast keys separate. This lets us batch-process them at
// the end.
broadcastKs := make([]cid.Cid, 0, len(cancelKs))
// Iterate over all requested cancels
// Create a buffer to use for filtering cancels per peer, with the
// broadcast wants at the front of the buffer (broadcast wants are sent to
// all peers)
broadcastCancels := make([]cid.Cid, 0, len(cancelKs))
for _, c := range cancelKs {
// Handle broadcast wants up-front.
isBroadcast := pwm.broadcastWants.Has(c)
if isBroadcast {
broadcastKs = append(broadcastKs, c)
pwm.broadcastWants.Remove(c)
if pwm.broadcastWants.Has(c) {
broadcastCancels = append(broadcastCancels, c)
}
}
// Even if this is a broadcast, we may have sent targeted wants.
// Deal with them.
for p := range pwm.wantPeers[c] {
pws, ok := pwm.peerWants[p]
if !ok {
// Should never happen but check just in case
log.Errorf("peerWantManager reverse index missing peer %s for key %s", p, c)
// Send cancels to a particular peer
send := func(p peer.ID, pws *peerWant) {
// Start from the broadcast cancels
toCancel := broadcastCancels
// For each key to be cancelled
for _, c := range cancelKs {
// Check if a want was sent for the key
wantBlock := pws.wantBlocks.Has(c)
if !wantBlock && !pws.wantHaves.Has(c) {
continue
}
// Update the want gauge.
if pws.wantBlocks.Has(c) {
if wantBlock {
pwm.wantBlockGauge.Dec()
}
......@@ -235,40 +230,54 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
pws.wantHaves.Remove(c)
// If it's a broadcast want, we've already added it to
// the broadcastKs list.
if isBroadcast {
continue
// the peer cancels.
if !pwm.broadcastWants.Has(c) {
toCancel = append(toCancel, c)
}
// Add the CID to the result for the peer.
cids, ok := res[p]
if !ok {
// Pre-allocate enough for all keys.
// Cancels are usually related.
cids = make([]cid.Cid, 0, len(cancelKs))
}
res[p] = append(cids, c)
}
// Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer.
delete(pwm.wantPeers, c)
// Send cancels to the peer
if len(toCancel) > 0 {
pws.peerQueue.AddCancels(toCancel)
}
}
// If we have any broadcasted CIDs, add them in.
//
// Doing this at the end can save us a bunch of work and allocations.
if len(broadcastKs) > 0 {
for p := range pwm.peerWants {
if cids, ok := res[p]; ok {
res[p] = append(cids, broadcastKs...)
} else {
res[p] = broadcastKs
if len(broadcastCancels) > 0 {
// If a broadcast want is being cancelled, send the cancel to all
// peers
for p, pws := range pwm.peerWants {
send(p, pws)
}
} else {
// Only send cancels to peers that received a corresponding want
cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
for _, c := range cancelKs {
for p := range pwm.wantPeers[c] {
cancelPeers[p] = struct{}{}
}
}
for p := range cancelPeers {
pws, ok := pwm.peerWants[p]
if !ok {
// Should never happen but check just in case
log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
continue
}
send(p, pws)
}
}
return res
// Remove cancelled broadcast wants
for _, c := range broadcastCancels {
pwm.broadcastWants.Remove(c)
}
// Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer.
for _, c := range cancelKs {
delete(pwm.wantPeers, c)
}
}
// Add the peer to the list of peers that have sent a want with the cid
......
......@@ -4,8 +4,8 @@ import (
"testing"
"github.com/ipfs/go-bitswap/internal/testutil"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
type gauge struct {
......@@ -19,6 +19,42 @@ func (g *gauge) Dec() {
g.count--
}
type mockPQ struct {
bcst []cid.Cid
wbs []cid.Cid
whs []cid.Cid
cancels []cid.Cid
}
func (mpq *mockPQ) clear() {
mpq.bcst = nil
mpq.wbs = nil
mpq.whs = nil
mpq.cancels = nil
}
func (mpq *mockPQ) Startup() {}
func (mpq *mockPQ) Shutdown() {}
func (mpq *mockPQ) AddBroadcastWantHaves(whs []cid.Cid) {
mpq.bcst = append(mpq.bcst, whs...)
}
func (mpq *mockPQ) AddWants(wbs []cid.Cid, whs []cid.Cid) {
mpq.wbs = append(mpq.wbs, wbs...)
mpq.whs = append(mpq.whs, whs...)
}
func (mpq *mockPQ) AddCancels(cs []cid.Cid) {
mpq.cancels = append(mpq.cancels, cs...)
}
func (mpq *mockPQ) ResponseReceived(ks []cid.Cid) {
}
func clearSent(pqs map[peer.ID]PeerQueue) {
for _, pqi := range pqs {
pqi.(*mockPQ).clear()
}
}
func TestEmpty(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
......@@ -30,7 +66,7 @@ func TestEmpty(t *testing.T) {
}
}
func TestPrepareBroadcastWantHaves(t *testing.T) {
func TestPWMBroadcastWantHaves(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
peers := testutil.GeneratePeers(3)
......@@ -38,74 +74,87 @@ func TestPrepareBroadcastWantHaves(t *testing.T) {
cids2 := testutil.GenerateCids(2)
cids3 := testutil.GenerateCids(2)
if blist := pwm.addPeer(peers[0]); len(blist) > 0 {
t.Errorf("expected no broadcast wants")
}
if blist := pwm.addPeer(peers[1]); len(blist) > 0 {
t.Errorf("expected no broadcast wants")
peerQueues := make(map[peer.ID]PeerQueue)
for _, p := range peers[:2] {
pq := &mockPQ{}
peerQueues[p] = pq
pwm.addPeer(pq, p)
if len(pq.bcst) > 0 {
t.Errorf("expected no broadcast wants")
}
}
// Broadcast 2 cids to 2 peers
bcst := pwm.prepareBroadcastWantHaves(cids)
if len(bcst) != 2 {
t.Fatal("Expected 2 peers")
}
for p := range bcst {
if !testutil.MatchKeysIgnoreOrder(bcst[p], cids) {
pwm.broadcastWantHaves(cids)
for _, pqi := range peerQueues {
pq := pqi.(*mockPQ)
if len(pq.bcst) != 2 {
t.Fatal("Expected 2 want-haves")
}
if !testutil.MatchKeysIgnoreOrder(pq.bcst, cids) {
t.Fatal("Expected all cids to be broadcast")
}
}
// Broadcasting same cids should have no effect
bcst2 := pwm.prepareBroadcastWantHaves(cids)
if len(bcst2) != 0 {
t.Fatal("Expected 0 peers")
clearSent(peerQueues)
pwm.broadcastWantHaves(cids)
for _, pqi := range peerQueues {
pq := pqi.(*mockPQ)
if len(pq.bcst) != 0 {
t.Fatal("Expected 0 want-haves")
}
}
// Broadcast 2 other cids
bcst3 := pwm.prepareBroadcastWantHaves(cids2)
if len(bcst3) != 2 {
t.Fatal("Expected 2 peers")
}
for p := range bcst3 {
if !testutil.MatchKeysIgnoreOrder(bcst3[p], cids2) {
clearSent(peerQueues)
pwm.broadcastWantHaves(cids2)
for _, pqi := range peerQueues {
pq := pqi.(*mockPQ)
if len(pq.bcst) != 2 {
t.Fatal("Expected 2 want-haves")
}
if !testutil.MatchKeysIgnoreOrder(pq.bcst, cids2) {
t.Fatal("Expected all new cids to be broadcast")
}
}
// Broadcast mix of old and new cids
bcst4 := pwm.prepareBroadcastWantHaves(append(cids, cids3...))
if len(bcst4) != 2 {
t.Fatal("Expected 2 peers")
}
// Only new cids should be broadcast
for p := range bcst4 {
if !testutil.MatchKeysIgnoreOrder(bcst4[p], cids3) {
clearSent(peerQueues)
pwm.broadcastWantHaves(append(cids, cids3...))
for _, pqi := range peerQueues {
pq := pqi.(*mockPQ)
if len(pq.bcst) != 2 {
t.Fatal("Expected 2 want-haves")
}
// Only new cids should be broadcast
if !testutil.MatchKeysIgnoreOrder(pq.bcst, cids3) {
t.Fatal("Expected all new cids to be broadcast")
}
}
// Sending want-block for a cid should prevent broadcast to that peer
clearSent(peerQueues)
cids4 := testutil.GenerateCids(4)
wantBlocks := []cid.Cid{cids4[0], cids4[2]}
pwm.prepareSendWants(peers[0], wantBlocks, []cid.Cid{})
bcst5 := pwm.prepareBroadcastWantHaves(cids4)
if len(bcst4) != 2 {
t.Fatal("Expected 2 peers")
}
// Only cids that were not sent as want-block to peer should be broadcast
for p := range bcst5 {
if p == peers[0] {
if !testutil.MatchKeysIgnoreOrder(bcst5[p], []cid.Cid{cids4[1], cids4[3]}) {
t.Fatal("Expected unsent cids to be broadcast")
}
}
if p == peers[1] {
if !testutil.MatchKeysIgnoreOrder(bcst5[p], cids4) {
t.Fatal("Expected all cids to be broadcast")
}
}
p0 := peers[0]
p1 := peers[1]
pwm.sendWants(p0, wantBlocks, []cid.Cid{})
pwm.broadcastWantHaves(cids4)
pq0 := peerQueues[p0].(*mockPQ)
if len(pq0.bcst) != 2 { // only broadcast 2 / 4 want-haves
t.Fatal("Expected 2 want-haves")
}
if !testutil.MatchKeysIgnoreOrder(pq0.bcst, []cid.Cid{cids4[1], cids4[3]}) {
t.Fatalf("Expected unsent cids to be broadcast")
}
pq1 := peerQueues[p1].(*mockPQ)
if len(pq1.bcst) != 4 { // broadcast all 4 want-haves
t.Fatal("Expected 4 want-haves")
}
if !testutil.MatchKeysIgnoreOrder(pq1.bcst, cids4) {
t.Fatal("Expected all cids to be broadcast")
}
allCids := cids
......@@ -114,17 +163,22 @@ func TestPrepareBroadcastWantHaves(t *testing.T) {
allCids = append(allCids, cids4...)
// Add another peer
bcst6 := pwm.addPeer(peers[2])
if !testutil.MatchKeysIgnoreOrder(bcst6, allCids) {
peer2 := peers[2]
pq2 := &mockPQ{}
peerQueues[peer2] = pq2
pwm.addPeer(pq2, peer2)
if !testutil.MatchKeysIgnoreOrder(pq2.bcst, allCids) {
t.Fatalf("Expected all cids to be broadcast.")
}
if broadcast := pwm.prepareBroadcastWantHaves(allCids); len(broadcast) != 0 {
clearSent(peerQueues)
pwm.broadcastWantHaves(allCids)
if len(pq2.bcst) != 0 {
t.Errorf("did not expect to have CIDs to broadcast")
}
}
func TestPrepareSendWants(t *testing.T) {
func TestPWMSendWants(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
peers := testutil.GeneratePeers(2)
......@@ -133,68 +187,78 @@ func TestPrepareSendWants(t *testing.T) {
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)
pwm.addPeer(p0)
pwm.addPeer(p1)
peerQueues := make(map[peer.ID]PeerQueue)
for _, p := range peers[:2] {
pq := &mockPQ{}
peerQueues[p] = pq
pwm.addPeer(pq, p)
}
pq0 := peerQueues[p0].(*mockPQ)
pq1 := peerQueues[p1].(*mockPQ)
// Send 2 want-blocks and 2 want-haves to p0
wb, wh := pwm.prepareSendWants(p0, cids, cids2)
if !testutil.MatchKeysIgnoreOrder(wb, cids) {
clearSent(peerQueues)
pwm.sendWants(p0, cids, cids2)
if !testutil.MatchKeysIgnoreOrder(pq0.wbs, cids) {
t.Fatal("Expected 2 want-blocks")
}
if !testutil.MatchKeysIgnoreOrder(wh, cids2) {
if !testutil.MatchKeysIgnoreOrder(pq0.whs, cids2) {
t.Fatal("Expected 2 want-haves")
}
// Send to p0
// - 1 old want-block and 2 new want-blocks
// - 1 old want-have and 2 new want-haves
clearSent(peerQueues)
cids3 := testutil.GenerateCids(2)
cids4 := testutil.GenerateCids(2)
wb2, wh2 := pwm.prepareSendWants(p0, append(cids3, cids[0]), append(cids4, cids2[0]))
if !testutil.MatchKeysIgnoreOrder(wb2, cids3) {
pwm.sendWants(p0, append(cids3, cids[0]), append(cids4, cids2[0]))
if !testutil.MatchKeysIgnoreOrder(pq0.wbs, cids3) {
t.Fatal("Expected 2 want-blocks")
}
if !testutil.MatchKeysIgnoreOrder(wh2, cids4) {
if !testutil.MatchKeysIgnoreOrder(pq0.whs, cids4) {
t.Fatal("Expected 2 want-haves")
}
// Send to p0 as want-blocks: 1 new want-block, 1 old want-have
clearSent(peerQueues)
cids5 := testutil.GenerateCids(1)
newWantBlockOldWantHave := append(cids5, cids2[0])
wb3, wh3 := pwm.prepareSendWants(p0, newWantBlockOldWantHave, []cid.Cid{})
pwm.sendWants(p0, newWantBlockOldWantHave, []cid.Cid{})
// If a want was sent as a want-have, it should be ok to now send it as a
// want-block
if !testutil.MatchKeysIgnoreOrder(wb3, newWantBlockOldWantHave) {
if !testutil.MatchKeysIgnoreOrder(pq0.wbs, newWantBlockOldWantHave) {
t.Fatal("Expected 2 want-blocks")
}
if len(wh3) != 0 {
if len(pq0.whs) != 0 {
t.Fatal("Expected 0 want-haves")
}
// Send to p0 as want-haves: 1 new want-have, 1 old want-block
clearSent(peerQueues)
cids6 := testutil.GenerateCids(1)
newWantHaveOldWantBlock := append(cids6, cids[0])
wb4, wh4 := pwm.prepareSendWants(p0, []cid.Cid{}, newWantHaveOldWantBlock)
pwm.sendWants(p0, []cid.Cid{}, newWantHaveOldWantBlock)
// If a want was previously sent as a want-block, it should not be
// possible to now send it as a want-have
if !testutil.MatchKeysIgnoreOrder(wh4, cids6) {
if !testutil.MatchKeysIgnoreOrder(pq0.whs, cids6) {
t.Fatal("Expected 1 want-have")
}
if len(wb4) != 0 {
if len(pq0.wbs) != 0 {
t.Fatal("Expected 0 want-blocks")
}
// Send 2 want-blocks and 2 want-haves to p1
wb5, wh5 := pwm.prepareSendWants(p1, cids, cids2)
if !testutil.MatchKeysIgnoreOrder(wb5, cids) {
pwm.sendWants(p1, cids, cids2)
if !testutil.MatchKeysIgnoreOrder(pq1.wbs, cids) {
t.Fatal("Expected 2 want-blocks")
}
if !testutil.MatchKeysIgnoreOrder(wh5, cids2) {
if !testutil.MatchKeysIgnoreOrder(pq1.whs, cids2) {
t.Fatal("Expected 2 want-haves")
}
}
func TestPrepareSendCancels(t *testing.T) {
func TestPWMSendCancels(t *testing.T) {
pwm := newPeerWantManager(&gauge{})
peers := testutil.GeneratePeers(2)
......@@ -207,14 +271,20 @@ func TestPrepareSendCancels(t *testing.T) {
allwb := append(wb1, wb2...)
allwh := append(wh1, wh2...)
pwm.addPeer(p0)
pwm.addPeer(p1)
peerQueues := make(map[peer.ID]PeerQueue)
for _, p := range peers[:2] {
pq := &mockPQ{}
peerQueues[p] = pq
pwm.addPeer(pq, p)
}
pq0 := peerQueues[p0].(*mockPQ)
pq1 := peerQueues[p1].(*mockPQ)
// Send 2 want-blocks and 2 want-haves to p0
pwm.prepareSendWants(p0, wb1, wh1)
pwm.sendWants(p0, wb1, wh1)
// Send 3 want-blocks and 3 want-haves to p1
// (1 overlapping want-block / want-have with p0)
pwm.prepareSendWants(p1, append(wb2, wb1[1]), append(wh2, wh1[1]))
pwm.sendWants(p1, append(wb2, wb1[1]), append(wh2, wh1[1]))
if !testutil.MatchKeysIgnoreOrder(pwm.getWantBlocks(), allwb) {
t.Fatal("Expected 4 cids to be wanted")
......@@ -224,12 +294,13 @@ func TestPrepareSendCancels(t *testing.T) {
}
// Cancel 1 want-block and 1 want-have that were sent to p0
res := pwm.prepareSendCancels([]cid.Cid{wb1[0], wh1[0]})
clearSent(peerQueues)
pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
// Should cancel the want-block and want-have
if len(res) != 1 {
t.Fatal("Expected 1 peer")
if len(pq1.cancels) != 0 {
t.Fatal("Expected no cancels sent to p1")
}
if !testutil.MatchKeysIgnoreOrder(res[p0], []cid.Cid{wb1[0], wh1[0]}) {
if !testutil.MatchKeysIgnoreOrder(pq0.cancels, []cid.Cid{wb1[0], wh1[0]}) {
t.Fatal("Expected 2 cids to be cancelled")
}
if !testutil.MatchKeysIgnoreOrder(pwm.getWantBlocks(), append(wb2, wb1[1])) {
......@@ -240,18 +311,21 @@ func TestPrepareSendCancels(t *testing.T) {
}
// Cancel everything
clearSent(peerQueues)
allCids := append(allwb, allwh...)
res2 := pwm.prepareSendCancels(allCids)
// Should cancel the remaining want-blocks and want-haves
if len(res2) != 2 {
t.Fatal("Expected 2 peers", len(res2))
}
if !testutil.MatchKeysIgnoreOrder(res2[p0], []cid.Cid{wb1[1], wh1[1]}) {
pwm.sendCancels(allCids)
// Should cancel the remaining want-blocks and want-haves for p0
if !testutil.MatchKeysIgnoreOrder(pq0.cancels, []cid.Cid{wb1[1], wh1[1]}) {
t.Fatal("Expected un-cancelled cids to be cancelled")
}
remainingP2 := append(wb2, wh2...)
remainingP2 = append(remainingP2, wb1[1], wh1[1])
if !testutil.MatchKeysIgnoreOrder(res2[p1], remainingP2) {
// Should cancel the remaining want-blocks and want-haves for p1
remainingP1 := append(wb2, wh2...)
remainingP1 = append(remainingP1, wb1[1], wh1[1])
if len(pq1.cancels) != len(remainingP1) {
t.Fatal("mismatch", len(pq1.cancels), len(remainingP1))
}
if !testutil.MatchKeysIgnoreOrder(pq1.cancels, remainingP1) {
t.Fatal("Expected un-cancelled cids to be cancelled")
}
if len(pwm.getWantBlocks()) != 0 {
......@@ -271,10 +345,13 @@ func TestStats(t *testing.T) {
cids := testutil.GenerateCids(2)
cids2 := testutil.GenerateCids(2)
pwm.addPeer(p0)
peerQueues := make(map[peer.ID]PeerQueue)
pq := &mockPQ{}
peerQueues[p0] = pq
pwm.addPeer(pq, p0)
// Send 2 want-blocks and 2 want-haves to p0
pwm.prepareSendWants(p0, cids, cids2)
pwm.sendWants(p0, cids, cids2)
if g.count != 2 {
t.Fatal("Expected 2 want-blocks")
......@@ -282,7 +359,7 @@ func TestStats(t *testing.T) {
// Send 1 old want-block and 2 new want-blocks to p0
cids3 := testutil.GenerateCids(2)
pwm.prepareSendWants(p0, append(cids3, cids[0]), []cid.Cid{})
pwm.sendWants(p0, append(cids3, cids[0]), []cid.Cid{})
if g.count != 4 {
t.Fatal("Expected 4 want-blocks")
......@@ -291,7 +368,7 @@ func TestStats(t *testing.T) {
// Cancel 1 want-block that was sent to p0
// and 1 want-block that was not sent
cids4 := testutil.GenerateCids(1)
pwm.prepareSendCancels(append(cids4, cids[0]))
pwm.sendCancels(append(cids4, cids[0]))
if g.count != 3 {
t.Fatal("Expected 3 want-blocks", g.count)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment