Commit 36427bde authored by Jeromy's avatar Jeromy

try harder to not send duplicate blocks

parent 90fede8d
...@@ -46,7 +46,7 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { ...@@ -46,7 +46,7 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
defer tl.lock.Unlock() defer tl.lock.Unlock()
partner, ok := tl.partners[to] partner, ok := tl.partners[to]
if !ok { if !ok {
partner = &activePartner{taskQueue: pq.New(wrapCmp(V1))} partner = newActivePartner()
tl.pQueue.Push(partner) tl.pQueue.Push(partner)
tl.partners[to] = partner tl.partners[to] = partner
} }
...@@ -57,12 +57,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { ...@@ -57,12 +57,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
return return
} }
partner.activelk.Lock()
defer partner.activelk.Unlock()
_, ok = partner.activeBlocks[entry.Key]
if ok {
return
}
task := &peerRequestTask{ task := &peerRequestTask{
Entry: entry, Entry: entry,
Target: to, Target: to,
created: time.Now(), created: time.Now(),
Done: func() { Done: func() {
partner.TaskDone() partner.TaskDone(entry.Key)
tl.lock.Lock() tl.lock.Lock()
tl.pQueue.Update(partner.Index()) tl.pQueue.Update(partner.Index())
tl.lock.Unlock() tl.lock.Unlock()
...@@ -93,7 +100,7 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -93,7 +100,7 @@ func (tl *prq) Pop() *peerRequestTask {
continue // discarding tasks that have been removed continue // discarding tasks that have been removed
} }
partner.StartTask() partner.StartTask(out.Entry.Key)
partner.requests-- partner.requests--
break // and return |out| break // and return |out|
} }
...@@ -179,6 +186,8 @@ type activePartner struct { ...@@ -179,6 +186,8 @@ type activePartner struct {
activelk sync.Mutex activelk sync.Mutex
active int active int
activeBlocks map[u.Key]struct{}
// requests is the number of blocks this peer is currently requesting // requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under // request need not be locked around as it will only be modified under
// the peerRequestQueue's locks // the peerRequestQueue's locks
...@@ -191,6 +200,13 @@ type activePartner struct { ...@@ -191,6 +200,13 @@ type activePartner struct {
taskQueue pq.PQ taskQueue pq.PQ
} }
func newActivePartner() *activePartner {
return &activePartner{
taskQueue: pq.New(wrapCmp(V1)),
activeBlocks: make(map[u.Key]struct{}),
}
}
// partnerCompare implements pq.ElemComparator // partnerCompare implements pq.ElemComparator
func partnerCompare(a, b pq.Elem) bool { func partnerCompare(a, b pq.Elem) bool {
pa := a.(*activePartner) pa := a.(*activePartner)
...@@ -208,15 +224,17 @@ func partnerCompare(a, b pq.Elem) bool { ...@@ -208,15 +224,17 @@ func partnerCompare(a, b pq.Elem) bool {
} }
// StartTask signals that a task was started for this partner // StartTask signals that a task was started for this partner
func (p *activePartner) StartTask() { func (p *activePartner) StartTask(k u.Key) {
p.activelk.Lock() p.activelk.Lock()
p.activeBlocks[k] = struct{}{}
p.active++ p.active++
p.activelk.Unlock() p.activelk.Unlock()
} }
// TaskDone signals that a task was completed for this partner // TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone() { func (p *activePartner) TaskDone(k u.Key) {
p.activelk.Lock() p.activelk.Lock()
delete(p.activeBlocks, k)
p.active-- p.active--
if p.active < 0 { if p.active < 0 {
panic("more tasks finished than started!") panic("more tasks finished than started!")
......
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
u "github.com/ipfs/go-ipfs/util" u "github.com/ipfs/go-ipfs/util"
) )
var TaskWorkerCount = 16 var TaskWorkerCount = 8
func init() { func init() {
twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS") twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment