taskqueue.go 1.84 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4 5 6 7 8 9 10
package strategy

import (
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
11
type taskQueue struct {
Jeromy's avatar
Jeromy committed
12
	tasks   []*Task
13
	taskmap map[string]*Task
Jeromy's avatar
Jeromy committed
14 15
}

16 17
func newTaskQueue() *taskQueue {
	return &taskQueue{
18
		taskmap: make(map[string]*Task),
Jeromy's avatar
Jeromy committed
19 20 21 22 23 24 25 26 27
	}
}

type Task struct {
	Key           u.Key
	Target        peer.Peer
	theirPriority int
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
// Push currently adds a new task to the end of the list
Jeromy's avatar
Jeromy committed
29
// TODO: make this into a priority queue
30
func (tl *taskQueue) Push(block u.Key, priority int, to peer.Peer) {
31
	if task, ok := tl.taskmap[taskKey(to, block)]; ok {
Jeromy's avatar
Jeromy committed
32 33 34 35 36 37 38 39 40 41 42
		// TODO: when priority queue is implemented,
		//       rearrange this Task
		task.theirPriority = priority
		return
	}
	task := &Task{
		Key:           block,
		Target:        to,
		theirPriority: priority,
	}
	tl.tasks = append(tl.tasks, task)
43
	tl.taskmap[taskKey(to, block)] = task
Jeromy's avatar
Jeromy committed
44 45
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
// Pop 'pops' the next task to be performed. Returns nil no task exists.
47
func (tl *taskQueue) Pop() *Task {
Jeromy's avatar
Jeromy committed
48 49 50 51 52 53 54
	var out *Task
	for len(tl.tasks) > 0 {
		// TODO: instead of zero, use exponential distribution
		//       it will help reduce the chance of receiving
		//		 the same block from multiple peers
		out = tl.tasks[0]
		tl.tasks = tl.tasks[1:]
55
		delete(tl.taskmap, taskKey(out.Target, out.Key))
Jeromy's avatar
Jeromy committed
56 57 58 59 60 61 62 63 64 65
		// Filter out blocks that have been cancelled
		if out.theirPriority >= 0 {
			break
		}
	}

	return out
}

// Cancel lazily cancels the sending of a block to a given peer
66
func (tl *taskQueue) Cancel(k u.Key, p peer.Peer) {
67
	t, ok := tl.taskmap[taskKey(p, k)]
Jeromy's avatar
Jeromy committed
68 69 70 71
	if ok {
		t.theirPriority = -1
	}
}
72 73 74 75 76

// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.Peer, k u.Key) string {
	return string(p.Key() + k)
}