taskqueue.go 1.94 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package strategy

import (
4
	wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
Jeromy's avatar
Jeromy committed
5 6 7 8 9 10 11
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"
)

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
12
type taskQueue struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
13 14
	tasks   []*task
	taskmap map[string]*task
Jeromy's avatar
Jeromy committed
15 16
}

17 18
func newTaskQueue() *taskQueue {
	return &taskQueue{
Brian Tiger Chow's avatar
Brian Tiger Chow committed
19
		taskmap: make(map[string]*task),
Jeromy's avatar
Jeromy committed
20 21 22
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
23
type task struct {
24 25
	Entry  wantlist.Entry
	Target peer.Peer
Jeromy's avatar
Jeromy committed
26 27
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
28
// Push currently adds a new task to the end of the list
Jeromy's avatar
Jeromy committed
29
// TODO: make this into a priority queue
30
func (tl *taskQueue) Push(block u.Key, priority int, to peer.Peer) {
31
	if task, ok := tl.taskmap[taskKey(to, block)]; ok {
Jeromy's avatar
Jeromy committed
32
		// TODO: when priority queue is implemented,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
		//       rearrange this task
34
		task.Entry.Priority = priority
Jeromy's avatar
Jeromy committed
35 36
		return
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
37
	task := &task{
38 39 40 41 42
		Entry: wantlist.Entry{
			Key:      block,
			Priority: priority,
		},
		Target: to,
Jeromy's avatar
Jeromy committed
43 44
	}
	tl.tasks = append(tl.tasks, task)
45
	tl.taskmap[taskKey(to, block)] = task
Jeromy's avatar
Jeromy committed
46 47
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
48
// Pop 'pops' the next task to be performed. Returns nil no task exists.
Brian Tiger Chow's avatar
Brian Tiger Chow committed
49 50
func (tl *taskQueue) Pop() *task {
	var out *task
Jeromy's avatar
Jeromy committed
51 52 53 54 55 56
	for len(tl.tasks) > 0 {
		// TODO: instead of zero, use exponential distribution
		//       it will help reduce the chance of receiving
		//		 the same block from multiple peers
		out = tl.tasks[0]
		tl.tasks = tl.tasks[1:]
57
		delete(tl.taskmap, taskKey(out.Target, out.Entry.Key))
Jeromy's avatar
Jeromy committed
58
		// Filter out blocks that have been cancelled
59
		if out.Entry.Priority >= 0 { // FIXME separate the "cancel" signal from priority
Jeromy's avatar
Jeromy committed
60 61 62 63 64 65 66
			break
		}
	}

	return out
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
67 68
// Remove lazily removes a task from the queue
func (tl *taskQueue) Remove(k u.Key, p peer.Peer) {
69
	t, ok := tl.taskmap[taskKey(p, k)]
Jeromy's avatar
Jeromy committed
70
	if ok {
71
		t.Entry.Priority = -1
Jeromy's avatar
Jeromy committed
72 73
	}
}
74 75 76 77 78

// taskKey builds the string used to identify a task in the lookup map by
// concatenating the peer's key with the block key.
func taskKey(p peer.Peer, k u.Key) string {
	combined := p.Key() + k
	return string(combined)
}