Commit 369b794b authored by Dirk McCormick's avatar Dirk McCormick

test: fix session tests

parent 960f6971
......@@ -162,7 +162,7 @@ func TestSessionGetBlocks(t *testing.T) {
// Simulate receiving block for a CID
session.ReceiveFrom(peers[1], []cid.Cid{blks[0].Cid()}, []cid.Cid{}, []cid.Cid{})
time.Sleep(100 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
// Verify session no longer wants received block
wanted, unwanted := sim.SplitWantedUnwanted(blks)
......
......@@ -72,10 +72,11 @@ func TestSendWants(t *testing.T) {
peerA := peers[0]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
......@@ -83,7 +84,7 @@ func TestSendWants(t *testing.T) {
blkCids0 := cids[0:2]
spm.Add(blkCids0)
// peerA: HAVE cid0
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// Wait for processing to complete
peerSends := pm.waitNextWants()
......@@ -109,10 +110,11 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
peerB := peers[1]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
......@@ -120,7 +122,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
blkCids0 := cids[0:2]
spm.Add(blkCids0)
// peerA: HAVE cid0
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// Wait for processing to complete
peerSends := pm.waitNextWants()
......@@ -139,7 +141,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
pm.clearWants()
// peerB: HAVE cid0
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// Wait for processing to complete
peerSends = pm.waitNextWants()
......@@ -166,17 +168,18 @@ func TestReceiveBlock(t *testing.T) {
peerB := peers[1]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
// add cid0, cid1
spm.Add(cids)
// peerA: HAVE cid0
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// Wait for processing to complete
peerSends := pm.waitNextWants()
......@@ -196,10 +199,10 @@ func TestReceiveBlock(t *testing.T) {
// peerA: block cid0, DONT_HAVE cid1
bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]})
spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{cids[1]}, false)
spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{cids[1]})
// peerB: HAVE cid0, cid1
bpm.ReceiveFrom(peerB, cids, []cid.Cid{})
spm.Update(peerB, []cid.Cid{}, cids, []cid.Cid{}, true)
spm.Update(peerB, []cid.Cid{}, cids, []cid.Cid{})
// Wait for processing to complete
peerSends = pm.waitNextWants()
......@@ -225,17 +228,18 @@ func TestPeerUnavailable(t *testing.T) {
peerB := peers[1]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
// add cid0, cid1
spm.Add(cids)
// peerA: HAVE cid0
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// Wait for processing to complete
peerSends := pm.waitNextWants()
......@@ -254,7 +258,7 @@ func TestPeerUnavailable(t *testing.T) {
pm.clearWants()
// peerB: HAVE cid0
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// Wait for processing to complete
peerSends = pm.waitNextWants()
......@@ -283,12 +287,13 @@ func TestPeerUnavailable(t *testing.T) {
}
func TestPeersExhausted(t *testing.T) {
cids := testutil.GenerateCids(2)
cids := testutil.GenerateCids(3)
peers := testutil.GeneratePeers(2)
peerA := peers[0]
peerB := peers[1]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
......@@ -296,53 +301,62 @@ func TestPeersExhausted(t *testing.T) {
onPeersExhausted := func(ks []cid.Cid) {
exhausted = append(exhausted, ks...)
}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
// add cid0, cid1
spm.Add(cids)
// peerA: DONT_HAVE cid0
bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[0]})
// peerA: HAVE cid0
bpm.ReceiveFrom(peerA, []cid.Cid{cids[0]}, []cid.Cid{})
// Note: this also registers peer A as being available
spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[0]}, true)
spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{})
// peerA: DONT_HAVE cid1
bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]})
spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[1]})
time.Sleep(5 * time.Millisecond)
// All available peers (peer A) have sent us a DONT_HAVE for cid0,
// so expect that onPeersExhausted() will be called with cid0
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[0]}) {
// All available peers (peer A) have sent us a DONT_HAVE for cid1,
// so expect that onPeersExhausted() will be called with cid1
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[1]}) {
t.Fatal("Wrong keys")
}
// Clear exhausted cids
exhausted = []cid.Cid{}
// peerB: DONT_HAVE cid0, cid1
bpm.ReceiveFrom(peerB, []cid.Cid{}, cids)
spm.Update(peerB, []cid.Cid{}, []cid.Cid{}, cids, true)
// peerB: HAVE cid0
bpm.ReceiveFrom(peerB, []cid.Cid{cids[0]}, []cid.Cid{})
// Note: this also registers peer B as being available
spm.Update(peerB, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{})
// peerB: DONT_HAVE cid1, cid2
bpm.ReceiveFrom(peerB, []cid.Cid{}, []cid.Cid{cids[1], cids[2]})
spm.Update(peerB, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[1], cids[2]})
// Wait for processing to complete
pm.waitNextWants()
// All available peers (peer A and peer B) have sent us a DONT_HAVE
// for cid0, but we already called onPeersExhausted with cid0, so it
// for cid1, but we already called onPeersExhausted with cid1, so it
// should not be called again
if len(exhausted) > 0 {
t.Fatal("Wrong keys")
}
// peerA: DONT_HAVE cid1
bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]})
spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[1]}, false)
// peerA: DONT_HAVE cid2
bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[2]})
spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[2]})
// Wait for processing to complete
pm.waitNextWants()
// All available peers (peer A and peer B) have sent us a DONT_HAVE for
// cid1, so expect that onPeersExhausted() will be called with cid1
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[1]}) {
// cid2, so expect that onPeersExhausted() will be called with cid2
if !testutil.MatchKeysIgnoreOrder(exhausted, []cid.Cid{cids[2]}) {
t.Fatal("Wrong keys")
}
}
......@@ -358,6 +372,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
peerB := peers[1]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
......@@ -365,7 +380,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
onPeersExhausted := func(ks []cid.Cid) {
exhausted = append(exhausted, ks...)
}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
......@@ -375,15 +390,15 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
// peerA: HAVE cid0
bpm.ReceiveFrom(peerA, []cid.Cid{cids[0]}, []cid.Cid{})
// Note: this also registers peer A as being available
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerA, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// peerB: HAVE cid0
bpm.ReceiveFrom(peerB, []cid.Cid{cids[0]}, []cid.Cid{})
// Note: this also registers peer B as being available
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
// peerA: DONT_HAVE cid1
bpm.ReceiveFrom(peerA, []cid.Cid{}, []cid.Cid{cids[1]})
spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[0]}, false)
spm.Update(peerA, []cid.Cid{}, []cid.Cid{}, []cid.Cid{cids[0]})
time.Sleep(5 * time.Millisecond)
......@@ -408,6 +423,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
peerB := peers[1]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
......@@ -415,7 +431,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
onPeersExhausted := func(ks []cid.Cid) {
exhausted = append(exhausted, ks...)
}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
......@@ -423,11 +439,11 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
spm.Add(cids)
// peerA: receive block for cid0 (and register peer A with sessionWantSender)
spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{}, true)
spm.Update(peerA, []cid.Cid{cids[0]}, []cid.Cid{}, []cid.Cid{})
// peerB: HAVE cid1
bpm.ReceiveFrom(peerB, []cid.Cid{cids[0]}, []cid.Cid{})
// Note: this also registers peer B as being available
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{}, true)
spm.Update(peerB, []cid.Cid{}, []cid.Cid{cids[0]}, []cid.Cid{})
time.Sleep(5 * time.Millisecond)
......@@ -449,10 +465,11 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
......@@ -461,41 +478,41 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that do not exceed limit
for _, c := range cids[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
if has := fpm.HasPeer(p); has {
t.Fatal("Expected peer not to be available")
}
}
......@@ -505,10 +522,11 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
......@@ -517,13 +535,13 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
......@@ -533,24 +551,24 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
for _, c := range cids[1:peerDontHaveLimit] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
for _, c := range cids[peerDontHaveLimit : peerDontHaveLimit+1] {
// HAVEs
bpm.ReceiveFrom(p, []cid.Cid{c}, []cid.Cid{})
spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{})
}
for _, c := range cids[peerDontHaveLimit+1:] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
}
......@@ -560,10 +578,11 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
fpm := newFakeSessionPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
go spm.Run()
......@@ -572,39 +591,39 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[1 : peerDontHaveLimit+2] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
if has := fpm.HasPeer(p); has {
t.Fatal("Expected peer not to be available")
}
// Receive a HAVE from peer (adds it back into the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{})
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
......@@ -613,28 +632,28 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
// Receive DONT_HAVEs from peer that don't exceed limit
for _, c := range cids2[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
if has := fpm.HasPeer(p); !has {
t.Fatal("Expected peer to be available")
}
// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids2[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c})
}
// Wait for processing to complete
time.Sleep(5 * time.Millisecond)
// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
if has := fpm.HasPeer(p); has {
t.Fatal("Expected peer not to be available")
}
}
......@@ -109,6 +109,14 @@ func (spm *SessionPeerManager) HasPeers() bool {
return len(spm.peers) > 0
}
// HasPeer reports whether the given peer is currently part of this
// session's peer set. It takes a read lock, so concurrent callers do
// not block each other.
func (spm *SessionPeerManager) HasPeer(p peer.ID) bool {
	spm.plk.RLock()
	defer spm.plk.RUnlock()

	_, ok := spm.peers[p]
	return ok
}
// Shutdown untags all the peers
func (spm *SessionPeerManager) Shutdown() {
spm.plk.Lock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment