diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 928af7c4b84df74688217e232bbae358ba361507..119869677056532daf5fcefd82d78e7177984eea 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -55,6 +55,9 @@ type Envelope struct { Peer peer.ID // Message is the payload Message bsmsg.BitSwapMessage + + // A callback to notify the decision queue that the task is complete + Sent func() } type Engine struct { @@ -132,6 +135,9 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { block, err := e.bs.Get(nextTask.Entry.Key) if err != nil { + // If we don't have the block, don't hold that against the peer + // make sure to update that the task has been 'completed' + nextTask.Done() continue } @@ -140,6 +146,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { return &Envelope{ Peer: nextTask.Target, Message: m, + Sent: nextTask.Done, }, nil } } diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index a1c6ae1024af3ff501f8d1361701df3d6a8984f8..e771ece0bb6ccb3065972921f96f56bbf249b141 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -173,11 +173,11 @@ func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool { } type activePartner struct { - lk sync.Mutex // Active is the number of blocks this peer is currently being sent // active must be locked around as it will be updated externally - active int + activelk sync.Mutex + active int // requests is the number of blocks this peer is currently requesting // request need not be locked around as it will only be modified under @@ -197,6 +197,7 @@ func partnerCompare(a, b pq.Elem) bool { pb := b.(*activePartner) // having no blocks in their wantlist means lowest priority + // having both of these checks ensures stability of the sort if pa.requests == 0 { return false } @@ -208,19 +209,19 @@ func partnerCompare(a, b pq.Elem) bool { // StartTask signals that a task was started for this partner func (p *activePartner) StartTask() { - p.lk.Lock() + p.activelk.Lock() p.active++ - p.lk.Unlock() + p.activelk.Unlock() } // TaskDone signals that a task was completed for this partner func (p *activePartner) TaskDone() { - p.lk.Lock() + p.activelk.Lock() p.active-- if p.active < 0 { panic("more tasks finished than started!") } - p.lk.Unlock() + p.activelk.Unlock() } // Index implements pq.Elem diff --git a/exchange/bitswap/decision/peer_request_queue_test.go b/exchange/bitswap/decision/peer_request_queue_test.go index cd8c4b1ff5176c6cbb1ad3751a40c85363aa5c02..96c136d6fcbeb6b309f47d58a40f7e8ff496eecf 100644 --- a/exchange/bitswap/decision/peer_request_queue_test.go +++ b/exchange/bitswap/decision/peer_request_queue_test.go @@ -105,10 +105,15 @@ func TestPeerRepeats(t *testing.T) { // Now, if one of the tasks gets finished, the next task off the queue should // be for the same peer - tasks[0].Done() - - ntask := prq.Pop() - if ntask.Target != tasks[0].Target { - t.Fatal("Expected task from peer with lowest active count") + for blockI := 0; blockI < 4; blockI++ { + for i := 0; i < 4; i++ { + // its okay to mark the same task done multiple times here (JUST FOR TESTING) + tasks[i].Done() + + ntask := prq.Pop() + if ntask.Target != tasks[i].Target { + t.Fatal("Expected task from peer with lowest active count") + } + } } } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index fdd3c1549c9beeede6ea0374220fc5e391c3fcaf..370aa1a87ec8d387eda5184f59cbf60086a49fbf 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -51,6 +51,7 @@ func (bs *Bitswap) taskWorker(ctx context.Context) { } log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer) bs.send(ctx, envelope.Peer, envelope.Message) + envelope.Sent() case <-ctx.Done(): return }