Unverified Commit 2a033735 authored by Steven Allen's avatar Steven Allen Committed by GitHub

feat: move broadcast wantlist into the peermanager (#365)

* feat: small optimizations

* feat: move broadcast wantlist into the peermanager

This deduplicates some state and allows us to do less book-keeping for broadcast
wants. We should probably rename the PeerManager to the WantManager and rename the
WantManager to something else.

* fix: lint warnings
parent 4ce7de96
......@@ -261,7 +261,6 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
mq.dhTimeoutMgr.CancelPending(cancelKs)
mq.wllock.Lock()
defer mq.wllock.Unlock()
workReady := false
......@@ -282,6 +281,10 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
}
}
mq.wllock.Unlock()
// Unlock first to be nice to the scheduler.
// Schedule a message send
if workReady {
mq.signalWorkReady()
......
......@@ -82,18 +82,16 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {
// Connected is called to add a new peer to the pool, and send it an initial set
// of wants.
func (pm *PeerManager) Connected(p peer.ID, initialWantHaves []cid.Cid) {
func (pm *PeerManager) Connected(p peer.ID) {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()
pq := pm.getOrCreate(p)
// Inform the peer want manager that there's a new peer
pm.pwm.addPeer(p)
// Record that the want-haves are being sent to the peer
_, wantHaves := pm.pwm.prepareSendWants(p, nil, initialWantHaves)
wants := pm.pwm.addPeer(p)
// Broadcast any live want-haves to the newly connected peers
pq.AddBroadcastWantHaves(wantHaves)
pq.AddBroadcastWantHaves(wants)
// Inform the sessions that the peer has connected
pm.signalAvailability(p, true)
}
......
......@@ -82,9 +82,9 @@ func TestAddingAndRemovingPeers(t *testing.T) {
self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5]
peerManager := New(ctx, peerQueueFactory, self)
peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
peerManager.Connected(peer3, nil)
peerManager.Connected(peer1)
peerManager.Connected(peer2)
peerManager.Connected(peer3)
connectedPeers := peerManager.ConnectedPeers()
......@@ -108,7 +108,7 @@ func TestAddingAndRemovingPeers(t *testing.T) {
}
// reconnect peer
peerManager.Connected(peer1, nil)
peerManager.Connected(peer1)
connectedPeers = peerManager.ConnectedPeers()
if !testutil.ContainsPeer(connectedPeers, peer1) {
......@@ -126,9 +126,10 @@ func TestBroadcastOnConnect(t *testing.T) {
peerManager := New(ctx, peerQueueFactory, self)
cids := testutil.GenerateCids(2)
peerManager.BroadcastWantHaves(ctx, cids)
// Connect with two broadcast wants for first peer
peerManager.Connected(peer1, cids)
peerManager.Connected(peer1)
collected := collectMessages(msgs, 2*time.Millisecond)
if len(collected[peer1].wantHaves) != 2 {
......@@ -147,8 +148,11 @@ func TestBroadcastWantHaves(t *testing.T) {
cids := testutil.GenerateCids(3)
// Connect to first peer with two broadcast wants
peerManager.Connected(peer1, []cid.Cid{cids[0], cids[1]})
// Broadcast the first two.
peerManager.BroadcastWantHaves(ctx, cids[:2])
// First peer should get them.
peerManager.Connected(peer1)
collected := collectMessages(msgs, 2*time.Millisecond)
if len(collected[peer1].wantHaves) != 2 {
......@@ -156,7 +160,7 @@ func TestBroadcastWantHaves(t *testing.T) {
}
// Connect to second peer
peerManager.Connected(peer2, nil)
peerManager.Connected(peer2)
// Send a broadcast to all peers, including cid that was already sent to
// first peer
......@@ -165,10 +169,12 @@ func TestBroadcastWantHaves(t *testing.T) {
// One of the want-haves was already sent to peer1
if len(collected[peer1].wantHaves) != 1 {
t.Fatal("Expected 1 want-haves to be sent to first peer", collected[peer1].wantHaves)
t.Fatalf("Expected 1 want-haves to be sent to first peer, got %d",
len(collected[peer1].wantHaves))
}
if len(collected[peer2].wantHaves) != 2 {
t.Fatal("Expected 2 want-haves to be sent to second peer")
if len(collected[peer2].wantHaves) != 3 {
t.Fatalf("Expected 3 want-haves to be sent to second peer, got %d",
len(collected[peer2].wantHaves))
}
}
......@@ -182,7 +188,7 @@ func TestSendWants(t *testing.T) {
peerManager := New(ctx, peerQueueFactory, self)
cids := testutil.GenerateCids(4)
peerManager.Connected(peer1, nil)
peerManager.Connected(peer1)
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0]}, []cid.Cid{cids[2]})
collected := collectMessages(msgs, 2*time.Millisecond)
......@@ -217,8 +223,8 @@ func TestSendCancels(t *testing.T) {
cids := testutil.GenerateCids(4)
// Connect to peer1 and peer2
peerManager.Connected(peer1, nil)
peerManager.Connected(peer2, nil)
peerManager.Connected(peer1)
peerManager.Connected(peer2)
// Send 2 want-blocks and 1 want-have to peer1
peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})
......@@ -286,11 +292,11 @@ func TestSessionRegistration(t *testing.T) {
t.Fatal("Expected peer not be available till connected")
}
peerManager.Connected(p1, nil)
peerManager.Connected(p1)
if !s.available[p1] {
t.Fatal("Expected signal callback")
}
peerManager.Connected(p2, nil)
peerManager.Connected(p2)
if !s.available[p2] {
t.Fatal("Expected signal callback")
}
......@@ -305,7 +311,7 @@ func TestSessionRegistration(t *testing.T) {
peerManager.UnregisterSession(id)
peerManager.Connected(p1, nil)
peerManager.Connected(p1)
if s.available[p1] {
t.Fatal("Expected no signal callback (session unregistered)")
}
......
......@@ -19,10 +19,17 @@ type Gauge interface {
// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
// peerWants maps peers to outstanding wants.
// A peer's wants is the _union_ of the broadcast wants and the wants in
// this list.
peerWants map[peer.ID]*peerWant
// Reverse index mapping wants to the peers that sent them. This is used
// to speed up cancels
// Reverse index of all wants in peerWants.
wantPeers map[cid.Cid]map[peer.ID]struct{}
// broadcastWants tracks all the current broadcast wants.
broadcastWants *cid.Set
// Keeps track of the number of active want-blocks
wantBlockGauge Gauge
}
......@@ -36,20 +43,24 @@ type peerWant struct {
// number of active want-blocks (ie sent but no response received)
func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
return &peerWantManager{
broadcastWants: cid.NewSet(),
peerWants: make(map[peer.ID]*peerWant),
wantPeers: make(map[cid.Cid]map[peer.ID]struct{}),
wantBlockGauge: wantBlockGauge,
}
}
// AddPeer adds a peer whose wants we need to keep track of
func (pwm *peerWantManager) addPeer(p peer.ID) {
// addPeer adds a peer whose wants we need to keep track of. It returns the
// current list of broadcast wants that should be sent to the peer.
func (pwm *peerWantManager) addPeer(p peer.ID) []cid.Cid {
if _, ok := pwm.peerWants[p]; !ok {
pwm.peerWants[p] = &peerWant{
wantBlocks: cid.NewSet(),
wantHaves: cid.NewSet(),
}
return pwm.broadcastWants.Keys()
}
return nil
}
// RemovePeer removes a peer and its associated wants from tracking
......@@ -59,7 +70,7 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
return
}
pws.wantBlocks.ForEach(func(c cid.Cid) error {
_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Decrement the gauge by the number of pending want-blocks to the peer
pwm.wantBlockGauge.Dec()
// Clean up want-blocks from the reverse index
......@@ -68,7 +79,7 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
})
// Clean up want-haves from the reverse index
pws.wantHaves.ForEach(func(c cid.Cid) error {
_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
pwm.reverseIndexRemove(c, p)
return nil
})
......@@ -79,26 +90,30 @@ func (pwm *peerWantManager) removePeer(p peer.ID) {
// PrepareBroadcastWantHaves filters the list of want-haves for each peer,
// returning a map of peers to the want-haves they have not yet been sent.
func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[peer.ID][]cid.Cid {
res := make(map[peer.ID][]cid.Cid)
// Iterate over all known peers
for p, pws := range pwm.peerWants {
// Iterate over all want-haves
res := make(map[peer.ID][]cid.Cid, len(pwm.peerWants))
for _, c := range wantHaves {
// If the CID has not been sent as a want-block or want-have
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
// Record that the CID has been sent as a want-have
pws.wantHaves.Add(c)
// Update the reverse index
pwm.reverseIndexAdd(c, p)
// Add the CID to the results
if _, ok := res[p]; !ok {
res[p] = make([]cid.Cid, 0, 1)
if pwm.broadcastWants.Has(c) {
// Already a broadcast want, skip it.
continue
}
res[p] = append(res[p], c)
pwm.broadcastWants.Add(c)
// Prepare broadcast.
wantedBy := pwm.wantPeers[c]
for p := range pwm.peerWants {
// If we've already sent a want to this peer, skip them.
//
// This is faster than checking the actual wantlists due
// to better locality.
if _, ok := wantedBy[p]; ok {
continue
}
cids, ok := res[p]
if !ok {
cids = make([]cid.Cid, 0, len(wantHaves))
}
res[p] = append(cids, c)
}
}
......@@ -146,6 +161,12 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
// Iterate over the requested want-haves
for _, c := range wantHaves {
// If we've already broadcasted this want, don't bother with a
// want-have.
if pwm.broadcastWants.Has(c) {
continue
}
// If the CID has not been sent as a want-block or want-have
if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
// Record that the CID was sent as a want-have
......@@ -166,11 +187,36 @@ func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wa
// returning a map of peers which only contains cancels for wants that have
// been sent to the peer.
func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
res := make(map[peer.ID][]cid.Cid)
if len(cancelKs) == 0 {
return nil
}
// Pre-allocate enough space for all peers that have the first CID.
// Chances are these peers are related.
expectedResSize := 0
firstCancel := cancelKs[0]
if pwm.broadcastWants.Has(firstCancel) {
expectedResSize = len(pwm.peerWants)
} else {
expectedResSize = len(pwm.wantPeers[firstCancel])
}
res := make(map[peer.ID][]cid.Cid, expectedResSize)
// Keep the broadcast keys separate. This lets us batch-process them at
// the end.
broadcastKs := make([]cid.Cid, 0, len(cancelKs))
// Iterate over all requested cancels
for _, c := range cancelKs {
// Iterate over peers that have sent a corresponding want
// Handle broadcast wants up-front.
isBroadcast := pwm.broadcastWants.Has(c)
if isBroadcast {
broadcastKs = append(broadcastKs, c)
pwm.broadcastWants.Remove(c)
}
// Even if this is a broadcast, we may have sent targeted wants.
// Deal with them.
for p := range pwm.wantPeers[c] {
pws, ok := pwm.peerWants[p]
if !ok {
......@@ -179,28 +225,45 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
continue
}
isWantBlock := pws.wantBlocks.Has(c)
isWantHave := pws.wantHaves.Has(c)
// If the CID was sent as a want-block, decrement the want-block count
if isWantBlock {
// Update the want gauge.
if pws.wantBlocks.Has(c) {
pwm.wantBlockGauge.Dec()
}
// If the CID was sent as a want-block or want-have
if isWantBlock || isWantHave {
// Remove the CID from the recorded want-blocks and want-haves
// Unconditionally remove from the want lists.
pws.wantBlocks.Remove(c)
pws.wantHaves.Remove(c)
// Add the CID to the results
if _, ok := res[p]; !ok {
res[p] = make([]cid.Cid, 0, 1)
// If it's a broadcast want, we've already added it to
// the broadcastKs list.
if isBroadcast {
continue
}
res[p] = append(res[p], c)
// Update the reverse index
pwm.reverseIndexRemove(c, p)
// Add the CID to the result for the peer.
cids, ok := res[p]
if !ok {
// Pre-allocate enough for all keys.
// Cancels are usually related.
cids = make([]cid.Cid, 0, len(cancelKs))
}
res[p] = append(cids, c)
}
// Finally, batch-remove the reverse-index. There's no need to
// clear this index peer-by-peer.
delete(pwm.wantPeers, c)
}
// If we have any broadcasted CIDs, add them in.
//
// Doing this at the end can save us a bunch of work and allocations.
if len(broadcastKs) > 0 {
for p := range pwm.peerWants {
if cids, ok := res[p]; ok {
res[p] = append(cids, broadcastKs...)
} else {
res[p] = broadcastKs
}
}
}
......@@ -212,7 +275,7 @@ func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
peers, ok := pwm.wantPeers[c]
if !ok {
peers = make(map[peer.ID]struct{}, 1)
peers = make(map[peer.ID]struct{}, 10)
pwm.wantPeers[c] = peers
}
peers[p] = struct{}{}
......@@ -235,7 +298,7 @@ func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
// Iterate over all known peers
for _, pws := range pwm.peerWants {
// Iterate over all want-blocks
pws.wantBlocks.ForEach(func(c cid.Cid) error {
_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
return nil
......@@ -249,41 +312,37 @@ func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
res := cid.NewSet()
// Iterate over all known peers
// Iterate over all peers with active wants.
for _, pws := range pwm.peerWants {
// Iterate over all want-haves
pws.wantHaves.ForEach(func(c cid.Cid) error {
_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
return nil
})
}
_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
res.Add(c)
return nil
})
return res.Keys()
}
// GetWants returns the set of all wants (both want-blocks and want-haves).
func (pwm *peerWantManager) getWants() []cid.Cid {
res := cid.NewSet()
// Iterate over all known peers
for _, pws := range pwm.peerWants {
// Iterate over all want-blocks
pws.wantBlocks.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
return nil
})
res := pwm.broadcastWants.Keys()
// Iterate over all want-haves
pws.wantHaves.ForEach(func(c cid.Cid) error {
// Add the CID to the results
res.Add(c)
return nil
})
// Iterate over all targeted wants, removing ones that are also in the
// broadcast list.
for c := range pwm.wantPeers {
if pwm.broadcastWants.Has(c) {
continue
}
res = append(res, c)
}
return res.Keys()
return res
}
func (pwm *peerWantManager) String() string {
......
......@@ -38,8 +38,12 @@ func TestPrepareBroadcastWantHaves(t *testing.T) {
cids2 := testutil.GenerateCids(2)
cids3 := testutil.GenerateCids(2)
pwm.addPeer(peers[0])
pwm.addPeer(peers[1])
if blist := pwm.addPeer(peers[0]); len(blist) > 0 {
t.Errorf("expected no broadcast wants")
}
if blist := pwm.addPeer(peers[1]); len(blist) > 0 {
t.Errorf("expected no broadcast wants")
}
// Broadcast 2 cids to 2 peers
bcst := pwm.prepareBroadcastWantHaves(cids)
......@@ -104,16 +108,19 @@ func TestPrepareBroadcastWantHaves(t *testing.T) {
}
}
allCids := cids
allCids = append(allCids, cids2...)
allCids = append(allCids, cids3...)
allCids = append(allCids, cids4...)
// Add another peer
pwm.addPeer(peers[2])
bcst6 := pwm.prepareBroadcastWantHaves(cids)
if len(bcst6) != 1 {
t.Fatal("Expected 1 peer")
}
for p := range bcst6 {
if !testutil.MatchKeysIgnoreOrder(bcst6[p], cids) {
t.Fatal("Expected all cids to be broadcast")
bcst6 := pwm.addPeer(peers[2])
if !testutil.MatchKeysIgnoreOrder(bcst6, allCids) {
t.Fatalf("Expected all cids to be broadcast.")
}
if broadcast := pwm.prepareBroadcastWantHaves(allCids); len(broadcast) != 0 {
t.Errorf("did not expect to have CIDs to broadcast")
}
}
......
......@@ -6,7 +6,6 @@ import (
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
"github.com/ipfs/go-bitswap/internal/sessionmanager"
bsswl "github.com/ipfs/go-bitswap/internal/sessionwantlist"
logging "github.com/ipfs/go-log"
cid "github.com/ipfs/go-cid"
......@@ -17,9 +16,8 @@ var log = logging.Logger("bitswap")
// PeerHandler sends wants / cancels to other peers
type PeerHandler interface {
// Connected is called when a peer connects, with any initial want-haves
// that have been broadcast to all peers (as part of session discovery)
Connected(p peer.ID, initialWants []cid.Cid)
// Connected is called when a peer connects.
Connected(p peer.ID)
// Disconnected is called when a peer disconnects
Disconnected(p peer.ID)
// BroadcastWantHaves sends want-haves to all connected peers
......@@ -38,11 +36,7 @@ type SessionManager interface {
// - informs the SessionManager and BlockPresenceManager of incoming information
// and cancelled sessions
// - informs the PeerManager of connects and disconnects
// - manages the list of want-haves that are broadcast to the internet
// (as opposed to being sent to specific peers)
type WantManager struct {
bcwl *bsswl.SessionWantlist
peerHandler PeerHandler
sim *bssim.SessionInterestManager
bpm *bsbpm.BlockPresenceManager
......@@ -52,7 +46,6 @@ type WantManager struct {
// New initializes a new WantManager for a given context.
func New(ctx context.Context, peerHandler PeerHandler, sim *bssim.SessionInterestManager, bpm *bsbpm.BlockPresenceManager) *WantManager {
return &WantManager{
bcwl: bsswl.NewSessionWantlist(),
peerHandler: peerHandler,
sim: sim,
bpm: bpm,
......@@ -69,8 +62,6 @@ func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Ci
wm.bpm.ReceiveFrom(p, haves, dontHaves)
// Inform interested sessions
wm.sm.ReceiveFrom(p, blks, haves, dontHaves)
// Remove received blocks from broadcast wantlist
wm.bcwl.RemoveKeys(blks)
// Send CANCEL to all peers with want-have / want-block
wm.peerHandler.SendCancels(ctx, blks)
}
......@@ -78,11 +69,10 @@ func (wm *WantManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid.Ci
// BroadcastWantHaves is called when want-haves should be broadcast to all
// connected peers (as part of session discovery)
func (wm *WantManager) BroadcastWantHaves(ctx context.Context, ses uint64, wantHaves []cid.Cid) {
log.Debugf("BroadcastWantHaves session%d: %s", ses, wantHaves)
// Record broadcast wants
wm.bcwl.Add(wantHaves, ses)
// TODO: Avoid calling broadcast through here. It doesn't fit with
// everything else this module does.
log.Debugf("BroadcastWantHaves session%d: %s", ses, wantHaves)
// Send want-haves to all peers
wm.peerHandler.BroadcastWantHaves(ctx, wantHaves)
}
......@@ -92,9 +82,6 @@ func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
// Remove session's interest in the given blocks.
cancelKs := wm.sim.RemoveSessionInterest(ses)
// Remove broadcast want-haves for session
wm.bcwl.RemoveSession(ses)
// Free up block presence tracking for keys that no session is interested
// in anymore
wm.bpm.RemoveKeys(cancelKs)
......@@ -107,7 +94,7 @@ func (wm *WantManager) RemoveSession(ctx context.Context, ses uint64) {
func (wm *WantManager) Connected(p peer.ID) {
// Tell the peer handler that there is a new connection and give it the
// list of outstanding broadcast wants
wm.peerHandler.Connected(p, wm.bcwl.Keys())
wm.peerHandler.Connected(p)
}
// Disconnected is called when a peer disconnects
......
......@@ -14,13 +14,11 @@ import (
)
type fakePeerHandler struct {
lastInitialWants []cid.Cid
lastBcstWants []cid.Cid
lastCancels []cid.Cid
}
func (fph *fakePeerHandler) Connected(p peer.ID, initialWants []cid.Cid) {
fph.lastInitialWants = initialWants
func (fph *fakePeerHandler) Connected(p peer.ID) {
}
func (fph *fakePeerHandler) Disconnected(p peer.ID) {
......@@ -39,124 +37,6 @@ func (*fakeSessionManager) ReceiveFrom(p peer.ID, blks []cid.Cid, haves []cid.Ci
return nil
}
func TestInitialBroadcastWantsAddedCorrectly(t *testing.T) {
ctx := context.Background()
ph := &fakePeerHandler{}
sim := bssim.New()
bpm := bsbpm.New()
wm := New(context.Background(), ph, sim, bpm)
sm := &fakeSessionManager{}
wm.SetSessionManager(sm)
peers := testutil.GeneratePeers(3)
// Connect peer 0. Should not receive anything yet.
wm.Connected(peers[0])
if len(ph.lastInitialWants) != 0 {
t.Fatal("expected no initial wants")
}
// Broadcast 2 wants
wantHaves := testutil.GenerateCids(2)
wm.BroadcastWantHaves(ctx, 1, wantHaves)
if len(ph.lastBcstWants) != 2 {
t.Fatal("expected broadcast wants")
}
// Connect peer 1. Should receive all wants broadcast so far.
wm.Connected(peers[1])
if len(ph.lastInitialWants) != 2 {
t.Fatal("expected broadcast wants")
}
// Broadcast 3 more wants
wantHaves2 := testutil.GenerateCids(3)
wm.BroadcastWantHaves(ctx, 2, wantHaves2)
if len(ph.lastBcstWants) != 3 {
t.Fatal("expected broadcast wants")
}
// Connect peer 2. Should receive all wants broadcast so far.
wm.Connected(peers[2])
if len(ph.lastInitialWants) != 5 {
t.Fatal("expected all wants to be broadcast")
}
}
func TestReceiveFromRemovesBroadcastWants(t *testing.T) {
ctx := context.Background()
ph := &fakePeerHandler{}
sim := bssim.New()
bpm := bsbpm.New()
wm := New(context.Background(), ph, sim, bpm)
sm := &fakeSessionManager{}
wm.SetSessionManager(sm)
peers := testutil.GeneratePeers(3)
// Broadcast 2 wants
cids := testutil.GenerateCids(2)
wm.BroadcastWantHaves(ctx, 1, cids)
if len(ph.lastBcstWants) != 2 {
t.Fatal("expected broadcast wants")
}
// Connect peer 0. Should receive all wants.
wm.Connected(peers[0])
if len(ph.lastInitialWants) != 2 {
t.Fatal("expected broadcast wants")
}
// Receive block for first want
ks := cids[0:1]
haves := []cid.Cid{}
dontHaves := []cid.Cid{}
wm.ReceiveFrom(ctx, peers[1], ks, haves, dontHaves)
// Connect peer 2. Should get remaining want (the one that the block has
// not yet been received for).
wm.Connected(peers[2])
if len(ph.lastInitialWants) != 1 {
t.Fatal("expected remaining wants")
}
}
func TestRemoveSessionRemovesBroadcastWants(t *testing.T) {
ctx := context.Background()
ph := &fakePeerHandler{}
sim := bssim.New()
bpm := bsbpm.New()
wm := New(context.Background(), ph, sim, bpm)
sm := &fakeSessionManager{}
wm.SetSessionManager(sm)
peers := testutil.GeneratePeers(2)
// Broadcast 2 wants for session 0 and 2 wants for session 1
ses0 := uint64(0)
ses1 := uint64(1)
ses0wants := testutil.GenerateCids(2)
ses1wants := testutil.GenerateCids(2)
wm.BroadcastWantHaves(ctx, ses0, ses0wants)
wm.BroadcastWantHaves(ctx, ses1, ses1wants)
// Connect peer 0. Should receive all wants.
wm.Connected(peers[0])
if len(ph.lastInitialWants) != 4 {
t.Fatal("expected broadcast wants")
}
// Remove session 0
wm.RemoveSession(ctx, ses0)
// Connect peer 1. Should receive all wants from session that has not been
// removed.
wm.Connected(peers[1])
if len(ph.lastInitialWants) != 2 {
t.Fatal("expected broadcast wants")
}
}
func TestReceiveFrom(t *testing.T) {
ctx := context.Background()
ph := &fakePeerHandler{}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment