Commit ec64ac1c authored by Dirk McCormick's avatar Dirk McCormick

fix: replace active DONT_HAVE with have/block

parent 746d0f9f
......@@ -17,13 +17,12 @@ type PeerTracker struct {
// Tasks that are pending being made active
pendingTasks map[string]*peertask.QueueTask
// Tasks that have been made active
// active must be locked around as it will be updated externally
activeTasks map[string]*peertask.QueueTask
// activeBytes must be locked around as it will be updated externally
activelk sync.Mutex
activeBytes int
// map of task id -> size in bytes
activeTasks map[string]int
// for the PQ interface
index int
......@@ -40,7 +39,7 @@ func New(target peer.ID) *PeerTracker {
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[string]*peertask.QueueTask),
activeTasks: make(map[string]int),
activeTasks: make(map[string]*peertask.QueueTask),
}
}
......@@ -115,20 +114,27 @@ func (p *PeerTracker) PushTasks(tasks []peertask.Task) {
taskId := p.getTaskId(task.Identifier, task.IsWantBlock)
// If the task is currently active (being processed)
if isActiveTaskAWantBlock, ok := p.isAnyTaskWithIdentifierActive(task.Identifier); ok {
if existingTask, ok := p.anyActiveTaskWithIdentifier(task.Identifier); ok {
// We can replace a want-have with a want-block
replaceHaveWithBlock := !existingTask.IsWantBlock && task.IsWantBlock
// We can replace a DONT_HAVE with a HAVE or a block
replaceDontHave := existingTask.IsDontHave && !task.IsDontHave
// We can only replace tasks that are not active.
// If the active task could not have been replaced (even if it
// wasn't active) by the new task, that means the new task is
// not doing anything useful, so skip adding the new task.
if isActiveTaskAWantBlock || !task.IsWantBlock {
canReplace := replaceHaveWithBlock || replaceDontHave
if !canReplace {
continue
}
} else if existingTask, ok := p.anyPendingTaskWithIdentifier(task.Identifier); ok {
// There is already a task with this Identifier, and the task is not active
}
// If there is already a non-active task with this Identifier
if existingTask, ok := p.anyPendingTaskWithIdentifier(task.Identifier); ok {
// If the new task has a higher priority than the old task,
if task.Priority > existingTask.Priority {
// Update the priority and the tasks position in the queue
// Update the priority and the task's position in the queue
existingTask.Priority = task.Priority
p.taskQueue.Update(existingTask.Index())
}
......@@ -136,6 +142,7 @@ func (p *PeerTracker) PushTasks(tasks []peertask.Task) {
// If we now know the size of the block, update the existing entry
if existingTask.IsDontHave && !task.IsDontHave {
existingTask.Size = task.Size
existingTask.IsDontHave = false
}
// We can replace a want-have with a want-block
......@@ -209,7 +216,7 @@ func (p *PeerTracker) startTask(task *peertask.QueueTask) {
// Add task to active queue
if _, ok := p.activeTasks[taskId]; !ok {
p.activeTasks[taskId] = task.Size
p.activeTasks[taskId] = task
p.activeBytes += task.Size
}
}
......@@ -221,9 +228,9 @@ func (p *PeerTracker) TaskDone(identifier peertask.Identifier, isWantBlock bool)
// Remove task from active queue
taskId := p.getTaskId(identifier, isWantBlock)
if size, ok := p.activeTasks[taskId]; ok {
if task, ok := p.activeTasks[taskId]; ok {
delete(p.activeTasks, taskId)
p.activeBytes -= size
p.activeBytes -= task.Size
if p.activeBytes < 0 {
panic("more tasks finished than started!")
}
......@@ -272,21 +279,18 @@ func (p *PeerTracker) getTaskId(identifier peertask.Identifier, isWantBlock bool
return fmt.Sprintf("%s-%t", identifier, isWantBlock)
}
// isAnyTaskWithIdentifierActive indicates if there is an active task with the
// given identifier. The first return argument indicates the type:
// true: want-block
// false: want-have
func (p *PeerTracker) isAnyTaskWithIdentifierActive(identifier peertask.Identifier) (bool, bool) {
// anyActiveTaskWithIdentifier returns an active task with the given identifier.
func (p *PeerTracker) anyActiveTaskWithIdentifier(identifier peertask.Identifier) (*peertask.QueueTask, bool) {
taskIdBlock := p.getTaskId(identifier, true)
if _, ok := p.activeTasks[taskIdBlock]; ok {
return true, true
if taskBlock, ok := p.activeTasks[taskIdBlock]; ok {
return taskBlock, true
}
taskIdNotBlock := p.getTaskId(identifier, false)
if _, ok := p.activeTasks[taskIdNotBlock]; ok {
return false, true
if taskNotBlock, ok := p.activeTasks[taskIdNotBlock]; ok {
return taskNotBlock, true
}
return false, false
return nil, false
}
// anyPendingTaskWithIdentifier returns a queued task with the given identifier.
......
......@@ -380,6 +380,122 @@ func TestPushHaveVsBlockActive(t *testing.T) {
runTestCase([]peertask.Task{wantHave, wantBlock}, 2)
}
func TestPushSizeInfoActive(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
wantBlock := peertask.Task{
Identifier: "1",
Priority: 10,
IsWantBlock: true,
IsDontHave: false,
SendDontHave: false,
Size: 10,
}
wantBlockDontHave := peertask.Task{
Identifier: "1",
Priority: 10,
IsWantBlock: true,
IsDontHave: true,
SendDontHave: false,
Size: 10,
}
wantHave := peertask.Task{
Identifier: "1",
Priority: 10,
IsWantBlock: false,
IsDontHave: false,
SendDontHave: false,
Size: 10,
}
wantHaveDontHave := peertask.Task{
Identifier: "1",
Priority: 10,
IsWantBlock: false,
IsDontHave: true,
SendDontHave: false,
Size: 10,
}
runTestCase := func(tasks []peertask.Task, expCount int) {
tracker := New(partner)
var popped []peertask.Task
for _, task := range tasks {
// Push the task
tracker.PushTasks([]peertask.Task{task})
// Pop the task (which makes it active)
popped = append(popped, tracker.PopTasks(20)...)
}
if len(popped) != expCount {
t.Fatalf("Expected %d tasks, received %d tasks", expCount, len(popped))
}
}
// want-block with size should be added if there is existing want-block (DONT_HAVE)
runTestCase([]peertask.Task{wantBlockDontHave, wantBlock}, 2)
// want-block (DONT_HAVE) should not be added if there is existing want-block with size
runTestCase([]peertask.Task{wantBlock, wantBlockDontHave}, 1)
// want-block (DONT_HAVE) should not be added if there is existing want-block (DONT_HAVE)
runTestCase([]peertask.Task{wantBlockDontHave, wantBlockDontHave}, 1)
// want-have with size should be added if there is existing want-have (DONT_HAVE)
runTestCase([]peertask.Task{wantHaveDontHave, wantHave}, 2)
// want-have (DONT_HAVE) should not be added if there is existing want-have with size
runTestCase([]peertask.Task{wantHave, wantHaveDontHave}, 1)
// want-have (DONT_HAVE) should not be added if there is existing want-have (DONT_HAVE)
runTestCase([]peertask.Task{wantHaveDontHave, wantHaveDontHave}, 1)
}
func TestReplaceTaskThatIsActiveAndPending(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
wantBlock := peertask.Task{
Identifier: "1",
Priority: 10,
IsWantBlock: true,
IsDontHave: false,
SendDontHave: false,
Size: 10,
}
wantHave := peertask.Task{
Identifier: "1",
Priority: 10,
IsWantBlock: false,
IsDontHave: false,
SendDontHave: false,
Size: 10,
}
wantHaveDontHave := peertask.Task{
Identifier: "1",
Priority: 10,
IsWantBlock: false,
IsDontHave: true,
SendDontHave: false,
Size: 10,
}
tracker := New(partner)
// Push a want-have (DONT_HAVE)
tracker.PushTasks([]peertask.Task{wantHaveDontHave})
// Pop the want-have (DONT_HAVE) (which makes it active)
popped := tracker.PopTasks(20)
// Push a second want-have (with a size). Should be added to the pending
// queue.
tracker.PushTasks([]peertask.Task{wantHave})
// Push a want-block (should replace the pending want-have)
tracker.PushTasks([]peertask.Task{wantBlock})
popped = tracker.PopTasks(20)
if len(popped) != 1 {
t.Fatalf("Expected 1 task to be popped, received %d tasks", len(popped))
}
if !popped[0].IsWantBlock {
t.Fatalf("Expected task to be want-block")
}
}
func TestTaskDone(t *testing.T) {
partner := testutil.GeneratePeers(1)[0]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment