Unverified Commit 61f12234 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #116 from ipfs/bugs/queue-memory-leak

fix(decision): cleanup request queues
parents d1f829be 0bdc018c
...@@ -51,7 +51,7 @@ func (tl *prq) Push(to peer.ID, entries ...wantlist.Entry) { ...@@ -51,7 +51,7 @@ func (tl *prq) Push(to peer.ID, entries ...wantlist.Entry) {
defer tl.lock.Unlock() defer tl.lock.Unlock()
partner, ok := tl.partners[to] partner, ok := tl.partners[to]
if !ok { if !ok {
partner = newActivePartner() partner = newActivePartner(to)
tl.pQueue.Push(partner) tl.pQueue.Push(partner)
tl.partners[to] = partner tl.partners[to] = partner
} }
...@@ -136,7 +136,13 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -136,7 +136,13 @@ func (tl *prq) Pop() *peerRequestTask {
break // and return |out| break // and return |out|
} }
if partner.IsIdle() {
target := partner.target
delete(tl.partners, target)
delete(tl.frozen, target)
} else {
tl.pQueue.Push(partner) tl.pQueue.Push(partner)
}
return out return out
} }
...@@ -252,7 +258,7 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool { ...@@ -252,7 +258,7 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
} }
type activePartner struct { type activePartner struct {
target peer.ID
// Active is the number of blocks this peer is currently being sent // Active is the number of blocks this peer is currently being sent
// active must be locked around as it will be updated externally // active must be locked around as it will be updated externally
activelk sync.Mutex activelk sync.Mutex
...@@ -274,8 +280,9 @@ type activePartner struct { ...@@ -274,8 +280,9 @@ type activePartner struct {
taskQueue pq.PQ taskQueue pq.PQ
} }
func newActivePartner() *activePartner { func newActivePartner(target peer.ID) *activePartner {
return &activePartner{ return &activePartner{
target: target,
taskQueue: pq.New(wrapCmp(V1)), taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: cid.NewSet(), activeBlocks: cid.NewSet(),
} }
...@@ -323,6 +330,7 @@ func (p *activePartner) StartTask(k cid.Cid) { ...@@ -323,6 +330,7 @@ func (p *activePartner) StartTask(k cid.Cid) {
// TaskDone signals that a task was completed for this partner. // TaskDone signals that a task was completed for this partner.
func (p *activePartner) TaskDone(k cid.Cid) { func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Lock() p.activelk.Lock()
p.activeBlocks.Remove(k) p.activeBlocks.Remove(k)
p.active-- p.active--
if p.active < 0 { if p.active < 0 {
...@@ -331,6 +339,12 @@ func (p *activePartner) TaskDone(k cid.Cid) { ...@@ -331,6 +339,12 @@ func (p *activePartner) TaskDone(k cid.Cid) {
p.activelk.Unlock() p.activelk.Unlock()
} }
func (p *activePartner) IsIdle() bool {
p.activelk.Lock()
defer p.activelk.Unlock()
return p.requests == 0 && p.active == 0
}
// Index implements pq.Elem. // Index implements pq.Elem.
func (p *activePartner) Index() int { func (p *activePartner) Index() int {
return p.index return p.index
......
...@@ -128,3 +128,35 @@ func TestPeerRepeats(t *testing.T) { ...@@ -128,3 +128,35 @@ func TestPeerRepeats(t *testing.T) {
} }
} }
} }
func TestCleaningUpQueues(t *testing.T) {
partner := testutil.RandPeerIDFatal(t)
var entries []wantlist.Entry
for i := 0; i < 5; i++ {
entries = append(entries, wantlist.Entry{Cid: cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))})
}
prq := newPRQ()
// push a block, pop a block, complete everything, should be removed
prq.Push(partner, entries...)
task := prq.Pop()
task.Done(task.Entries)
task = prq.Pop()
if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}
// push a block, remove each of its entries, should be removed
prq.Push(partner, entries...)
for _, entry := range entries {
prq.Remove(entry.Cid, partner)
}
task = prq.Pop()
if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
t.Fatal("Partner should have been removed because it's idle")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment