Commit a45f185a authored by Jeromy's avatar Jeromy

some code cleanup and commenting

parent 219ed260
...@@ -55,9 +55,6 @@ type Envelope struct { ...@@ -55,9 +55,6 @@ type Envelope struct {
Peer peer.ID Peer peer.ID
// Message is the payload // Message is the payload
Message bsmsg.BitSwapMessage Message bsmsg.BitSwapMessage
// A callback to notify the decision queue that the task is complete
Sent func()
} }
type Engine struct { type Engine struct {
...@@ -143,7 +140,6 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { ...@@ -143,7 +140,6 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
return &Envelope{ return &Envelope{
Peer: nextTask.Target, Peer: nextTask.Target,
Message: m, Message: m,
Sent: nextTask.Done,
}, nil }, nil
} }
} }
......
...@@ -27,6 +27,7 @@ func newPRQ() peerRequestQueue { ...@@ -27,6 +27,7 @@ func newPRQ() peerRequestQueue {
} }
} }
// verify interface implementation
var _ peerRequestQueue = &prq{} var _ peerRequestQueue = &prq{}
// TODO: at some point, the strategy needs to plug in here // TODO: at some point, the strategy needs to plug in here
...@@ -81,12 +82,7 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -81,12 +82,7 @@ func (tl *prq) Pop() *peerRequestTask {
if tl.pQueue.Len() == 0 { if tl.pQueue.Len() == 0 {
return nil return nil
} }
pElem := tl.pQueue.Pop() partner := tl.pQueue.Pop().(*activePartner)
if pElem == nil {
return nil
}
partner := pElem.(*activePartner)
var out *peerRequestTask var out *peerRequestTask
for partner.taskQueue.Len() > 0 { for partner.taskQueue.Len() > 0 {
...@@ -97,6 +93,8 @@ func (tl *prq) Pop() *peerRequestTask { ...@@ -97,6 +93,8 @@ func (tl *prq) Pop() *peerRequestTask {
} }
break // and return |out| break // and return |out|
} }
// start the new task, and push the partner back onto the queue
partner.StartTask() partner.StartTask()
partner.requests-- partner.requests--
tl.pQueue.Push(partner) tl.pQueue.Push(partner)
...@@ -112,6 +110,8 @@ func (tl *prq) Remove(k u.Key, p peer.ID) { ...@@ -112,6 +110,8 @@ func (tl *prq) Remove(k u.Key, p peer.ID) {
// simply mark it as trash, so it'll be dropped when popped off the // simply mark it as trash, so it'll be dropped when popped off the
// queue. // queue.
t.trash = true t.trash = true
// having canceled a block, we now account for that in the given partner
tl.partners[p].requests-- tl.partners[p].requests--
} }
tl.lock.Unlock() tl.lock.Unlock()
...@@ -121,6 +121,7 @@ type peerRequestTask struct { ...@@ -121,6 +121,7 @@ type peerRequestTask struct {
Entry wantlist.Entry Entry wantlist.Entry
Target peer.ID Target peer.ID
// A callback to signal that this task has been completed
Done func() Done func()
// trash in a book-keeping field // trash in a book-keeping field
...@@ -135,10 +136,12 @@ func (t *peerRequestTask) Key() string { ...@@ -135,10 +136,12 @@ func (t *peerRequestTask) Key() string {
return taskKey(t.Target, t.Entry.Key) return taskKey(t.Target, t.Entry.Key)
} }
// Index implements pq.Elem
func (t *peerRequestTask) Index() int { func (t *peerRequestTask) Index() int {
return t.index return t.index
} }
// SetIndex implements pq.Elem
func (t *peerRequestTask) SetIndex(i int) { func (t *peerRequestTask) SetIndex(i int) {
t.index = i t.index = i
} }
...@@ -172,17 +175,22 @@ type activePartner struct { ...@@ -172,17 +175,22 @@ type activePartner struct {
lk sync.Mutex lk sync.Mutex
// Active is the number of blocks this peer is currently being sent // Active is the number of blocks this peer is currently being sent
// active must be locked around as it will be updated externally
active int active int
// requests is the number of blocks this peer is currently requesting // requests is the number of blocks this peer is currently requesting
// request need not be locked around as it will only be modified under
// the peerRequestQueue's locks
requests int requests int
// for the PQ interface
index int index int
// priority queue of // priority queue of tasks belonging to this peer
taskQueue pq.PQ taskQueue pq.PQ
} }
// partnerCompare implements pq.ElemComparator
func partnerCompare(a, b pq.Elem) bool { func partnerCompare(a, b pq.Elem) bool {
pa := a.(*activePartner) pa := a.(*activePartner)
pb := b.(*activePartner) pb := b.(*activePartner)
...@@ -197,12 +205,14 @@ func partnerCompare(a, b pq.Elem) bool { ...@@ -197,12 +205,14 @@ func partnerCompare(a, b pq.Elem) bool {
return pa.active < pb.active return pa.active < pb.active
} }
// StartTask signals that a task was started for this partner
func (p *activePartner) StartTask() { func (p *activePartner) StartTask() {
p.lk.Lock() p.lk.Lock()
p.active++ p.active++
p.lk.Unlock() p.lk.Unlock()
} }
// TaskDone signals that a task was completed for this partner
func (p *activePartner) TaskDone() { func (p *activePartner) TaskDone() {
p.lk.Lock() p.lk.Lock()
p.active-- p.active--
...@@ -212,10 +222,12 @@ func (p *activePartner) TaskDone() { ...@@ -212,10 +222,12 @@ func (p *activePartner) TaskDone() {
p.lk.Unlock() p.lk.Unlock()
} }
// Index implements pq.Elem
func (p *activePartner) Index() int { func (p *activePartner) Index() int {
return p.index return p.index
} }
// SetIndex implements pq.Elem
func (p *activePartner) SetIndex(i int) { func (p *activePartner) SetIndex(i int) {
p.index = i p.index = i
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment