Commit 7f9589bc authored by hannahhoward's avatar hannahhoward

feat(sessions): use all of wantBudget

As soon as peers appear, consume all of the want budget
parent 2ea8ba82
...@@ -341,8 +341,16 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) { ...@@ -341,8 +341,16 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
s.fetchcnt++ s.fetchcnt++
s.notif.Publish(blk) s.notif.Publish(blk)
if next := s.tofetch.Pop(); next.Defined() { toAdd := s.wantBudget()
s.wantBlocks(ctx, []cid.Cid{next}) if toAdd > s.tofetch.Len() {
toAdd = s.tofetch.Len()
}
if toAdd > 0 {
var keys []cid.Cid
for i := 0; i < toAdd; i++ {
keys = append(keys, s.tofetch.Pop())
}
s.wantBlocks(ctx, keys)
} }
s.pastWants.Push(c) s.pastWants.Push(c)
......
...@@ -97,8 +97,11 @@ func TestSessionGetBlocks(t *testing.T) { ...@@ -97,8 +97,11 @@ func TestSessionGetBlocks(t *testing.T) {
receivedBlocks = append(receivedBlocks, receivedBlock) receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs cancelBlock := <-cancelReqs
newCancelReqs = append(newCancelReqs, cancelBlock) newCancelReqs = append(newCancelReqs, cancelBlock)
wantBlock := <-wantReqs select {
case wantBlock := <-wantReqs:
newBlockReqs = append(newBlockReqs, wantBlock) newBlockReqs = append(newBlockReqs, wantBlock)
default:
}
} }
// verify new peers were recorded // verify new peers were recorded
...@@ -120,22 +123,22 @@ func TestSessionGetBlocks(t *testing.T) { ...@@ -120,22 +123,22 @@ func TestSessionGetBlocks(t *testing.T) {
t.Fatal("did not cancel each block once it was received") t.Fatal("did not cancel each block once it was received")
} }
// new session reqs should be targeted // new session reqs should be targeted
totalEnqueued := 0 var newCidsRequested []cid.Cid
for _, w := range newBlockReqs { for _, w := range newBlockReqs {
if len(w.peers) == 0 { if len(w.peers) == 0 {
t.Fatal("should not have broadcast again after initial broadcast") t.Fatal("should not have broadcast again after initial broadcast")
} }
totalEnqueued += len(w.cids) newCidsRequested = append(newCidsRequested, w.cids...)
} }
// full new round of cids should be requested // full new round of cids should be requested
if totalEnqueued != broadcastLiveWantsLimit { if len(newCidsRequested) != broadcastLiveWantsLimit {
t.Fatal("new blocks were not requested") t.Fatal("new blocks were not requested")
} }
// receive remaining blocks // receive remaining blocks
for i, p := range peers { for i, p := range peers {
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newBlockReqs[i].cids[0])]) session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
receivedBlock := <-getBlocksCh receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock) receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs cancelBlock := <-cancelReqs
...@@ -190,7 +193,7 @@ func TestSessionFindMorePeers(t *testing.T) { ...@@ -190,7 +193,7 @@ func TestSessionFindMorePeers(t *testing.T) {
// verify a broadcast was made // verify a broadcast was made
receivedWantReq := <-wantReqs receivedWantReq := <-wantReqs
if len(receivedWantReq.cids) != broadcastLiveWantsLimit { if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list") t.Fatal("did not rebroadcast whole live list")
} }
if receivedWantReq.peers != nil { if receivedWantReq.peers != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment