peer_request_queue.go 8 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1 2 3 4 5 6
package decision

import (
	"sync"
	"time"

Jeromy's avatar
Jeromy committed
7
	wantlist "github.com/ipfs/go-bitswap/wantlist"
8

Jeromy's avatar
Jeromy committed
9 10 11
	cid "github.com/ipfs/go-cid"
	pq "github.com/ipfs/go-ipfs-pq"
	peer "github.com/libp2p/go-libp2p-peer"
Brian Tiger Chow's avatar
Brian Tiger Chow committed
12 13 14 15 16
)

type peerRequestQueue interface {
	// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
	Pop() *peerRequestTask
17
	Push(to peer.ID, entries ...*wantlist.Entry)
18
	Remove(k cid.Cid, p peer.ID)
Jeromy's avatar
Jeromy committed
19

Brian Tiger Chow's avatar
Brian Tiger Chow committed
20 21 22 23
	// NB: cannot expose simply expose taskQueue.Len because trashed elements
	// may exist. These trashed elements should not contribute to the count.
}

Jeromy's avatar
Jeromy committed
24
func newPRQ() *prq {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
25
	return &prq{
Steven Allen's avatar
Steven Allen committed
26
		taskMap:  make(map[taskEntryKey]*peerRequestTask),
27
		partners: make(map[peer.ID]*activePartner),
Jeromy's avatar
Jeromy committed
28
		frozen:   make(map[peer.ID]*activePartner),
29
		pQueue:   pq.New(partnerCompare),
Brian Tiger Chow's avatar
Brian Tiger Chow committed
30 31 32
	}
}

Jeromy's avatar
Jeromy committed
33
// verify interface implementation
Brian Tiger Chow's avatar
Brian Tiger Chow committed
34 35 36 37 38 39
var _ peerRequestQueue = &prq{}

// TODO: at some point, the strategy needs to plug in here
// to help decide how to sort tasks (on add) and how to select
// tasks (on getnext). For now, we are assuming a dumb/nice strategy.
type prq struct {
40 41
	lock     sync.Mutex
	pQueue   pq.PQ
Steven Allen's avatar
Steven Allen committed
42
	taskMap  map[taskEntryKey]*peerRequestTask
43
	partners map[peer.ID]*activePartner
Jeromy's avatar
Jeromy committed
44 45

	frozen map[peer.ID]*activePartner
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46 47 48
}

// Push currently adds a new peerRequestTask to the end of the list
49
func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
50 51
	tl.lock.Lock()
	defer tl.lock.Unlock()
52 53
	partner, ok := tl.partners[to]
	if !ok {
54
		partner = newActivePartner()
55 56 57 58
		tl.pQueue.Push(partner)
		tl.partners[to] = partner
	}

59 60
	partner.activelk.Lock()
	defer partner.activelk.Unlock()
61 62 63 64 65 66 67

	var priority int
	newEntries := make([]*wantlist.Entry, 0, len(entries))
	for _, entry := range entries {
		if partner.activeBlocks.Has(entry.Cid) {
			continue
		}
Steven Allen's avatar
Steven Allen committed
68
		if task, ok := tl.taskMap[taskEntryKey{to, entry.Cid}]; ok {
69 70 71 72 73 74 75 76 77 78
			if entry.Priority > task.Priority {
				task.Priority = entry.Priority
				partner.taskQueue.Update(task.index)
			}
			continue
		}
		if entry.Priority > priority {
			priority = entry.Priority
		}
		newEntries = append(newEntries, entry)
79 80
	}

81
	if len(newEntries) == 0 {
82 83 84
		return
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
85
	task := &peerRequestTask{
86
		Entries: newEntries,
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87 88
		Target:  to,
		created: time.Now(),
89
		Done: func(e []*wantlist.Entry) {
90
			tl.lock.Lock()
91 92 93
			for _, entry := range e {
				partner.TaskDone(entry.Cid)
			}
94 95 96
			tl.pQueue.Update(partner.Index())
			tl.lock.Unlock()
		},
Brian Tiger Chow's avatar
Brian Tiger Chow committed
97
	}
98
	task.Priority = priority
99
	partner.taskQueue.Push(task)
100
	for _, entry := range newEntries {
Steven Allen's avatar
Steven Allen committed
101
		tl.taskMap[taskEntryKey{to, entry.Cid}] = task
102 103
	}
	partner.requests += len(newEntries)
104
	tl.pQueue.Update(partner.Index())
Brian Tiger Chow's avatar
Brian Tiger Chow committed
105 106 107 108 109 110
}

// Pop 'pops' the next task to be performed. Returns nil if no task exists.
func (tl *prq) Pop() *peerRequestTask {
	tl.lock.Lock()
	defer tl.lock.Unlock()
111 112 113
	if tl.pQueue.Len() == 0 {
		return nil
	}
Jeromy's avatar
Jeromy committed
114
	partner := tl.pQueue.Pop().(*activePartner)
115

Brian Tiger Chow's avatar
Brian Tiger Chow committed
116
	var out *peerRequestTask
Jeromy's avatar
Jeromy committed
117
	for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
118
		out = partner.taskQueue.Pop().(*peerRequestTask)
Jeromy's avatar
Jeromy committed
119

120 121
		newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
		for _, entry := range out.Entries {
Steven Allen's avatar
Steven Allen committed
122
			delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
123 124 125 126 127 128 129 130 131 132 133 134 135
			if entry.Trash {
				continue
			}
			partner.requests--
			partner.StartTask(entry.Cid)
			newEntries = append(newEntries, entry)
		}
		if len(newEntries) > 0 {
			out.Entries = newEntries
		} else {
			out = nil // discarding tasks that have been removed
			continue
		}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
136 137
		break // and return |out|
	}
Jeromy's avatar
Jeromy committed
138

139
	tl.pQueue.Push(partner)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
140 141 142 143
	return out
}

// Remove removes a task from the queue
144
func (tl *prq) Remove(k cid.Cid, p peer.ID) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
145
	tl.lock.Lock()
Steven Allen's avatar
Steven Allen committed
146
	t, ok := tl.taskMap[taskEntryKey{p, k}]
Brian Tiger Chow's avatar
Brian Tiger Chow committed
147
	if ok {
148 149 150 151 152 153 154 155 156
		for _, entry := range t.Entries {
			if entry.Cid.Equals(k) {
				// remove the task "lazily"
				// simply mark it as trash, so it'll be dropped when popped off the
				// queue.
				entry.Trash = true
				break
			}
		}
Jeromy's avatar
Jeromy committed
157 158

		// having canceled a block, we now account for that in the given partner
Jeromy's avatar
Jeromy committed
159 160 161 162 163 164 165 166 167 168 169 170 171
		partner := tl.partners[p]
		partner.requests--

		// we now also 'freeze' that partner. If they sent us a cancel for a
		// block we were about to send them, we should wait a short period of time
		// to make sure we receive any other in-flight cancels before sending
		// them a block they already potentially have
		if partner.freezeVal == 0 {
			tl.frozen[p] = partner
		}

		partner.freezeVal++
		tl.pQueue.Update(partner.index)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
172 173 174 175
	}
	tl.lock.Unlock()
}

Jeromy's avatar
Jeromy committed
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
func (tl *prq) fullThaw() {
	tl.lock.Lock()
	defer tl.lock.Unlock()

	for id, partner := range tl.frozen {
		partner.freezeVal = 0
		delete(tl.frozen, id)
		tl.pQueue.Update(partner.index)
	}
}

