peer_request_queue.go 6.34 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6
package decision

import (
	"sync"
	"time"

7
	key "github.com/ipfs/go-ipfs/blocks/key"
8 9
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
	pq "github.com/ipfs/go-ipfs/thirdparty/pq"
10
	peer "gx/ipfs/QmccGfZs3rzku8Bv6sTPH3bMUKD1EVod8srgRjt5csdmva/go-libp2p/p2p/peer"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
11 12 13 14 15 16
)

type peerRequestQueue interface {
	// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
	Pop() *peerRequestTask
	Push(entry wantlist.Entry, to peer.ID)
17
	Remove(k key.Key, p peer.ID)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
18 19 20 21 22 23
	// NB: cannot expose simply expose taskQueue.Len because trashed elements
	// may exist. These trashed elements should not contribute to the count.
}

func newPRQ() peerRequestQueue {
	return &prq{
24 25 26
		taskMap:  make(map[string]*peerRequestTask),
		partners: make(map[peer.ID]*activePartner),
		pQueue:   pq.New(partnerCompare),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
27 28 29
	}
}

Jeromy's avatar
Jeromy committed
30
// verify interface implementation
Brian Tiger Chow's avatar
Brian Tiger Chow committed
31 32 33 34 35 36
var _ peerRequestQueue = &prq{}

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type prq struct {
37 38 39 40
	lock     sync.Mutex
	pQueue   pq.PQ
	taskMap  map[string]*peerRequestTask
	partners map[peer.ID]*activePartner
Brian Tiger Chow's avatar
Brian Tiger Chow committed
41 42 43 44 45 46
}

// Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
	tl.lock.Lock()
	defer tl.lock.Unlock()
47 48
	partner, ok := tl.partners[to]
	if !ok {
49
		partner = newActivePartner()
50 51 52 53
		tl.pQueue.Push(partner)
		tl.partners[to] = partner
	}

54 55 56 57 58 59 60
	partner.activelk.Lock()
	defer partner.activelk.Unlock()
	_, ok = partner.activeBlocks[entry.Key]
	if ok {
		return
	}

61 62 63 64 65 66
	if task, ok := tl.taskMap[taskKey(to, entry.Key)]; ok {
		task.Entry.Priority = entry.Priority
		partner.taskQueue.Update(task.index)
		return
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
67 68 69 70
	task := &peerRequestTask{
		Entry:   entry,
		Target:  to,
		created: time.Now(),
71 72
		Done: func() {
			tl.lock.Lock()
Jeromy's avatar
Jeromy committed
73
			partner.TaskDone(entry.Key)
74 75 76
			tl.pQueue.Update(partner.Index())
			tl.lock.Unlock()
		},
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77
	}
78 79

	partner.taskQueue.Push(task)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
80
	tl.taskMap[task.Key()] = task
81 82
	partner.requests++
	tl.pQueue.Update(partner.Index())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
83 84 85 86 87 88
}

// Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *prq) Pop() *peerRequestTask {
	tl.lock.Lock()
	defer tl.lock.Unlock()
89 90 91
	if tl.pQueue.Len() == 0 {
		return nil
	}
Jeromy's avatar
Jeromy committed
92
	partner := tl.pQueue.Pop().(*activePartner)
93

Brian Tiger Chow's avatar
Brian Tiger Chow committed
94
	var out *peerRequestTask
95 96
	for partner.taskQueue.Len() > 0 {
		out = partner.taskQueue.Pop().(*peerRequestTask)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97 98
		delete(tl.taskMap, out.Key())
		if out.trash {
Jeromy's avatar
Jeromy committed
99
			out = nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
100 101
			continue // discarding tasks that have been removed
		}
Jeromy's avatar
Jeromy committed
102

103
		partner.StartTask(out.Entry.Key)
Jeromy's avatar
Jeromy committed
104
		partner.requests--
Brian Tiger Chow's avatar
Brian Tiger Chow committed
105 106
		break // and return |out|
	}
Jeromy's avatar
Jeromy committed
107

108
	tl.pQueue.Push(partner)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109 110 111 112
	return out
}

// Remove removes a task from the queue
113
func (tl *prq) Remove(k key.Key, p peer.ID) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
114 115 116 117 118 119 120
	tl.lock.Lock()
	t, ok := tl.taskMap[taskKey(p, k)]
	if ok {
		// remove the task "lazily"
		// simply mark it as trash, so it'll be dropped when popped off the
		// queue.
		t.trash = true
Jeromy's avatar
Jeromy committed
121 122

		// having canceled a block, we now account for that in the given partner
123
		tl.partners[p].requests--
Brian Tiger Chow's avatar
Brian Tiger Chow committed
124 125 126 127 128 129
	}
	tl.lock.Unlock()
}

type peerRequestTask struct {
	Entry  wantlist.Entry
130 131
	Target peer.ID

Jeromy's avatar
Jeromy committed
132
	// A callback to signal that this task has been completed
133
	Done func()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
134 135 136 137 138 139 140 141 142 143 144 145 146

	// trash in a book-keeping field
	trash bool
	// created marks the time that the task was added to the queue
	created time.Time
	index   int // book-keeping field used by the pq container
}

// Key returns the taskKey built from the task's target peer and block key.
func (t *peerRequestTask) Key() string {
	target, blockKey := t.Target, t.Entry.Key
	return taskKey(target, blockKey)
}

Jeromy's avatar
Jeromy committed
147
// Index implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
148 149 150 151
func (t *peerRequestTask) Index() int {
	return t.index
}

Jeromy's avatar
Jeromy committed
152
// SetIndex implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
153 154 155 156 157
func (t *peerRequestTask) SetIndex(i int) {
	t.index = i
}

// taskKey returns a key that uniquely identifies a task.
158
func taskKey(p peer.ID, k key.Key) string {
159
	return string(p) + string(k)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
}

// FIFO is a basic task comparator that orders tasks by creation time: it
// reports whether a was created before b.
var FIFO = func(a, b *peerRequestTask) bool {
	return b.created.After(a.created)
}

// V1 respects the target peer's wantlist priority: between two tasks for the
// same peer, the one with the higher entry priority comes first. Tasks for
// different peers fall back to FIFO, so the oldest task is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
	if a.Target != b.Target {
		return FIFO(a, b)
	}
	return a.Entry.Priority > b.Entry.Priority
}

// wrapCmp adapts a comparator over *peerRequestTask into one over pq.Elem.
// The returned function panics if either element is not a *peerRequestTask.
func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
	return func(x, y pq.Elem) bool {
		left := x.(*peerRequestTask)
		right := y.(*peerRequestTask)
		return f(left, right)
	}
}
181 182 183 184

type activePartner struct {

	// Active is the number of blocks this peer is currently being sent
Jeromy's avatar
Jeromy committed
185
	// active must be locked around as it will be updated externally
Jeromy's avatar
Jeromy committed
186 187
	activelk sync.Mutex
	active   int
188

189
	activeBlocks map[key.Key]struct{}
190

191
	// requests is the number of blocks this peer is currently requesting
Jeromy's avatar
Jeromy committed
192 193
	// request need not be locked around as it will only be modified under
	// the peerRequestQueue's locks
194 195
	requests int

Jeromy's avatar
Jeromy committed
196
	// for the PQ interface
197 198
	index int

Jeromy's avatar
Jeromy committed
199
	// priority queue of tasks belonging to this peer
200 201 202
	taskQueue pq.PQ
}

203 204 205
func newActivePartner() *activePartner {
	return &activePartner{
		taskQueue:    pq.New(wrapCmp(V1)),
206
		activeBlocks: make(map[key.Key]struct{}),
207 208 209
	}
}

Jeromy's avatar
Jeromy committed
210
// partnerCompare implements pq.ElemComparator
211 212 213 214 215
func partnerCompare(a, b pq.Elem) bool {
	pa := a.(*activePartner)
	pb := b.(*activePartner)

	// having no blocks in their wantlist means lowest priority
Jeromy's avatar
Jeromy committed
216
	// having both of these checks ensures stability of the sort
217 218 219 220 221 222
	if pa.requests == 0 {
		return false
	}
	if pb.requests == 0 {
		return true
	}
223 224 225 226 227 228
	if pa.active == pb.active {
		// sorting by taskQueue.Len() aids in cleaning out trash entries faster
		// if we sorted instead by requests, one peer could potentially build up
		// a huge number of cancelled entries in the queue resulting in a memory leak
		return pa.taskQueue.Len() > pb.taskQueue.Len()
	}
229 230 231
	return pa.active < pb.active
}

Jeromy's avatar
Jeromy committed
232
// StartTask signals that a task was started for this partner
233
func (p *activePartner) StartTask(k key.Key) {
Jeromy's avatar
Jeromy committed
234
	p.activelk.Lock()
235
	p.activeBlocks[k] = struct{}{}
236
	p.active++
Jeromy's avatar
Jeromy committed
237
	p.activelk.Unlock()
238 239
}

Jeromy's avatar
Jeromy committed
240
// TaskDone signals that a task was completed for this partner
241
func (p *activePartner) TaskDone(k key.Key) {
Jeromy's avatar
Jeromy committed
242
	p.activelk.Lock()
243
	delete(p.activeBlocks, k)
244 245 246 247
	p.active--
	if p.active < 0 {
		panic("more tasks finished than started!")
	}
Jeromy's avatar
Jeromy committed
248
	p.activelk.Unlock()
249 250
}

Jeromy's avatar
Jeromy committed
251
// Index implements pq.Elem
252 253 254 255
func (p *activePartner) Index() int {
	return p.index
}

Jeromy's avatar
Jeromy committed
256
// SetIndex implements pq.Elem
257 258 259
func (p *activePartner) SetIndex(i int) {
	p.index = i
}