Commit 0a309a17 authored by hannahhoward's avatar hannahhoward

fix(decision): cleanup request queues

Make sure when request queues are idle that they are removed

fix #112
parent d1f829be
...@@ -136,7 +136,17 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -136,7 +136,17 @@ func (tl *prq) Pop() *peerRequestTask {
break // and return |out| break // and return |out|
} }
if partner.IsIdle() {
for target, testPartner := range tl.partners {
if testPartner == partner {
delete(tl.partners, target)
delete(tl.frozen, target)
break
}
}
} else {
tl.pQueue.Push(partner) tl.pQueue.Push(partner)
}
return out return out
} }
...@@ -323,6 +333,7 @@ func (p *activePartner) StartTask(k cid.Cid) { ...@@ -323,6 +333,7 @@ func (p *activePartner) StartTask(k cid.Cid) {
// TaskDone signals that a task was completed for this partner. // TaskDone signals that a task was completed for this partner.
func (p *activePartner) TaskDone(k cid.Cid) { func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Lock() p.activelk.Lock()
p.activeBlocks.Remove(k) p.activeBlocks.Remove(k)
p.active-- p.active--
if p.active < 0 { if p.active < 0 {
...@@ -331,6 +342,12 @@ func (p *activePartner) TaskDone(k cid.Cid) { ...@@ -331,6 +342,12 @@ func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Unlock() p.activelk.Unlock()
} }
func (p *activePartner) IsIdle() bool {
p.activelk.Lock()
defer p.activelk.Unlock()
return p.requests == 0 && p.active == 0
}
// Index implements pq.Elem. // Index implements pq.Elem.
func (p *activePartner) Index() int { func (p *activePartner) Index() int {
return p.index return p.index
......
...@@ -128,3 +128,35 @@ func TestPeerRepeats(t *testing.T) { ...@@ -128,3 +128,35 @@ func TestPeerRepeats(t *testing.T) {
} }
} }
} }
func TestCleaningUpQueues(t *testing.T) {
partner := testutil.RandPeerIDFatal(t)
var entries []wantlist.Entry
for i := 0; i < 5; i++ {
entries = append(entries, wantlist.Entry{Cid: cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))})
}
prq := newPRQ()
// push a block, pop a block, complete everything, should be removed
prq.Push(partner, entries...)
task := prq.Pop()
task.Done(task.Entries)
task = prq.Pop()
if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}
// push a block, remove each of its entries, should be removed
prq.Push(partner, entries...)
for _, entry := range entries {
prq.Remove(entry.Cid, partner)
}
task = prq.Pop()
if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment