Commit 8450a8d4 authored by Jeromy's avatar Jeromy

address comments from CR

parent 76e879c9
......@@ -55,6 +55,9 @@ type Envelope struct {
Peer peer.ID
// Message is the payload
Message bsmsg.BitSwapMessage
// A callback to notify the decision queue that the task is complete
Sent func()
}
type Engine struct {
......@@ -132,6 +135,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
// If we don't have the block, don't hold that against the peer
// make sure to update that the task has been 'completed'
nextTask.Done()
continue
}
......@@ -140,6 +146,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return &Envelope{
Peer: nextTask.Target,
Message: m,
Sent: nextTask.Done,
}, nil
}
}
......
......@@ -173,11 +173,11 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
}
type activePartner struct {
lk sync.Mutex
// Active is the number of blocks this peer is currently being sent
// active must be locked around as it will be updated externally
active int
activelk sync.Mutex
active int
// requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under
......@@ -197,6 +197,7 @@ func partnerCompare(a, b pq.Elem) bool {
pb := b.(*activePartner)
// having no blocks in their wantlist means lowest priority
// having both of these checks ensures stability of the sort
if pa.requests == 0 {
return false
}
......@@ -208,19 +209,19 @@ func partnerCompare(a, b pq.Elem) bool {
// StartTask signals that a task was started for this partner
func (p *activePartner) StartTask() {
p.lk.Lock()
p.activelk.Lock()
p.active++
p.lk.Unlock()
p.activelk.Unlock()
}
// TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone() {
p.lk.Lock()
p.activelk.Lock()
p.active--
if p.active < 0 {
panic("more tasks finished than started!")
}
p.lk.Unlock()
p.activelk.Unlock()
}
// Index implements pq.Elem
......
......@@ -105,10 +105,15 @@ func TestPeerRepeats(t *testing.T) {
// Now, if one of the tasks gets finished, the next task off the queue should
// be for the same peer
tasks[0].Done()
ntask := prq.Pop()
if ntask.Target != tasks[0].Target {
t.Fatal("Expected task from peer with lowest active count")
for blockI := 0; blockI < 4; blockI++ {
for i := 0; i < 4; i++ {
// its okay to mark the same task done multiple times here (JUST FOR TESTING)
tasks[i].Done()
ntask := prq.Pop()
if ntask.Target != tasks[i].Target {
t.Fatal("Expected task from peer with lowest active count")
}
}
}
}
......@@ -51,6 +51,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) {
}
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
envelope.Sent()
case <-ctx.Done():
return
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment