Commit d51c4ca7 authored by Dirk McCormick's avatar Dirk McCormick

fix: make replaceable tasks work

parent b84ec785
package peertask
import (
"fmt"
"time"
pq "github.com/ipfs/go-ipfs-pq"
......@@ -39,7 +40,7 @@ type TaskInfo interface{}
type Task struct {
Identifier Identifier
Priority int
Replaceable bool
IsBlock bool
Info TaskInfo
}
......@@ -54,11 +55,16 @@ type TaskBlock struct {
// toPrune are the tasks that have already been taken care of as part of
// a different task block which can be removed from the task block.
toPrune map[Identifier]struct{}
// toPrune map[Identifier]struct{}
toPrune map[string]struct{}
created time.Time // created marks the time that the task was added to the queue
index int // book-keeping field used by the pq container
}
func (t *Task) String() string {
return fmt.Sprintf("Cid: %s\nIsBlock: %t\nInfo:\n%s\n", t.Identifier, t.IsBlock, t.Info)
}
// NewTaskBlock creates a new task block with the given tasks, priority, target
// peer, and task completion function.
func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task)) *TaskBlock {
......@@ -67,7 +73,8 @@ func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task))
Priority: priority,
Target: target,
Done: done,
toPrune: make(map[Identifier]struct{}, len(tasks)),
// toPrune: make(map[Identifier]struct{}, len(tasks)),
toPrune: make(map[string]struct{}, len(tasks)),
created: time.Now(),
}
}
......@@ -75,27 +82,30 @@ func NewTaskBlock(tasks []Task, priority int, target peer.ID, done func([]Task))
func (pt *TaskBlock) ReplaceTask(task Task) bool {
// ReplaceTask() should not be called on a Prunable task, but check
// just in case
_, ok := pt.toPrune[task.Identifier]
_, ok := pt.toPrune[pt.getTaskId(task.Identifier, task.IsBlock)]
if ok {
// fmt.Printf("ReplaceTask %s (cannot: pruned)\n", task.Identifier)
return false
}
if i, ok := pt.canReplaceTask(task); ok {
pt.Tasks[i] = task
// fmt.Printf("ReplaceTask %s\n", task.Identifier)
return true
}
// fmt.Printf("ReplaceTask %s (cannot replace task with that cid)\n", task.Identifier)
return false
}
func (pt *TaskBlock) CanReplaceTask(task Task) bool {
_, ok := pt.canReplaceTask(task)
return ok
}
// func (pt *TaskBlock) CanReplaceTask(task Task) bool {
// _, ok := pt.canReplaceTask(task)
// return ok
// }
func (pt *TaskBlock) canReplaceTask(task Task) (int, bool) {
for i, existing := range pt.Tasks {
if existing.Identifier == task.Identifier {
if existing.Replaceable && !task.Replaceable {
if !existing.IsBlock && task.IsBlock {
return i, true
}
return -1, false
......@@ -106,8 +116,9 @@ func (pt *TaskBlock) canReplaceTask(task Task) (int, bool) {
// MarkPrunable marks any tasks with the given identifier as prunable at the time
// the task block is pulled of the queue to execute (because they've already been removed).
func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
pt.toPrune[identifier] = struct{}{}
func (pt *TaskBlock) MarkPrunable(identifier Identifier, isBlock bool) {
// pt.toPrune[identifier] = struct{}{}
pt.toPrune[pt.getTaskId(identifier, isBlock)] = struct{}{}
}
// PruneTasks removes all tasks previously marked as prunable from the lists of
......@@ -115,7 +126,8 @@ func (pt *TaskBlock) MarkPrunable(identifier Identifier) {
func (pt *TaskBlock) PruneTasks() {
newTasks := make([]Task, 0, len(pt.Tasks)-len(pt.toPrune))
for _, task := range pt.Tasks {
if _, ok := pt.toPrune[task.Identifier]; !ok {
// if _, ok := pt.toPrune[task.Identifier]; !ok {
if _, ok := pt.toPrune[pt.getTaskId(task.Identifier, task.IsBlock)]; !ok {
newTasks = append(newTasks, task)
}
}
......@@ -131,3 +143,7 @@ func (pt *TaskBlock) Index() int {
func (pt *TaskBlock) SetIndex(i int) {
pt.index = i
}
func (pt *TaskBlock) getTaskId(identifier Identifier, isBlock bool) string {
return fmt.Sprintf("%s-%t", identifier, isBlock)
}
......@@ -135,7 +135,7 @@ func (ptq *PeerTaskQueue) PushBlock(to peer.ID, tasks ...peertask.Task) {
peerTracker.PushBlock(to, tasks, func(e []peertask.Task) {
ptq.lock.Lock()
for _, task := range e {
peerTracker.TaskDone(task.Identifier)
peerTracker.TaskDone(task.Identifier, task.IsBlock)
}
ptq.pQueue.Update(peerTracker.Index())
ptq.lock.Unlock()
......
package peertracker
import (
"fmt"
"sync"
pq "github.com/ipfs/go-ipfs-pq"
......@@ -17,7 +18,8 @@ type PeerTracker struct {
// active must be locked around as it will be updated externally
activelk sync.Mutex
active int
activeTasks map[peertask.Identifier]struct{}
// activeTasks map[peertask.Identifier]struct{}
activeTasks map[string]struct{}
// total number of task tasks for this task
numTasks int
......@@ -27,7 +29,8 @@ type PeerTracker struct {
freezeVal int
taskMap map[peertask.Identifier]*peertask.TaskBlock
// taskMap map[peertask.Identifier]*peertask.TaskBlock
taskMap map[string]*peertask.TaskBlock
// priority queue of tasks belonging to this peer
taskBlockQueue pq.PQ
......@@ -38,8 +41,10 @@ func New(target peer.ID) *PeerTracker {
return &PeerTracker{
target: target,
taskBlockQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
taskMap: make(map[peertask.Identifier]*peertask.TaskBlock),
activeTasks: make(map[peertask.Identifier]struct{}),
// taskMap: make(map[peertask.Identifier]*peertask.TaskBlock),
// activeTasks: make(map[peertask.Identifier]struct{}),
taskMap: make(map[string]*peertask.TaskBlock),
activeTasks: make(map[string]struct{}),
}
}
......@@ -75,17 +80,20 @@ func PeerCompare(a, b pq.Elem) bool {
}
// StartTask signals that a task was started for this peer.
func (p *PeerTracker) StartTask(identifier peertask.Identifier) {
func (p *PeerTracker) StartTask(identifier peertask.Identifier, isBlock bool) {
p.activelk.Lock()
p.activeTasks[identifier] = struct{}{}
// p.activeTasks[identifier] = struct{}{}
p.activeTasks[p.getTaskId(identifier, isBlock)] = struct{}{}
p.active++
p.activelk.Unlock()
}
// TaskDone signals that a task was completed for this peer.
func (p *PeerTracker) TaskDone(identifier peertask.Identifier) {
func (p *PeerTracker) TaskDone(identifier peertask.Identifier, isBlock bool) {
// fmt.Printf("TaskDone() %s-%t\n", identifier, isBlock)
p.activelk.Lock()
delete(p.activeTasks, identifier)
// delete(p.activeTasks, identifier)
delete(p.activeTasks, p.getTaskId(identifier, isBlock))
p.active--
if p.active < 0 {
panic("more tasks finished than started!")
......@@ -124,35 +132,52 @@ func (p *PeerTracker) PushBlock(target peer.ID, tasks []peertask.Task, done func
var priority int
newTasks := make([]peertask.Task, 0, len(tasks))
// fmt.Printf(" PushBlock() %d tasks\n", len(tasks))
for _, task := range tasks {
taskId := p.getTaskId(task.Identifier, task.IsBlock)
// If the task is currently active (being processed)
if _, ok := p.activeTasks[task.Identifier]; ok {
// if _, ok := p.activeTasks[taskId]; ok {
if isActiveTaskBlock, ok := p.isAnyTaskWithIdentifierActive(task.Identifier); ok {
// fmt.Printf(" active task %s\n", task.Identifier)
// We can only replace tasks that are not active.
// If the active task could not have been replaced (even if it
// wasn't active) by the new task, that means the new task is
// not doing anything useful, so skip adding the new task.
canReplace := true
if taskBlock, ok := p.taskMap[task.Identifier]; ok {
canReplace = taskBlock.CanReplaceTask(task)
}
if !canReplace {
if isActiveTaskBlock || !task.IsBlock {
continue
}
} else {
// If there is already a task with this identifier, and the new task
// has a higher priority than the old task block, move the old task
// block towards the front of the queue.
if taskBlock, ok := p.taskMap[task.Identifier]; ok {
// fmt.Printf(" in-active task %s\n", task.Identifier)
// If there is already a task with this Identifier
// if taskBlock, ok := p.taskMap[task.Identifier]; ok {
if oldTaskId, ok := p.anyQueuedTaskWithIdentifier(task.Identifier); ok {
// fmt.Printf(" (existing task %s)\n", task.Identifier)
taskBlock, ok := p.taskMap[oldTaskId]
if !ok {
panic("anyQueuedTaskWithIdentifier() returned non-existent task id")
}
// If the new task has a higher priority than the old task block,
// move the old task block towards the front of the queue.
if task.Priority > taskBlock.Priority {
taskBlock.Priority = task.Priority
p.taskBlockQueue.Update(taskBlock.Index())
}
}
// TODO: Replacing the task may result in the block exceeding the message size limit
// Replace the task (if it's replaceable)
if taskBlock.ReplaceTask(task) {
// If the task was replaced, we don't need to add the new
// task to the queue
continue
// Update the mapping from task -> task block
if taskId != oldTaskId {
p.taskMap[taskId] = taskBlock
delete(p.taskMap, oldTaskId)
}
}
// If a task with the Identifier exists, we don't need to add
// the new task to the queue
continue
}
}
......@@ -174,26 +199,32 @@ func (p *PeerTracker) PushBlock(target peer.ID, tasks []peertask.Task, done func
taskBlock := peertask.NewTaskBlock(newTasks, priority, target, done)
p.taskBlockQueue.Push(taskBlock)
for _, task := range newTasks {
p.taskMap[task.Identifier] = taskBlock
// p.taskMap[task.Identifier] = taskBlock
p.taskMap[p.getTaskId(task.Identifier, task.IsBlock)] = taskBlock
}
p.numTasks += len(newTasks)
}
// PopBlock removes a block of tasks from this peers queue
func (p *PeerTracker) PopBlock() *peertask.TaskBlock {
// fmt.Printf("PopBlock()\n")
var out *peertask.TaskBlock
for p.taskBlockQueue.Len() > 0 && p.freezeVal == 0 {
out = p.taskBlockQueue.Pop().(*peertask.TaskBlock)
for _, task := range out.Tasks {
delete(p.taskMap, task.Identifier)
// delete(p.taskMap, task.Identifier)
delete(p.taskMap, p.getTaskId(task.Identifier, task.IsBlock))
}
out.PruneTasks()
// fmt.Printf(" %d tasks\n", len(out.Tasks))
if len(out.Tasks) > 0 {
for _, task := range out.Tasks {
// fmt.Printf(" StartTask() %s\n", task)
p.numTasks--
p.StartTask(task.Identifier)
// p.StartTask(task.Identifier)
p.StartTask(task.Identifier, task.IsBlock)
}
} else {
out = nil
......@@ -206,12 +237,30 @@ func (p *PeerTracker) PopBlock() *peertask.TaskBlock {
// Remove removes the task with the given identifier from this peers queue
func (p *PeerTracker) Remove(identifier peertask.Identifier) bool {
taskBlock, ok := p.taskMap[identifier]
// taskBlock, ok := p.taskMap[identifier]
// if ok {
// // taskBlock.MarkPrunable(identifier)
// taskBlock.MarkPrunable(taskId)
// p.numTasks--
// }
// fmt.Printf("Remove() %s\n", identifier)
taskIdBlock := p.getTaskId(identifier, true)
// fmt.Printf(" remove %s\n", taskIdBlock)
taskBlockIsBlock, ok := p.taskMap[taskIdBlock]
if ok {
taskBlock.MarkPrunable(identifier)
taskBlockIsBlock.MarkPrunable(taskIdBlock, true)
p.numTasks--
}
taskIdNotBlock := p.getTaskId(identifier, false)
// fmt.Printf(" remove %s\n", taskIdNotBlock)
taskBlockNotBlock, oknb := p.taskMap[taskIdNotBlock]
if oknb {
taskBlockNotBlock.MarkPrunable(taskIdNotBlock, false)
p.numTasks--
}
return ok
return ok || oknb
}
// Freeze increments the freeze value for this peer. While a peer is frozen
......@@ -236,3 +285,33 @@ func (p *PeerTracker) FullThaw() {
func (p *PeerTracker) IsFrozen() bool {
return p.freezeVal > 0
}
func (p *PeerTracker) getTaskId(identifier peertask.Identifier, isBlock bool) string {
return fmt.Sprintf("%s-%t", identifier, isBlock)
}
func (p *PeerTracker) isAnyTaskWithIdentifierActive(identifier peertask.Identifier) (bool, bool) {
taskIdBlock := p.getTaskId(identifier, true)
if _, ok := p.activeTasks[taskIdBlock]; ok {
return true, true
}
taskIdNotBlock := p.getTaskId(identifier, false)
if _, ok := p.activeTasks[taskIdNotBlock]; ok {
return false, true
}
return false, false
}
func (p *PeerTracker) anyQueuedTaskWithIdentifier(identifier peertask.Identifier) (string, bool) {
taskIdBlock := p.getTaskId(identifier, true)
if _, ok := p.taskMap[taskIdBlock]; ok {
return taskIdBlock, true
}
taskIdNotBlock := p.getTaskId(identifier, false)
if _, ok := p.taskMap[taskIdNotBlock]; ok {
return taskIdNotBlock, true
}
return "", false
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment