taskqueue.go 1.83 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4 5 6 7 8 9 10
package strategy

import (
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
11
type taskQueue struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
12 13
	tasks   []*task
	taskmap map[string]*task
Jeromy's avatar
Jeromy committed
14 15
}

16 17
func newTaskQueue() *taskQueue {
	return &taskQueue{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18
		taskmap: make(map[string]*task),
Jeromy's avatar
Jeromy committed
19 20 21
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
22
type task struct {
Jeromy's avatar
Jeromy committed
23 24 25 26 27
	Key           u.Key
	Target        peer.Peer
	theirPriority int
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
// Push currently adds a new task to the end of the list
Jeromy's avatar
Jeromy committed
29
// TODO: make this into a priority queue
30
func (tl *taskQueue) Push(block u.Key, priority int, to peer.Peer) {
31
	if task, ok := tl.taskmap[taskKey(to, block)]; ok {
Jeromy's avatar
Jeromy committed
32
		// TODO: when priority queue is implemented,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
		//       rearrange this task
Jeromy's avatar
Jeromy committed
34 35 36
		task.theirPriority = priority
		return
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37
	task := &task{
Jeromy's avatar
Jeromy committed
38 39 40 41 42
		Key:           block,
		Target:        to,
		theirPriority: priority,
	}
	tl.tasks = append(tl.tasks, task)
43
	tl.taskmap[taskKey(to, block)] = task
Jeromy's avatar
Jeromy committed
44 45
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
// Pop 'pops' the next task to be performed. Returns nil no task exists.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47 48
func (tl *taskQueue) Pop() *task {
	var out *task
Jeromy's avatar
Jeromy committed
49 50 51 52 53 54
	for len(tl.tasks) > 0 {
		// TODO: instead of zero, use exponential distribution
		//       it will help reduce the chance of receiving
		//		 the same block from multiple peers
		out = tl.tasks[0]
		tl.tasks = tl.tasks[1:]
55
		delete(tl.taskmap, taskKey(out.Target, out.Key))
Jeromy's avatar
Jeromy committed
56 57 58 59 60 61 62 63 64
		// Filter out blocks that have been cancelled
		if out.theirPriority >= 0 {
			break
		}
	}

	return out
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
65 66
// Remove lazily removes a task from the queue
func (tl *taskQueue) Remove(k u.Key, p peer.Peer) {
67
	t, ok := tl.taskmap[taskKey(p, k)]
Jeromy's avatar
Jeromy committed
68 69 70 71
	if ok {
		t.theirPriority = -1
	}
}
72 73 74 75 76

// taskKey returns a key that uniquely identifies a task.
func taskKey(p peer.Peer, k u.Key) string {
	return string(p.Key() + k)
}