peer_request_queue.go 7.47 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6
package decision

import (
	"sync"
	"time"

7
	wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
8

9
	pq "gx/ipfs/QmZUbTDJ39JpvtFCSubiWeUTQRvMA1tVE5RZCJrY4oeAsC/go-ipfs-pq"
Steven Allen's avatar
Steven Allen committed
10
	peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer"
Steven Allen's avatar
Steven Allen committed
11
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
12 13 14 15 16
)

type peerRequestQueue interface {
	// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
	Pop() *peerRequestTask
17
	Push(entry *wantlist.Entry, to peer.ID)
18
	Remove(k *cid.Cid, p peer.ID)
Jeromy's avatar
Jeromy committed
19

Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21 22 23
	// NB: cannot expose simply expose taskQueue.Len because trashed elements
	// may exist. These trashed elements should not contribute to the count.
}

Jeromy's avatar
Jeromy committed
24
func newPRQ() *prq {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25
	return &prq{
26 27
		taskMap:  make(map[string]*peerRequestTask),
		partners: make(map[peer.ID]*activePartner),
Jeromy's avatar
Jeromy committed
28
		frozen:   make(map[peer.ID]*activePartner),
29
		pQueue:   pq.New(partnerCompare),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31 32
	}
}

Jeromy's avatar
Jeromy committed
33
// verify interface implementation
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35 36 37 38 39
var _ peerRequestQueue = &prq{}

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type prq struct {
40 41 42 43
	lock     sync.Mutex
	pQueue   pq.PQ
	taskMap  map[string]*peerRequestTask
	partners map[peer.ID]*activePartner
Jeromy's avatar
Jeromy committed
44 45

	frozen map[peer.ID]*activePartner
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47 48
}

// Push currently adds a new peerRequestTask to the end of the list
49
func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50 51
	tl.lock.Lock()
	defer tl.lock.Unlock()
52 53
	partner, ok := tl.partners[to]
	if !ok {
54
		partner = newActivePartner()
55 56 57 58
		tl.pQueue.Push(partner)
		tl.partners[to] = partner
	}

59 60
	partner.activelk.Lock()
	defer partner.activelk.Unlock()
61
	if partner.activeBlocks.Has(entry.Cid) {
62 63 64
		return
	}

65
	if task, ok := tl.taskMap[taskKey(to, entry.Cid)]; ok {
66 67 68 69 70
		task.Entry.Priority = entry.Priority
		partner.taskQueue.Update(task.index)
		return
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
71 72 73 74
	task := &peerRequestTask{
		Entry:   entry,
		Target:  to,
		created: time.Now(),
75 76
		Done: func() {
			tl.lock.Lock()
77
			partner.TaskDone(entry.Cid)
78 79 80
			tl.pQueue.Update(partner.Index())
			tl.lock.Unlock()
		},
Brian Tiger Chow's avatar
Brian Tiger Chow committed
81
	}
82 83

	partner.taskQueue.Push(task)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
84
	tl.taskMap[task.Key()] = task
85 86
	partner.requests++
	tl.pQueue.Update(partner.Index())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87 88 89 90 91 92
}

// Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *prq) Pop() *peerRequestTask {
	tl.lock.Lock()
	defer tl.lock.Unlock()
93 94 95
	if tl.pQueue.Len() == 0 {
		return nil
	}
Jeromy's avatar
Jeromy committed
96
	partner := tl.pQueue.Pop().(*activePartner)
97

Brian Tiger Chow's avatar
Brian Tiger Chow committed
98
	var out *peerRequestTask
Jeromy's avatar
Jeromy committed
99
	for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
100
		out = partner.taskQueue.Pop().(*peerRequestTask)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
101 102
		delete(tl.taskMap, out.Key())
		if out.trash {
Jeromy's avatar
Jeromy committed
103
			out = nil
Brian Tiger Chow's avatar
Brian Tiger Chow committed
104 105
			continue // discarding tasks that have been removed
		}
Jeromy's avatar
Jeromy committed
106

107
		partner.StartTask(out.Entry.Cid)
Jeromy's avatar
Jeromy committed
108
		partner.requests--
Brian Tiger Chow's avatar
Brian Tiger Chow committed
109 110
		break // and return |out|
	}
Jeromy's avatar
Jeromy committed
111

112
	tl.pQueue.Push(partner)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
113 114 115 116
	return out
}

// Remove removes a task from the queue
117
func (tl *prq) Remove(k *cid.Cid, p peer.ID) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
118 119 120 121 122 123 124
	tl.lock.Lock()
	t, ok := tl.taskMap[taskKey(p, k)]
	if ok {
		// remove the task "lazily"
		// simply mark it as trash, so it'll be dropped when popped off the
		// queue.
		t.trash = true
Jeromy's avatar
Jeromy committed
125 126

		// having canceled a block, we now account for that in the given partner
Jeromy's avatar
Jeromy committed
127 128 129 130 131 132 133 134 135 136 137 138 139
		partner := tl.partners[p]
		partner.requests--

		// we now also 'freeze' that partner. If they sent us a cancel for a
		// block we were about to send them, we should wait a short period of time
		// to make sure we receive any other in-flight cancels before sending
		// them a block they already potentially have
		if partner.freezeVal == 0 {
			tl.frozen[p] = partner
		}

		partner.freezeVal++
		tl.pQueue.Update(partner.index)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141 142 143
	}
	tl.lock.Unlock()
}

Jeromy's avatar
Jeromy committed
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
// fullThaw unfreezes every frozen partner at once: each one is dropped from
// the frozen set, has its freezeVal reset to zero and is re-sorted in pQueue.
func (tl *prq) fullThaw() {
	tl.lock.Lock()
	defer tl.lock.Unlock()

	for peerID, frozenPartner := range tl.frozen {
		delete(tl.frozen, peerID)
		frozenPartner.freezeVal = 0
		tl.pQueue.Update(frozenPartner.index)
	}
}

// thawRound applies one thawing step to every frozen partner: freezeVal is
// reduced by (freezeVal+1)/2, and a partner whose freezeVal is no longer
// positive leaves the frozen set. Every visited partner is re-sorted in
// pQueue.
func (tl *prq) thawRound() {
	tl.lock.Lock()
	defer tl.lock.Unlock()

	for peerID, frozenPartner := range tl.frozen {
		decay := (frozenPartner.freezeVal + 1) / 2
		frozenPartner.freezeVal -= decay
		if frozenPartner.freezeVal < 1 {
			delete(tl.frozen, peerID)
		}
		tl.pQueue.Update(frozenPartner.index)
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
168
type peerRequestTask struct {
169
	Entry  *wantlist.Entry
170 171
	Target peer.ID

Jeromy's avatar
Jeromy committed
172
	// A callback to signal that this task has been completed
173
	Done func()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
174 175 176 177 178 179 180 181 182 183

	// trash in a book-keeping field
	trash bool
	// created marks the time that the task was added to the queue
	created time.Time
	index   int // book-keeping field used by the pq container
}

// Key uniquely identifies a task.
func (t *peerRequestTask) Key() string {
184
	return taskKey(t.Target, t.Entry.Cid)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
185 186
}

Jeromy's avatar
Jeromy committed
187
// Index implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
188 189 190 191
func (t *peerRequestTask) Index() int {
	return t.index
}

Jeromy's avatar
Jeromy committed
192
// SetIndex implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
193 194 195 196 197
func (t *peerRequestTask) SetIndex(i int) {
	t.index = i
}

// taskKey returns a key that uniquely identifies a task.
198 199
func taskKey(p peer.ID, k *cid.Cid) string {
	return string(p) + k.KeyString()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
}

// FIFO is a basic task comparator that yields tasks in creation order: it
// reports whether a was created before b.
var FIFO = func(a, b *peerRequestTask) bool {
	return b.created.After(a.created)
}

// V1 honours the target peer's wantlist priority: between two tasks for the
// same peer, the one with the higher priority comes first. Tasks for
// different peers fall back to FIFO, so the oldest one is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
	if a.Target != b.Target {
		return FIFO(a, b)
	}
	return a.Entry.Priority > b.Entry.Priority
}

// wrapCmp adapts a comparator over *peerRequestTask into one over pq.Elem.
// The returned function panics if either element is not a *peerRequestTask.
func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
	return func(a, b pq.Elem) bool {
		left := a.(*peerRequestTask)
		right := b.(*peerRequestTask)
		return f(left, right)
	}
}
221 222 223 224

type activePartner struct {

	// Active is the number of blocks this peer is currently being sent
Jeromy's avatar
Jeromy committed
225
	// active must be locked around as it will be updated externally
Jeromy's avatar
Jeromy committed
226 227
	activelk sync.Mutex
	active   int
228

229
	activeBlocks *cid.Set
230

231
	// requests is the number of blocks this peer is currently requesting
Jeromy's avatar
Jeromy committed
232 233
	// request need not be locked around as it will only be modified under
	// the peerRequestQueue's locks
234 235
	requests int

Jeromy's avatar
Jeromy committed
236
	// for the PQ interface
237 238
	index int

Jeromy's avatar
Jeromy committed
239 240
	freezeVal int

Jeromy's avatar
Jeromy committed
241
	// priority queue of tasks belonging to this peer
242 243 244
	taskQueue pq.PQ
}

245 246 247
func newActivePartner() *activePartner {
	return &activePartner{
		taskQueue:    pq.New(wrapCmp(V1)),
248
		activeBlocks: cid.NewSet(),
249 250 251
	}
}

Jeromy's avatar
Jeromy committed
252
// partnerCompare implements pq.ElemComparator
Jeromy's avatar
Jeromy committed
253
// returns true if peer 'a' has higher priority than peer 'b'
254 255 256 257 258
func partnerCompare(a, b pq.Elem) bool {
	pa := a.(*activePartner)
	pb := b.(*activePartner)

	// having no blocks in their wantlist means lowest priority
Jeromy's avatar
Jeromy committed
259
	// having both of these checks ensures stability of the sort
260 261 262 263 264 265
	if pa.requests == 0 {
		return false
	}
	if pb.requests == 0 {
		return true
	}
Jeromy's avatar
Jeromy committed
266 267 268 269 270 271 272 273

	if pa.freezeVal > pb.freezeVal {
		return false
	}
	if pa.freezeVal < pb.freezeVal {
		return true
	}

274 275 276 277 278 279
	if pa.active == pb.active {
		// sorting by taskQueue.Len() aids in cleaning out trash entries faster
		// if we sorted instead by requests, one peer could potentially build up
		// a huge number of cancelled entries in the queue resulting in a memory leak
		return pa.taskQueue.Len() > pb.taskQueue.Len()
	}
280 281 282
	return pa.active < pb.active
}

Jeromy's avatar
Jeromy committed
283
// StartTask signals that a task was started for this partner
284
func (p *activePartner) StartTask(k *cid.Cid) {
Jeromy's avatar
Jeromy committed
285
	p.activelk.Lock()
286
	p.activeBlocks.Add(k)
287
	p.active++
Jeromy's avatar
Jeromy committed
288
	p.activelk.Unlock()
289 290
}

Jeromy's avatar
Jeromy committed
291
// TaskDone signals that a task was completed for this partner
292
func (p *activePartner) TaskDone(k *cid.Cid) {
Jeromy's avatar
Jeromy committed
293
	p.activelk.Lock()
294
	p.activeBlocks.Remove(k)
295 296 297 298
	p.active--
	if p.active < 0 {
		panic("more tasks finished than started!")
	}
Jeromy's avatar
Jeromy committed
299
	p.activelk.Unlock()
300 301
}

Jeromy's avatar
Jeromy committed
302
// Index implements pq.Elem
303 304 305 306
func (p *activePartner) Index() int {
	return p.index
}

Jeromy's avatar
Jeromy committed
307
// SetIndex implements pq.Elem
308 309 310
func (p *activePartner) SetIndex(i int) {
	p.index = i
}