diff --git a/peertask/peertask.go b/peertask/peertask.go index d9ac6226c8f37d79b6e999ce714e55cb4c915822..497efc9b62350afabecc0a3686138cb4268c0a70 100644 --- a/peertask/peertask.go +++ b/peertask/peertask.go @@ -1,6 +1,7 @@ package peertask import ( + "fmt" "time" pq "github.com/ipfs/go-ipfs-pq" @@ -39,7 +40,7 @@ type TaskInfo interface{} type Task struct { Identifier Identifier Priority int - Replaceable bool + IsBlock bool Info TaskInfo } @@ -54,11 +55,16 @@ type TaskBlock struct { // toPrune are the tasks that have already been taken care of as part of // a different task block which can be removed from the task block. - toPrune map[Identifier]struct{} + // toPrune map[Identifier]struct{} + toPrune map[string]struct{} created time.Time // created marks the time that the task was added to the queue index int // book-keeping field used by the pq container } +func (t *Task) String() string { + return fmt.Sprintf("Cid: %s\nIsBlock: %t\nInfo:\n%s\n", t.Identifier, t.IsBlock, t.Info) +} + // NewTaskBlock creates a new task block with the given tasks, priority, target // peer, and task completion function. func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock { @@ -67,7 +73,8 @@ func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) Priority: priority, Target: target, Done: done, - toPrune: make(map[Identifier]struct{}, len(tasks)), + // toPrune: make(map[Identifier]struct{}, len(tasks)), + toPrune: make(map[string]struct{}, len(tasks)), created: time.Now(), } } @@ -75,27 +82,30 @@ func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) func (pt *TaskBlock) ReplaceTask(task Task) bool { // ReplaceTask() should not be called on a Prunable task, but check // just in case - _, ok := pt.toPrune[task.Identifier] + _, ok := pt.toPrune[pt.getTaskId(task.Identifier, task.IsBlock)] if ok { + // fmt.Printf("ReplaceTask %s (cannot: pruned)\n", task.Identifier) return false } if i, ok := pt.canReplaceTask(task); ok { pt.Tasks[i] = task + // fmt.Printf("ReplaceTask %s\n", task.Identifier) return true } + // fmt.Printf("ReplaceTask %s (cannot replace task with that cid)\n", task.Identifier) return false } -func (pt *TaskBlock) CanReplaceTask(task Task) bool { - _, ok := pt.canReplaceTask(task) - return ok -} +// func (pt *TaskBlock) CanReplaceTask(task Task) bool { +// _, ok := pt.canReplaceTask(task) +// return ok +// } func (pt *TaskBlock) canReplaceTask(task Task) (int, bool) { for i, existing := range pt.Tasks { if existing.Identifier == task.Identifier { - if existing.Replaceable && !task.Replaceable { + if !existing.IsBlock && task.IsBlock { return i, true } return -1, false @@ -106,8 +116,9 @@ func (pt *TaskBlock) canReplaceTask(task Task) (int, bool) { // MarkPrunable marks any tasks with the given identifier as prunable at the time // the task block is pulled of the queue to execute (because they've already been removed). -func (pt *TaskBlock) MarkPrunable(identifier Identifier) { - pt.toPrune[identifier] = struct{}{} +func (pt *TaskBlock) MarkPrunable(identifier Identifier, isBlock bool) { + // pt.toPrune[identifier] = struct{}{} + pt.toPrune[pt.getTaskId(identifier, isBlock)] = struct{}{} } // PruneTasks removes all tasks previously marked as prunable from the lists of @@ -115,7 +126,8 @@ func (pt *TaskBlock) MarkPrunable(identifier Identifier) { func (pt *TaskBlock) PruneTasks() { newTasks := make([]Task, 0, len(pt.Tasks)-len(pt.toPrune)) for _, task := range pt.Tasks { - if _, ok := pt.toPrune[task.Identifier]; !ok { + // if _, ok := pt.toPrune[task.Identifier]; !ok { + if _, ok := pt.toPrune[pt.getTaskId(task.Identifier, task.IsBlock)]; !ok { newTasks = append(newTasks, task) } } @@ -131,3 +143,7 @@ func (pt *TaskBlock) Index() int { func (pt *TaskBlock) SetIndex(i int) { pt.index = i } + +func (pt *TaskBlock) getTaskId(identifier Identifier, isBlock bool) string { + return fmt.Sprintf("%s-%t", identifier, isBlock) +} diff --git a/peertaskqueue.go b/peertaskqueue.go index 40200bdcd7de89a722268d1fd8589a8440f1400c..7001df8560fe71226dfec593fd82f0bf63e6136f 100644 --- a/peertaskqueue.go +++ b/peertaskqueue.go @@ -135,7 +135,7 @@ func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) { peerTracker.PushBlock(to, tasks, func(e []peertask.Task) { ptq.lock.Lock() for _, task := range e { - peerTracker.TaskDone(task.Identifier) + peerTracker.TaskDone(task.Identifier, task.IsBlock) } ptq.pQueue.Update(peerTracker.Index()) ptq.lock.Unlock() diff --git a/peertracker/peertracker.go b/peertracker/peertracker.go index 002f67cae92f475f25bcb3f8e974d1d658698c4f..ef0c5ca2cd60c3f775ea784d3cdb35eb5e48ac03 100644 --- a/peertracker/peertracker.go +++ b/peertracker/peertracker.go @@ -1,6 +1,7 @@ package peertracker import ( + "fmt" "sync" pq "github.com/ipfs/go-ipfs-pq" @@ -17,7 +18,8 @@ type PeerTracker struct { // active must be locked around as it will be updated externally activelk sync.Mutex active int - activeTasks map[peertask.Identifier]struct{} + // activeTasks map[peertask.Identifier]struct{} + activeTasks map[string]struct{} // total number of task tasks for this task numTasks int @@ -27,7 +29,8 @@ type PeerTracker struct { freezeVal int - taskMap map[peertask.Identifier]*peertask.TaskBlock + // taskMap map[peertask.Identifier]*peertask.TaskBlock + taskMap map[string]*peertask.TaskBlock // priority queue of tasks belonging to this peer taskBlockQueue pq.PQ @@ -38,8 +41,10 @@ func New(target peer.ID) *PeerTracker { return &PeerTracker{ target: target, taskBlockQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)), - taskMap: make(map[peertask.Identifier]*peertask.TaskBlock), - activeTasks: make(map[peertask.Identifier]struct{}), + // taskMap: make(map[peertask.Identifier]*peertask.TaskBlock), + // activeTasks: make(map[peertask.Identifier]struct{}), + taskMap: make(map[string]*peertask.TaskBlock), + activeTasks: make(map[string]struct{}), } } @@ -75,17 +80,20 @@ func PeerCompare(a, b pq.Elem) bool { } // StartTask signals that a task was started for this peer. -func (p *PeerTracker) StartTask(identifier peertask.Identifier) { +func (p *PeerTracker) StartTask(identifier peertask.Identifier, isBlock bool) { p.activelk.Lock() - p.activeTasks[identifier] = struct{}{} + // p.activeTasks[identifier] = struct{}{} + p.activeTasks[p.getTaskId(identifier, isBlock)] = struct{}{} p.active++ p.activelk.Unlock() } // TaskDone signals that a task was completed for this peer. -func (p *PeerTracker) TaskDone(identifier peertask.Identifier) { +func (p *PeerTracker) TaskDone(identifier peertask.Identifier, isBlock bool) { + // fmt.Printf("TaskDone() %s-%t\n", identifier, isBlock) p.activelk.Lock() - delete(p.activeTasks, identifier) + // delete(p.activeTasks, identifier) + delete(p.activeTasks, p.getTaskId(identifier, isBlock)) p.active-- if p.active < 0 { panic("more tasks finished than started!") @@ -124,35 +132,52 @@ func (p *PeerTracker) PushBlock(target peer.ID, tasks []peertask.Task, done func var priority int newTasks := make([]peertask.Task, 0, len(tasks)) + // fmt.Printf(" PushBlock() %d tasks\n", len(tasks)) for _, task := range tasks { + taskId := p.getTaskId(task.Identifier, task.IsBlock) + // If the task is currently active (being processed) - if _, ok := p.activeTasks[task.Identifier]; ok { + // if _, ok := p.activeTasks[taskId]; ok { + if isActiveTaskBlock, ok := p.isAnyTaskWithIdentifierActive(task.Identifier); ok { + // fmt.Printf(" active task %s\n", task.Identifier) // We can only replace tasks that are not active. // If the active task could not have been replaced (even if it // wasn't active) by the new task, that means the new task is // not doing anything useful, so skip adding the new task. - canReplace := true - if taskBlock, ok := p.taskMap[task.Identifier]; ok { - canReplace = taskBlock.CanReplaceTask(task) - } - if !canReplace { + if isActiveTaskBlock || !task.IsBlock { continue } } else { - // If there is already a task with this identifier, and the new task - // has a higher priority than the old task block, move the old task - // block towards the front of the queue. - if taskBlock, ok := p.taskMap[task.Identifier]; ok { + // fmt.Printf(" in-active task %s\n", task.Identifier) + // If there is already a task with this Identifier + // if taskBlock, ok := p.taskMap[task.Identifier]; ok { + if oldTaskId, ok := p.anyQueuedTaskWithIdentifier(task.Identifier); ok { + // fmt.Printf(" (existing task %s)\n", task.Identifier) + taskBlock, ok := p.taskMap[oldTaskId] + if !ok { + panic("anyQueuedTaskWithIdentifier() returned non-existent task id") + } + + // If the new task has a higher priority than the old task block, + // move the old task block towards the front of the queue. if task.Priority > taskBlock.Priority { taskBlock.Priority = task.Priority p.taskBlockQueue.Update(taskBlock.Index()) - } + } + + // TODO: Replacing the task may result in the block exceeding the message size limit // Replace the task (if it's replaceable) if taskBlock.ReplaceTask(task) { - // If the task was replaced, we don't need to add the new - // task to the queue - continue + // Update the mapping from task -> task block + if taskId != oldTaskId { + p.taskMap[taskId] = taskBlock + delete(p.taskMap, oldTaskId) + } } + + // If a task with the Identifier exists, we don't need to add + // the new task to the queue + continue } } @@ -174,26 +199,32 @@ func (p *PeerTracker) PushBlock(target peer.ID, tasks []peertask.Task, done func taskBlock := peertask.NewTaskBlock(newTasks, priority, target, done) p.taskBlockQueue.Push(taskBlock) for _, task := range newTasks { - p.taskMap[task.Identifier] = taskBlock + // p.taskMap[task.Identifier] = taskBlock + p.taskMap[p.getTaskId(task.Identifier, task.IsBlock)] = taskBlock } p.numTasks += len(newTasks) } // PopBlock removes a block of tasks from this peers queue func (p *PeerTracker) PopBlock() *peertask.TaskBlock { + // fmt.Printf("PopBlock()\n") var out *peertask.TaskBlock for p.taskBlockQueue.Len() > 0 && p.freezeVal == 0 { out = p.taskBlockQueue.Pop().(*peertask.TaskBlock) for _, task := range out.Tasks { - delete(p.taskMap, task.Identifier) + // delete(p.taskMap, task.Identifier) + delete(p.taskMap, p.getTaskId(task.Identifier, task.IsBlock)) } out.PruneTasks() + // fmt.Printf(" %d tasks\n", len(out.Tasks)) if len(out.Tasks) > 0 { for _, task := range out.Tasks { + // fmt.Printf(" StartTask() %s\n", task) p.numTasks-- - p.StartTask(task.Identifier) + // p.StartTask(task.Identifier) + p.StartTask(task.Identifier, task.IsBlock) } } else { out = nil @@ -206,12 +237,30 @@ func (p *PeerTracker) PopBlock() *peertask.TaskBlock { // Remove removes the task with the given identifier from this peers queue func (p *PeerTracker) Remove(identifier peertask.Identifier) bool { - taskBlock, ok := p.taskMap[identifier] + // taskBlock, ok := p.taskMap[identifier] + // if ok { + // // taskBlock.MarkPrunable(identifier) + // taskBlock.MarkPrunable(taskId) + // p.numTasks-- + // } + // fmt.Printf("Remove() %s\n", identifier) + + taskIdBlock := p.getTaskId(identifier, true) + // fmt.Printf(" remove %s\n", taskIdBlock) + taskBlockIsBlock, ok := p.taskMap[taskIdBlock] if ok { - taskBlock.MarkPrunable(identifier) + taskBlockIsBlock.MarkPrunable(taskIdBlock, true) + p.numTasks-- + } + + taskIdNotBlock := p.getTaskId(identifier, false) + // fmt.Printf(" remove %s\n", taskIdNotBlock) + taskBlockNotBlock, oknb := p.taskMap[taskIdNotBlock] + if oknb { + taskBlockNotBlock.MarkPrunable(taskIdNotBlock, false) p.numTasks-- } - return ok + return ok || oknb } // Freeze increments the freeze value for this peer. While a peer is frozen @@ -236,3 +285,33 @@ func (p *PeerTracker) FullThaw() { func (p *PeerTracker) IsFrozen() bool { return p.freezeVal > 0 } + +func (p *PeerTracker) getTaskId(identifier peertask.Identifier, isBlock bool) string { + return fmt.Sprintf("%s-%t", identifier, isBlock) +} + +func (p *PeerTracker) isAnyTaskWithIdentifierActive(identifier peertask.Identifier) (bool, bool) { + taskIdBlock := p.getTaskId(identifier, true) + if _, ok := p.activeTasks[taskIdBlock]; ok { + return true, true + } + + taskIdNotBlock := p.getTaskId(identifier, false) + if _, ok := p.activeTasks[taskIdNotBlock]; ok { + return false, true + } + return false, false +} + +func (p *PeerTracker) anyQueuedTaskWithIdentifier(identifier peertask.Identifier) (string, bool) { + taskIdBlock := p.getTaskId(identifier, true) + if _, ok := p.taskMap[taskIdBlock]; ok { + return taskIdBlock, true + } + + taskIdNotBlock := p.getTaskId(identifier, false) + if _, ok := p.taskMap[taskIdNotBlock]; ok { + return taskIdNotBlock, true + } + return "", false +}