func (tl *prq) thawRound() {
	tl.lock.Lock()
	defer tl.lock.Unlock()

	for id, partner := range tl.frozen {
		partner.freezeVal -= (partner.freezeVal + 1) / 2
		if partner.freezeVal <= 0 {
			delete(tl.frozen, id)
		}
		tl.pQueue.Update(partner.index)
	}
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
200
type peerRequestTask struct {
201 202 203
	Entries  []*wantlist.Entry
	Priority int
	Target   peer.ID
204

Jeromy's avatar
Jeromy committed
205
	// A callback to signal that this task has been completed
206
	Done func([]*wantlist.Entry)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
207 208 209 210 211 212

	// created marks the time that the task was added to the queue
	created time.Time
	index   int // book-keeping field used by the pq container
}

Jeromy's avatar
Jeromy committed
213
// Index implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
214 215 216 217
func (t *peerRequestTask) Index() int {
	return t.index
}

Jeromy's avatar
Jeromy committed
218
// SetIndex implements pq.Elem
Brian Tiger Chow's avatar
Brian Tiger Chow committed
219 220 221 222
func (t *peerRequestTask) SetIndex(i int) {
	t.index = i
}

Steven Allen's avatar
Steven Allen committed
223 224 225 226
// taskEntryKey is a key identifying a task.
type taskEntryKey struct {
	p peer.ID
	k cid.Cid
Brian Tiger Chow's avatar
Brian Tiger Chow committed
227 228 229 230 231 232 233 234 235 236 237
}

// FIFO is a basic task comparator that returns tasks in the order created.
var FIFO = func(a, b *peerRequestTask) bool {
	return a.created.Before(b.created)
}

// V1 respects the target peer's wantlist priority. For tasks involving
// different peers, the oldest task is prioritized.
var V1 = func(a, b *peerRequestTask) bool {
	if a.Target == b.Target {
238
		return a.Priority > b.Priority
Brian Tiger Chow's avatar
Brian Tiger Chow committed
239 240 241 242 243 244 245 246 247
	}
	return FIFO(a, b)
}

func wrapCmp(f func(a, b *peerRequestTask) bool) func(a, b pq.Elem) bool {
	return func(a, b pq.Elem) bool {
		return f(a.(*peerRequestTask), b.(*peerRequestTask))
	}
}
248 249 250 251

type activePartner struct {

	// Active is the number of blocks this peer is currently being sent
Jeromy's avatar
Jeromy committed
252
	// active must be locked around as it will be updated externally
Jeromy's avatar
Jeromy committed
253 254
	activelk sync.Mutex
	active   int
255

256
	activeBlocks *cid.Set
257

258
	// requests is the number of blocks this peer is currently requesting
Jeromy's avatar
Jeromy committed
259 260
	// request need not be locked around as it will only be modified under
	// the peerRequestQueue's locks
261 262
	requests int

Jeromy's avatar
Jeromy committed
263
	// for the PQ interface
264 265
	index int

Jeromy's avatar
Jeromy committed
266 267
	freezeVal int

Jeromy's avatar
Jeromy committed
268
	// priority queue of tasks belonging to this peer
269 270 271
	taskQueue pq.PQ
}

272 273 274
func newActivePartner() *activePartner {
	return &activePartner{
		taskQueue:    pq.New(wrapCmp(V1)),
275
		activeBlocks: cid.NewSet(),
276 277 278
	}
}

Jeromy's avatar
Jeromy committed
279
// partnerCompare implements pq.ElemComparator
Jeromy's avatar
Jeromy committed
280
// returns true if peer 'a' has higher priority than peer 'b'
281 282 283 284 285
func partnerCompare(a, b pq.Elem) bool {
	pa := a.(*activePartner)
	pb := b.(*activePartner)

	// having no blocks in their wantlist means lowest priority
Jeromy's avatar
Jeromy committed
286
	// having both of these checks ensures stability of the sort
287 288 289 290 291 292
	if pa.requests == 0 {
		return false
	}
	if pb.requests == 0 {
		return true
	}
Jeromy's avatar
Jeromy committed
293 294 295 296 297 298 299 300

	if pa.freezeVal > pb.freezeVal {
		return false
	}
	if pa.freezeVal < pb.freezeVal {
		return true
	}

301 302 303 304 305 306
	if pa.active == pb.active {
		// sorting by taskQueue.Len() aids in cleaning out trash entries faster
		// if we sorted instead by requests, one peer could potentially build up
		// a huge number of cancelled entries in the queue resulting in a memory leak
		return pa.taskQueue.Len() > pb.taskQueue.Len()
	}
307 308 309
	return pa.active < pb.active
}

Jeromy's avatar
Jeromy committed
310
// StartTask signals that a task was started for this partner
311
func (p *activePartner) StartTask(k cid.Cid) {
Jeromy's avatar
Jeromy committed
312
	p.activelk.Lock()
313
	p.activeBlocks.Add(k)
314
	p.active++
Jeromy's avatar
Jeromy committed
315
	p.activelk.Unlock()
316 317
}

Jeromy's avatar
Jeromy committed
318
// TaskDone signals that a task was completed for this partner
319
func (p *activePartner) TaskDone(k cid.Cid) {
Jeromy's avatar
Jeromy committed
320
	p.activelk.Lock()
321
	p.activeBlocks.Remove(k)
322 323 324 325
	p.active--
	if p.active < 0 {
		panic("more tasks finished than started!")
	}
Jeromy's avatar
Jeromy committed
326
	p.activelk.Unlock()
327 328
}

Jeromy's avatar
Jeromy committed
329
// Index implements pq.Elem
330 331 332 333
func (p *activePartner) Index() int {
	return p.index
}

Jeromy's avatar
Jeromy committed
334
// SetIndex implements pq.Elem
335 336 337
func (p *activePartner) SetIndex(i int) {
	p.index = i
}