diff --git a/peertracker/peertracker.go b/peertracker/peertracker.go index e4906d75c9d43a027aa7d178ba44fd85cf60614f..8156c279fedf68b880039893887b78eb50be21ba 100644 --- a/peertracker/peertracker.go +++ b/peertracker/peertracker.go @@ -17,13 +17,12 @@ type PeerTracker struct { // Tasks that are pending being made active pendingTasks map[string]*peertask.QueueTask - // Tasks that have been made active - // active must be locked around as it will be updated externally + activeTasks map[string]*peertask.QueueTask + + // activeBytes must be locked around as it will be updated externally activelk sync.Mutex activeBytes int - // map of task id -> size in bytes - activeTasks map[string]int // for the PQ interface index int @@ -40,7 +39,7 @@ func New(target peer.ID) *PeerTracker { target: target, taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)), pendingTasks: make(map[string]*peertask.QueueTask), - activeTasks: make(map[string]int), + activeTasks: make(map[string]*peertask.QueueTask), } } @@ -115,20 +114,27 @@ func (p *PeerTracker) PushTasks(tasks []peertask.Task) { taskId := p.getTaskId(task.Identifier, task.IsWantBlock) // If the task is currently active (being processed) - if isActiveTaskAWantBlock, ok := p.isAnyTaskWithIdentifierActive(task.Identifier); ok { + if existingTask, ok := p.anyActiveTaskWithIdentifier(task.Identifier); ok { + // We can replace a want-have with a want-block + replaceHaveWithBlock := !existingTask.IsWantBlock && task.IsWantBlock + // We can replace a DONT_HAVE with a HAVE or a block + replaceDontHave := existingTask.IsDontHave && !task.IsDontHave + // We can only replace tasks that are not active. // If the active task could not have been replaced (even if it // wasn't active) by the new task, that means the new task is // not doing anything useful, so skip adding the new task. - if isActiveTaskAWantBlock || !task.IsWantBlock { + canReplace := replaceHaveWithBlock || replaceDontHave + if !canReplace { continue } - } else if existingTask, ok := p.anyPendingTaskWithIdentifier(task.Identifier); ok { - // There is already a task with this Identifier, and the task is not active + } + // If there is already a non-active task with this Identifier + if existingTask, ok := p.anyPendingTaskWithIdentifier(task.Identifier); ok { // If the new task has a higher priority than the old task, if task.Priority > existingTask.Priority { - // Update the priority and the tasks position in the queue + // Update the priority and the task's position in the queue existingTask.Priority = task.Priority p.taskQueue.Update(existingTask.Index()) } @@ -136,6 +142,7 @@ func (p *PeerTracker) PushTasks(tasks []peertask.Task) { // If we now know the size of the block, update the existing entry if existingTask.IsDontHave && !task.IsDontHave { existingTask.Size = task.Size + existingTask.IsDontHave = false } // We can replace a want-have with a want-block @@ -209,7 +216,7 @@ func (p *PeerTracker) startTask(task *peertask.QueueTask) { // Add task to active queue if _, ok := p.activeTasks[taskId]; !ok { - p.activeTasks[taskId] = task.Size + p.activeTasks[taskId] = task p.activeBytes += task.Size } } @@ -221,9 +228,9 @@ func (p *PeerTracker) TaskDone(identifier peertask.Identifier, isWantBlock bool) // Remove task from active queue taskId := p.getTaskId(identifier, isWantBlock) - if size, ok := p.activeTasks[taskId]; ok { + if task, ok := p.activeTasks[taskId]; ok { delete(p.activeTasks, taskId) - p.activeBytes -= size + p.activeBytes -= task.Size if p.activeBytes < 0 { panic("more tasks finished than started!") } @@ -272,21 +279,18 @@ func (p *PeerTracker) getTaskId(identifier peertask.Identifier, isWantBlock bool return fmt.Sprintf("%s-%t", identifier, isWantBlock) } -// isAnyTaskWithIdentifierActive indicates if there is an active task with the -// given identifier. The first return argument indicates the type: -// true: want-block -// false: want-have -func (p *PeerTracker) isAnyTaskWithIdentifierActive(identifier peertask.Identifier) (bool, bool) { +// anyActiveTaskWithIdentifier returns an active task with the given identifier. +func (p *PeerTracker) anyActiveTaskWithIdentifier(identifier peertask.Identifier) (*peertask.QueueTask, bool) { taskIdBlock := p.getTaskId(identifier, true) - if _, ok := p.activeTasks[taskIdBlock]; ok { - return true, true + if taskBlock, ok := p.activeTasks[taskIdBlock]; ok { + return taskBlock, true } taskIdNotBlock := p.getTaskId(identifier, false) - if _, ok := p.activeTasks[taskIdNotBlock]; ok { - return false, true + if taskNotBlock, ok := p.activeTasks[taskIdNotBlock]; ok { + return taskNotBlock, true } - return false, false + return nil, false } // anyPendingTaskWithIdentifier returns a queued task with the given identifier. diff --git a/peertracker/peertracker_test.go b/peertracker/peertracker_test.go index cfda75791d1e875c33959052ef3a779256cab0ec..483faaec0756906f07473d1b2b0b1b56dccee73c 100644 --- a/peertracker/peertracker_test.go +++ b/peertracker/peertracker_test.go @@ -380,6 +380,122 @@ func TestPushHaveVsBlockActive(t *testing.T) { runTestCase([]peertask.Task{wantHave, wantBlock}, 2) } +func TestPushSizeInfoActive(t *testing.T) { + partner := testutil.GeneratePeers(1)[0] + + wantBlock := peertask.Task{ + Identifier: "1", + Priority: 10, + IsWantBlock: true, + IsDontHave: false, + SendDontHave: false, + Size: 10, + } + wantBlockDontHave := peertask.Task{ + Identifier: "1", + Priority: 10, + IsWantBlock: true, + IsDontHave: true, + SendDontHave: false, + Size: 10, + } + wantHave := peertask.Task{ + Identifier: "1", + Priority: 10, + IsWantBlock: false, + IsDontHave: false, + SendDontHave: false, + Size: 10, + } + wantHaveDontHave := peertask.Task{ + Identifier: "1", + Priority: 10, + IsWantBlock: false, + IsDontHave: true, + SendDontHave: false, + Size: 10, + } + + runTestCase := func(tasks []peertask.Task, expCount int) { + tracker := New(partner) + var popped []peertask.Task + for _, task := range tasks { + // Push the task + tracker.PushTasks([]peertask.Task{task}) + // Pop the task (which makes it active) + popped = append(popped, tracker.PopTasks(20)...) + } + if len(popped) != expCount { + t.Fatalf("Expected %d tasks, received %d tasks", expCount, len(popped)) + } + } + + // want-block with size should be added if there is existing want-block (DONT_HAVE) + runTestCase([]peertask.Task{wantBlockDontHave, wantBlock}, 2) + // want-block (DONT_HAVE) should not be added if there is existing want-block with size + runTestCase([]peertask.Task{wantBlock, wantBlockDontHave}, 1) + // want-block (DONT_HAVE) should not be added if there is existing want-block (DONT_HAVE) + runTestCase([]peertask.Task{wantBlockDontHave, wantBlockDontHave}, 1) + // want-have with size should be added if there is existing want-have (DONT_HAVE) + runTestCase([]peertask.Task{wantHaveDontHave, wantHave}, 2) + // want-have (DONT_HAVE) should not be added if there is existing want-have with size + runTestCase([]peertask.Task{wantHave, wantHaveDontHave}, 1) + // want-have (DONT_HAVE) should not be added if there is existing want-have (DONT_HAVE) + runTestCase([]peertask.Task{wantHaveDontHave, wantHaveDontHave}, 1) +} + +func TestReplaceTaskThatIsActiveAndPending(t *testing.T) { + partner := testutil.GeneratePeers(1)[0] + + wantBlock := peertask.Task{ + Identifier: "1", + Priority: 10, + IsWantBlock: true, + IsDontHave: false, + SendDontHave: false, + Size: 10, + } + wantHave := peertask.Task{ + Identifier: "1", + Priority: 10, + IsWantBlock: false, + IsDontHave: false, + SendDontHave: false, + Size: 10, + } + wantHaveDontHave := peertask.Task{ + Identifier: "1", + Priority: 10, + IsWantBlock: false, + IsDontHave: true, + SendDontHave: false, + Size: 10, + } + + tracker := New(partner) + + // Push a want-have (DONT_HAVE) + tracker.PushTasks([]peertask.Task{wantHaveDontHave}) + + // Pop the want-have (DONT_HAVE) (which makes it active) + popped := tracker.PopTasks(20) + + // Push a second want-have (with a size). Should be added to the pending + // queue. + tracker.PushTasks([]peertask.Task{wantHave}) + + // Push a want-block (should replace the pending want-have) + tracker.PushTasks([]peertask.Task{wantBlock}) + + popped = tracker.PopTasks(20) + if len(popped) != 1 { + t.Fatalf("Expected 1 task to be popped, received %d tasks", len(popped)) + } + if !popped[0].IsWantBlock { + t.Fatalf("Expected task to be want-block") + } +} + func TestTaskDone(t *testing.T) { partner := testutil.GeneratePeers(1)[0]