Commit da397938 authored by Dirk McCormick's avatar Dirk McCormick

feat: always pop first block even if over-sized

parent fbf23b3b
......@@ -150,7 +150,9 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {
}
// PopTasks pops the highest priority tasks from the peer off the queue, up to
// the given maximum size of those tasks.
// the given maximum size of those tasks. Note that the first task is always
// popped off the queue even if it's over maxSize, to prevent large tasks from
// blocking up the queue.
func (ptq *PeerTaskQueue) PopTasks(maxSize int) (peer.ID, []*peertask.Task) {
ptq.lock.Lock()
defer ptq.lock.Unlock()
......
......@@ -164,9 +164,14 @@ func (p *PeerTracker) PushTasks(tasks []peertask.Task) {
}
// PopTasks pops as many tasks as possible up to the given size off the queue
// in priority order
// in priority order. Note that the first task is always popped, even if it's
// over maxSize, so that large tasks don't block up the queue.
func (p *PeerTracker) PopTasks(maxSize int) []*peertask.Task {
var out []*peertask.Task
if maxSize <= 0 {
return out
}
size := 0
for p.taskQueue.Len() > 0 && p.freezeVal == 0 {
// Peek at the next task in the queue
......@@ -179,8 +184,11 @@ func (p *PeerTracker) PopTasks(maxSize int) []*peertask.Task {
continue
}
// If the next task is too big for the message
if size+task.Size > maxSize {
// We always pop the first task off the queue, even if it's bigger
// than the max size. This ensures that big tasks don't get stuck in
// the queue forever.
// After the first task, pop tasks if they fit under the max size.
if len(out) > 0 && size+task.Size > maxSize {
// We have as many tasks as we can fit into the message, so return
// the tasks
return out
......
......@@ -81,12 +81,8 @@ func TestPushPopSizeAndOrder(t *testing.T) {
},
}
tracker.PushTasks(tasks)
popped := tracker.PopTasks(5)
if len(popped) != 0 {
t.Fatal("Expected 0 tasks")
}
popped = tracker.PopTasks(10)
popped := tracker.PopTasks(10)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
......@@ -108,6 +104,38 @@ func TestPushPopSizeAndOrder(t *testing.T) {
}
}
func TestPushPopFirstItemOversized(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 20,
Size: 10,
},
peertask.Task{
Topic: "2",
Priority: 10,
Size: 5,
},
}
tracker.PushTasks(tasks)
// Pop with max size 7.
// PopTasks should always return the first task even if it's over max size
// (this is to prevent large tasks from blocking up the queue).
popped := tracker.PopTasks(7)
if len(popped) != 1 || popped[0].Topic != "1" {
t.Fatal("Expected first task to be popped")
}
popped = tracker.PopTasks(100)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
}
func TestRemove(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
tracker := New(partner, &DefaultTaskMerger{})
......@@ -270,31 +298,41 @@ func TestReplaceTaskSize(t *testing.T) {
tasks := []peertask.Task{
peertask.Task{
Topic: "1",
Priority: 10,
Priority: 20,
Size: 10,
Data: "a",
},
peertask.Task{
Topic: "1",
Priority: 20,
Size: 20,
Topic: "2",
Priority: 10,
Size: 10,
Data: "b",
},
peertask.Task{
Topic: "2",
Priority: 10,
Size: 30,
Data: "c",
},
}
// Push task "a"
tracker.PushTasks([]peertask.Task{tasks[0]}) // Topic "1"
// Push task "b". Has same topic and permissive task merger, so should
// replace task "a", including new size
tracker.PushTasks([]peertask.Task{tasks[1]}) // Topic "1"
// Push task "b"
tracker.PushTasks([]peertask.Task{tasks[1]}) // Topic "2"
// Pop with maxSize 10. Should not pop anything because Size is now 20.
popped := tracker.PopTasks(10)
if len(popped) != 0 {
t.Fatal("Expected no tasks")
// Push task "c". Has same topic as task "b" and permissive task merger,
// so should replace task "b", and update its size
tracker.PushTasks([]peertask.Task{tasks[2]}) // Topic "2"
// Pop with maxSize 20. Should only pop task "a" because only other task
// (with Topic "2") now has size 30.
popped := tracker.PopTasks(20)
if len(popped) != 1 || popped[0].Data != "a" {
t.Fatal("Expected 1 task", popped[0], popped[1])
}
popped = tracker.PopTasks(20)
popped = tracker.PopTasks(30)
if len(popped) != 1 {
t.Fatal("Expected 1 task")
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment