peer_request_queue.go 8.04 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6
package decision

import (
	"sync"
	"time"

Jeromy's avatar
Jeromy committed
7
	wantlist "github.com/ipfs/go-bitswap/wantlist"
8

Jeromy's avatar
Jeromy committed
9 10 11
	cid "github.com/ipfs/go-cid"
	pq "github.com/ipfs/go-ipfs-pq"
	peer "github.com/libp2p/go-libp2p-peer"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
12 13 14 15 16
)

type peerRequestQueue interface {
	// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
	Pop() *peerRequestTask
17
	Push(to peer.ID, entries ...*wantlist.Entry)
18
	Remove(k cid.Cid, p peer.ID)
Jeromy's avatar
Jeromy committed
19

Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21 22 23
	// NB: cannot expose simply expose taskQueue.Len because trashed elements
	// may exist. These trashed elements should not contribute to the count.
}

Jeromy's avatar
Jeromy committed
24
func newPRQ() *prq {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25
	return &prq{
26 27
		taskMap:  make(map[string]*peerRequestTask),
		partners: make(map[peer.ID]*activePartner),
Jeromy's avatar
Jeromy committed
28
		frozen:   make(map[peer.ID]*activePartner),
29
		pQueue:   pq.New(partnerCompare),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31 32
	}
}

Jeromy's avatar
Jeromy committed
33
// verify interface implementation
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35 36 37 38 39
var _ peerRequestQueue = &prq{}

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type prq struct {
40 41 42 43
	lock     sync.Mutex
	pQueue   pq.PQ
	taskMap  map[string]*peerRequestTask
	partners map[peer.ID]*activePartner
Jeromy's avatar
Jeromy committed
44 45

	frozen map[peer.ID]*activePartner
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47 48
}

// Push currently adds a new peerRequestTask to the end of the list
49
func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50 51
	tl.lock.Lock()
	defer tl.lock.Unlock()
52 53
	partner, ok := tl.partners[to]
	if !ok {
54
		partner = newActivePartner()
55 56 57 58
		tl.pQueue.Push(partner)
		tl.partners[to] = partner
	}

59 60
	partner.activelk.Lock()
	defer partner.activelk.Unlock()
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78

	var priority int
	newEntries := make([]*wantlist.Entry, 0, len(entries))
	for _, entry := range entries {
		if partner.activeBlocks.Has(entry.Cid) {
			continue
		}
		if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok {
			if entry.Priority > task.Priority {
				task.Priority = entry.Priority
				partner.taskQueue.Update(task.index)
			}
			continue
		}
		if entry.Priority > priority {
			priority = entry.Priority
		}
		newEntries = append(newEntries, entry)
79 80
	}

81
	if len(newEntries) == 0 {
82 83 84
		return
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
85
	task := &peerRequestTask{
86
		Entries: newEntries,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87 88
		Target:  to,
		created: time.Now(),
89
		Done: func(e []*wantlist.Entry) {
90
			tl.lock.Lock()
91 92 93
			for _, entry := range e {
				partner.TaskDone(entry.Cid)
			}
94 95 96
			tl.pQueue.Update(partner.Index())
			tl.lock.Unlock()
		},
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97
	}
98
	task.Priority = priority
99
	partner.taskQueue.Push(task)
100 101 102 103
	for _, entry := range newEntries {
		tl.taskMap[taskEntryKey(to, entry.Cid)] = task
	}
	partner.requests += len(newEntries)
104
	tl.pQueue.Update(partner.Index())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
105 106 107 108 109 110
}

// Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *prq) Pop() *peerRequestTask {
	tl.lock.Lock()
	defer tl.lock.Unlock()
111 112 113
	if tl.pQueue.Len() == 0 {
		return nil
	}
Jeromy's avatar
Jeromy committed
114
	partner := tl.pQueue.Pop().(*activePartner)
115

Brian Tiger Chow's avatar
Brian Tiger Chow committed
116
	var out *peerRequestTask
Jeromy's avatar
Jeromy committed
117
	for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
118
		out = partner.taskQueue.Pop().(*peerRequestTask)
Jeromy's avatar
Jeromy committed
119

120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
		newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
		for _, entry := range out.Entries {
			delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid))
			if entry.Trash {
				continue
			}
			partner.requests--
			partner.StartTask(entry.Cid)
			newEntries = append(newEntries, entry)
		}
		if len(newEntries) > 0 {
			out.Entries = newEntries
		} else {
			out = nil // discarding tasks that have been removed
			continue
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
136 137
		break // and return |out|
	}
Jeromy's avatar
Jeromy committed
138

139
	tl.pQueue.Push(partner)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141 142 143
	return out
}

// Remove removes a task from the queue
144
func (tl *prq) Remove(k cid.Cid, p peer.ID) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
145
	tl.lock.Lock()
146
	t, ok := tl.taskMap[taskEntryKey(p, k)]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
147
	if ok {
148 149 150 151 152 153 154 155 156
		for _, entry := range t.Entries {
			if entry.Cid.Equals(k) {
				// remove the task "lazily"
				// simply mark it as trash, so it'll be dropped when popped off the
				// queue.
				entry.Trash = true
				break
			}
		}
Jeromy's avatar
Jeromy committed
157 158

		// having canceled a block, we now account for that in the given partner
Jeromy's avatar
Jeromy committed
159 160 161 162 163 164 165 166 167 168 169 170 171
		partner := tl.partners[p]
		partner.requests--

		// we now also 'freeze' that partner. If they sent us a cancel for a
		// block we were about to send them, we should wait a short period of time
		// to make sure we receive any other in-flight cancels before sending
		// them a block they already potentially have
		if partner.freezeVal == 0 {
			tl.frozen[p] = partner
		}

		partner.freezeVal++
		tl.pQueue.Update(partner.index)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
172 173 174 175
	}
	tl.lock.Unlock()
}

Jeromy's avatar
Jeromy committed
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
// fullThaw unfreezes every frozen partner at once: each freezeVal is reset to
// zero, the partner is dropped from tl.frozen and re-sorted in the partner
// queue.
func (tl *prq) fullThaw() {
	tl.lock.Lock()
	defer tl.lock.Unlock()

	for peerID, frozenPartner := range tl.frozen {
		delete(tl.frozen, peerID)
		frozenPartner.freezeVal = 0
		tl.pQueue.Update(frozenPartner.index)
	}
}

// thawRound performs one step of gradual thawing: every frozen partner has
// (freezeVal + 1) / 2 subtracted from its freezeVal. Partners that reach zero
// or below are dropped from tl.frozen. Every visited partner is re-sorted in
// the partner queue.
func (tl *prq) thawRound() {
	tl.lock.Lock()
	defer tl.lock.Unlock()

	for peerID, frozenPartner := range tl.frozen {
		decay := (frozenPartner.freezeVal + 1) / 2
		frozenPartner.freezeVal -= decay
		if frozenPartner.freezeVal <= 0 {
			delete(tl.frozen, peerID)
		}
		tl.pQueue.Update(frozenPartner.index)
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
200
type peerRequestTask struct {
201 202 203
	Entries  []*wantlist.Entry
	Priority int
	Target   peer.ID
204

Jeromy's avatar
Jeromy committed
205
	// A callback to signal that this task has been completed
206
	Done func([]*wantlist.Entry)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
207 208 209 210 211 212

	// created marks the time that the task was added to the queue
	created time.Time
	index   int // book-keeping field used by the pq container
}

Jeromy's avatar
Jeromy committed
213
// Index implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
214 215 216 217
func (t *peerRequestTask) Index() int {
	return t.index
}

Jeromy's avatar
Jeromy committed
218
// SetIndex implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
219 220 221 222
func (t *peerRequestTask) SetIndex(i int) {
	t.index = i
}

223 224
// taskEntryKey returns a key that uniquely identifies a task.
func taskEntryKey(p peer.ID, k cid.Cid) string {
225
	return string(p) + k.KeyString()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
226 227 228 229 230 231 232 233 234 235 236
}

// FIFO is a basic task comparator that orders tasks by creation time: it
// reports whether a was created strictly before b.
var FIFO = func(a, b *peerRequestTask) bool {
	older := a.created.Before(b.created)
	return older
}

// V1 respects the target peer's wantlist priority. For tasks involving
// different peers, the oldest task is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
	if a.Target == b.Target {
237
		return a.Priority > b.Priority
Brian Tiger Chow's avatar
Brian Tiger Chow committed
238 239 240 241 242 243 244 245 246
	}
	return FIFO(a, b)
}

// wrapCmp adapts a comparator over *peerRequestTask into one over pq.Elem.
// The returned function type-asserts both arguments to *peerRequestTask and
// therefore panics if handed any other element type.
func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
	return func(x, y pq.Elem) bool {
		left := x.(*peerRequestTask)
		right := y.(*peerRequestTask)
		return f(left, right)
	}
}
247 248 249 250

type activePartner struct {

	// Active is the number of blocks this peer is currently being sent
Jeromy's avatar
Jeromy committed
251
	// active must be locked around as it will be updated externally
Jeromy's avatar
Jeromy committed
252 253
	activelk sync.Mutex
	active   int
254

255
	activeBlocks *cid.Set
256

257
	// requests is the number of blocks this peer is currently requesting
Jeromy's avatar
Jeromy committed
258 259
	// request need not be locked around as it will only be modified under
	// the peerRequestQueue's locks
260 261
	requests int

Jeromy's avatar
Jeromy committed
262
	// for the PQ interface
263 264
	index int

Jeromy's avatar
Jeromy committed
265 266
	freezeVal int

Jeromy's avatar
Jeromy committed
267
	// priority queue of tasks belonging to this peer
268 269 270
	taskQueue pq.PQ
}

271 272 273
func newActivePartner() *activePartner {
	return &activePartner{
		taskQueue:    pq.New(wrapCmp(V1)),
274
		activeBlocks: cid.NewSet(),
275 276 277
	}
}

Jeromy's avatar
Jeromy committed
278
// partnerCompare implements pq.ElemComparator
Jeromy's avatar
Jeromy committed
279
// returns true if peer 'a' has higher priority than peer 'b'
280 281 282 283 284
func partnerCompare(a, b pq.Elem) bool {
	pa := a.(*activePartner)
	pb := b.(*activePartner)

	// having no blocks in their wantlist means lowest priority
Jeromy's avatar
Jeromy committed
285
	// having both of these checks ensures stability of the sort
286 287 288 289 290 291
	if pa.requests == 0 {
		return false
	}
	if pb.requests == 0 {
		return true
	}
Jeromy's avatar
Jeromy committed
292 293 294 295 296 297 298 299

	if pa.freezeVal > pb.freezeVal {
		return false
	}
	if pa.freezeVal < pb.freezeVal {
		return true
	}

300 301 302 303 304 305
	if pa.active == pb.active {
		// sorting by taskQueue.Len() aids in cleaning out trash entries faster
		// if we sorted instead by requests, one peer could potentially build up
		// a huge number of cancelled entries in the queue resulting in a memory leak
		return pa.taskQueue.Len() > pb.taskQueue.Len()
	}
306 307 308
	return pa.active < pb.active
}

Jeromy's avatar
Jeromy committed
309
// StartTask signals that a task was started for this partner
310
func (p *activePartner) StartTask(k cid.Cid) {
Jeromy's avatar
Jeromy committed
311
	p.activelk.Lock()
312
	p.activeBlocks.Add(k)
313
	p.active++
Jeromy's avatar
Jeromy committed
314
	p.activelk.Unlock()
315 316
}

Jeromy's avatar
Jeromy committed
317
// TaskDone signals that a task was completed for this partner
318
func (p *activePartner) TaskDone(k cid.Cid) {
Jeromy's avatar
Jeromy committed
319
	p.activelk.Lock()
320
	p.activeBlocks.Remove(k)
321 322 323 324
	p.active--
	if p.active < 0 {
		panic("more tasks finished than started!")
	}
Jeromy's avatar
Jeromy committed
325
	p.activelk.Unlock()
326 327
}

Jeromy's avatar
Jeromy committed
328
// Index implements pq.Elem
329 330 331 332
func (p *activePartner) Index() int {
	return p.index
}

Jeromy's avatar
Jeromy committed
333
// SetIndex implements pq.Elem
334 335 336
func (p *activePartner) SetIndex(i int) {
	p.index = i
}