peerwantmanager.go 11.9 KB
Newer Older
dirkmc's avatar
dirkmc committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Gauge can be used to keep track of a metric that increases and decreases
// incrementally. It is used by the peerWantManager to track the number of
// want-blocks that are active (ie sent but no response received)
type Gauge interface {
	Inc()
	Dec()
}

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
22 23 24
	// peerWants maps peers to outstanding wants.
	// A peer's wants is the _union_ of the broadcast wants and the wants in
	// this list.
dirkmc's avatar
dirkmc committed
25
	peerWants map[peer.ID]*peerWant
26 27

	// Reverse index of all wants in peerWants.
28
	wantPeers map[cid.Cid]map[peer.ID]struct{}
29 30 31 32

	// broadcastWants tracks all the current broadcast wants.
	broadcastWants *cid.Set

dirkmc's avatar
dirkmc committed
33 34
	// Keeps track of the number of active want-haves & want-blocks
	wantGauge Gauge
dirkmc's avatar
dirkmc committed
35 36 37 38 39 40 41
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

type peerWant struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
42
	peerQueue  PeerQueue
dirkmc's avatar
dirkmc committed
43 44 45 46
}

// New creates a new peerWantManager with a Gauge that keeps track of the
// number of active want-blocks (ie sent but no response received)
dirkmc's avatar
dirkmc committed
47
func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
dirkmc's avatar
dirkmc committed
48
	return &peerWantManager{
49
		broadcastWants: cid.NewSet(),
dirkmc's avatar
dirkmc committed
50
		peerWants:      make(map[peer.ID]*peerWant),
51
		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
dirkmc's avatar
dirkmc committed
52
		wantGauge:      wantGauge,
dirkmc's avatar
dirkmc committed
53 54 55 56
		wantBlockGauge: wantBlockGauge,
	}
}

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// addPeer adds a peer whose wants we need to keep track of. It sends the
// current list of broadcast wants to the peer.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
	if _, ok := pwm.peerWants[p]; ok {
		return
	}

	pwm.peerWants[p] = &peerWant{
		wantBlocks: cid.NewSet(),
		wantHaves:  cid.NewSet(),
		peerQueue:  peerQueue,
	}

	// Broadcast any live want-haves to the newly connected peer
	if pwm.broadcastWants.Len() > 0 {
		wants := pwm.broadcastWants.Keys()
		peerQueue.AddBroadcastWantHaves(wants)
dirkmc's avatar
dirkmc committed
74 75 76 77
	}
}

// RemovePeer removes a peer and its associated wants from tracking
78
func (pwm *peerWantManager) removePeer(p peer.ID) {
79 80 81 82 83
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

dirkmc's avatar
dirkmc committed
84
	// Clean up want-blocks
85
	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
86
		// Clean up want-blocks from the reverse index
Dirk McCormick's avatar
Dirk McCormick committed
87
		pwm.reverseIndexRemove(c, p)
dirkmc's avatar
dirkmc committed
88 89

		// Decrement the gauges by the number of pending want-blocks to the peer
Dirk McCormick's avatar
Dirk McCormick committed
90 91
		peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
		if peersWantingBlock == 0 {
dirkmc's avatar
dirkmc committed
92
			pwm.wantBlockGauge.Dec()
Dirk McCormick's avatar
Dirk McCormick committed
93
			if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
dirkmc's avatar
dirkmc committed
94 95 96
				pwm.wantGauge.Dec()
			}
		}
Dirk McCormick's avatar
Dirk McCormick committed
97

98 99 100
		return nil
	})

dirkmc's avatar
dirkmc committed
101
	// Clean up want-haves
102
	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
103
		// Clean up want-haves from the reverse index
Dirk McCormick's avatar
Dirk McCormick committed
104
		pwm.reverseIndexRemove(c, p)
dirkmc's avatar
dirkmc committed
105 106

		// Decrement the gauge by the number of pending want-haves to the peer
Dirk McCormick's avatar
Dirk McCormick committed
107 108
		peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
		if peersWantingBlock == 0 && peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
dirkmc's avatar
dirkmc committed
109 110
			pwm.wantGauge.Dec()
		}
111 112
		return nil
	})
113

dirkmc's avatar
dirkmc committed
114 115 116
	delete(pwm.peerWants, p)
}

117 118 119
// broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
	unsent := make([]cid.Cid, 0, len(wantHaves))
120 121 122 123 124 125
	for _, c := range wantHaves {
		if pwm.broadcastWants.Has(c) {
			// Already a broadcast want, skip it.
			continue
		}
		pwm.broadcastWants.Add(c)
126
		unsent = append(unsent, c)
dirkmc's avatar
dirkmc committed
127

Dirk McCormick's avatar
Dirk McCormick committed
128
		// If no peer has a pending want for the key
dirkmc's avatar
dirkmc committed
129
		if _, ok := pwm.wantPeers[c]; !ok {
Dirk McCormick's avatar
Dirk McCormick committed
130
			// Increment the total wants gauge
dirkmc's avatar
dirkmc committed
131 132
			pwm.wantGauge.Inc()
		}
133 134 135 136 137 138 139 140
	}

	if len(unsent) == 0 {
		return
	}

	// Allocate a single buffer to filter broadcast wants for each peer
	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
141

142 143 144 145
	// Send broadcast wants to each peer
	for _, pws := range pwm.peerWants {
		peerUnsent := bcstWantsBuffer[:0]
		for _, c := range unsent {
146
			// If we've already sent a want to this peer, skip them.
147 148
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				peerUnsent = append(peerUnsent, c)
149
			}
150
		}
dirkmc's avatar
dirkmc committed
151

152 153
		if len(peerUnsent) > 0 {
			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
dirkmc's avatar
dirkmc committed
154 155 156 157
		}
	}
}

158 159 160 161 162
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
	fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
dirkmc's avatar
dirkmc committed
163 164

	// Get the existing want-blocks and want-haves for the peer
165 166
	pws, ok := pwm.peerWants[p]
	if !ok {
167 168 169
		// In practice this should never happen
		log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", string(p))
		return
170
	}
dirkmc's avatar
dirkmc committed
171

172 173 174
	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block hasn't been sent to the peer
Dirk McCormick's avatar
Dirk McCormick committed
175 176 177
		if pws.wantBlocks.Has(c) {
			continue
		}
178

Dirk McCormick's avatar
Dirk McCormick committed
179 180 181 182 183 184
		// Increment the want gauges
		peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
		if peersWantingBlock == 0 {
			pwm.wantBlockGauge.Inc()
			if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) {
				pwm.wantGauge.Inc()
dirkmc's avatar
dirkmc committed
185
			}
dirkmc's avatar
dirkmc committed
186
		}
Dirk McCormick's avatar
Dirk McCormick committed
187 188 189 190 191 192 193 194 195 196 197 198

		// Make sure the CID is no longer recorded as a want-have
		pws.wantHaves.Remove(c)

		// Record that the CID was sent as a want-block
		pws.wantBlocks.Add(c)

		// Add the CID to the results
		fltWantBlks = append(fltWantBlks, c)

		// Update the reverse index
		pwm.reverseIndexAdd(c, p)
199
	}
dirkmc's avatar
dirkmc committed
200

201 202
	// Iterate over the requested want-haves
	for _, c := range wantHaves {
203 204 205 206 207 208
		// If we've already broadcasted this want, don't bother with a
		// want-have.
		if pwm.broadcastWants.Has(c) {
			continue
		}

209 210
		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
Dirk McCormick's avatar
Dirk McCormick committed
211 212 213 214 215 216
			// Increment the total wants gauge
			peersWantingBlock, peersWantingHave := pwm.peersWanting(c)
			if peersWantingHave == 0 && !pwm.broadcastWants.Has(c) && peersWantingBlock == 0 {
				pwm.wantGauge.Inc()
			}

217 218
			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)
dirkmc's avatar
dirkmc committed
219

220
			// Add the CID to the results
221
			fltWantHvs = append(fltWantHvs, c)
dirkmc's avatar
dirkmc committed
222 223

			// Update the reverse index
Dirk McCormick's avatar
Dirk McCormick committed
224
			pwm.reverseIndexAdd(c, p)
dirkmc's avatar
dirkmc committed
225 226 227
		}
	}

228 229
	// Send the want-blocks and want-haves to the peer
	pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
dirkmc's avatar
dirkmc committed
230 231
}

232 233 234
// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
235
	if len(cancelKs) == 0 {
236
		return
237 238
	}

Dirk McCormick's avatar
Dirk McCormick committed
239 240 241 242 243 244 245 246
	// Record how many peers have a pending want-block and want-have for each
	// key to be cancelled
	peersWantingBefore := make(map[cid.Cid][]int, len(cancelKs))
	for _, c := range cancelKs {
		blks, haves := pwm.peersWanting(c)
		peersWantingBefore[c] = []int{blks, haves}
	}

247 248 249
	// Create a buffer to use for filtering cancels per peer, with the
	// broadcast wants at the front of the buffer (broadcast wants are sent to
	// all peers)
250
	broadcastCancels := make([]cid.Cid, 0, len(cancelKs))
251
	for _, c := range cancelKs {
252
		if pwm.broadcastWants.Has(c) {
253
			broadcastCancels = append(broadcastCancels, c)
254
		}
255 256 257 258
	}

	// Send cancels to a particular peer
	send := func(p peer.ID, pws *peerWant) {
259 260
		// Start from the broadcast cancels
		toCancel := broadcastCancels
261 262

		// For each key to be cancelled
263
		for _, c := range cancelKs {
264
			// Check if a want was sent for the key
Dirk McCormick's avatar
Dirk McCormick committed
265
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
dirkmc's avatar
dirkmc committed
266
				continue
dirkmc's avatar
dirkmc committed
267 268
			}

269 270 271
			// Unconditionally remove from the want lists.
			pws.wantBlocks.Remove(c)
			pws.wantHaves.Remove(c)
dirkmc's avatar
dirkmc committed
272

273
			// If it's a broadcast want, we've already added it to
274 275
			// the peer cancels.
			if !pwm.broadcastWants.Has(c) {
276
				toCancel = append(toCancel, c)
277 278 279
			}
		}

280
		// Send cancels to the peer
281 282
		if len(toCancel) > 0 {
			pws.peerQueue.AddCancels(toCancel)
283
		}
284 285
	}

286
	if len(broadcastCancels) > 0 {
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
		// If a broadcast want is being cancelled, send the cancel to all
		// peers
		for p, pws := range pwm.peerWants {
			send(p, pws)
		}
	} else {
		// Only send cancels to peers that received a corresponding want
		cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
		for _, c := range cancelKs {
			for p := range pwm.wantPeers[c] {
				cancelPeers[p] = struct{}{}
			}
		}
		for p := range cancelPeers {
			pws, ok := pwm.peerWants[p]
			if !ok {
				// Should never happen but check just in case
				log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
				continue
dirkmc's avatar
dirkmc committed
306
			}
307 308

			send(p, pws)
dirkmc's avatar
dirkmc committed
309 310 311
		}
	}

Dirk McCormick's avatar
Dirk McCormick committed
312 313 314 315 316
	// Decrement the wants gauges
	for _, c := range cancelKs {
		before := peersWantingBefore[c]
		peersWantingBlockBefore := before[0]
		peersWantingHaveBefore := before[1]
dirkmc's avatar
dirkmc committed
317

Dirk McCormick's avatar
Dirk McCormick committed
318 319 320 321 322 323 324 325 326
		// If there were any peers that had a pending want-block for the key
		if peersWantingBlockBefore > 0 {
			// Decrement the want-block gauge
			pwm.wantBlockGauge.Dec()
		}

		// If there was a peer that had a pending want or it was a broadcast want
		if peersWantingBlockBefore > 0 || peersWantingHaveBefore > 0 || pwm.broadcastWants.Has(c) {
			// Decrement the total wants gauge
dirkmc's avatar
dirkmc committed
327 328
			pwm.wantGauge.Dec()
		}
329 330
	}

Dirk McCormick's avatar
Dirk McCormick committed
331 332 333 334
	// Remove cancelled broadcast wants
	for _, c := range broadcastCancels {
		pwm.broadcastWants.Remove(c)
	}
dirkmc's avatar
dirkmc committed
335

Dirk McCormick's avatar
Dirk McCormick committed
336 337
	// Batch-remove the reverse-index. There's no need to clear this index
	// peer-by-peer.
338 339 340
	for _, c := range cancelKs {
		delete(pwm.wantPeers, c)
	}
Dirk McCormick's avatar
Dirk McCormick committed
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
}

// peersWanting counts how many peers have a pending want-block and want-have
// for the given CID
func (pwm *peerWantManager) peersWanting(c cid.Cid) (int, int) {
	blockCount := 0
	haveCount := 0
	for p := range pwm.wantPeers[c] {
		pws, ok := pwm.peerWants[p]
		if !ok {
			continue
		}

		if pws.wantBlocks.Has(c) {
			blockCount++
		} else if pws.wantHaves.Has(c) {
			haveCount++
		}
	}
dirkmc's avatar
dirkmc committed
360

Dirk McCormick's avatar
Dirk McCormick committed
361
	return blockCount, haveCount
dirkmc's avatar
dirkmc committed
362 363
}

364
// Add the peer to the list of peers that have sent a want with the cid
dirkmc's avatar
dirkmc committed
365
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
366 367
	peers, ok := pwm.wantPeers[c]
	if !ok {
368
		peers = make(map[peer.ID]struct{}, 10)
369 370 371
		pwm.wantPeers[c] = peers
	}
	peers[p] = struct{}{}
dirkmc's avatar
dirkmc committed
372
	return !ok
373 374 375
}

// Remove the peer from the list of peers that have sent a want with the cid
Dirk McCormick's avatar
Dirk McCormick committed
376
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
377 378 379 380 381 382 383 384
	if peers, ok := pwm.wantPeers[c]; ok {
		delete(peers, p)
		if len(peers) == 0 {
			delete(pwm.wantPeers, c)
		}
	}
}

dirkmc's avatar
dirkmc committed
385
// GetWantBlocks returns the set of all want-blocks sent to all peers
386
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
dirkmc's avatar
dirkmc committed
387 388 389 390 391
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
392
		_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
393 394
			// Add the CID to the results
			res.Add(c)
395 396
			return nil
		})
dirkmc's avatar
dirkmc committed
397 398 399 400 401 402
	}

	return res.Keys()
}

// GetWantHaves returns the set of all want-haves sent to all peers
403
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
dirkmc's avatar
dirkmc committed
404 405
	res := cid.NewSet()

406
	// Iterate over all peers with active wants.
dirkmc's avatar
dirkmc committed
407 408
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
409
		_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
410 411
			// Add the CID to the results
			res.Add(c)
412 413
			return nil
		})
dirkmc's avatar
dirkmc committed
414
	}
415 416 417 418
	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
		res.Add(c)
		return nil
	})
dirkmc's avatar
dirkmc committed
419 420 421 422

	return res.Keys()
}

423
// GetWants returns the set of all wants (both want-blocks and want-haves).
424
func (pwm *peerWantManager) getWants() []cid.Cid {
425
	res := pwm.broadcastWants.Keys()
426

427 428 429 430 431 432 433
	// Iterate over all targeted wants, removing ones that are also in the
	// broadcast list.
	for c := range pwm.wantPeers {
		if pwm.broadcastWants.Has(c) {
			continue
		}
		res = append(res, c)
434 435
	}

436
	return res
437 438
}

dirkmc's avatar
dirkmc committed
439 440 441
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
Dirk McCormick's avatar
Dirk McCormick committed
442
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
dirkmc's avatar
dirkmc committed
443
		for _, c := range ws.wantHaves.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
444
			b.WriteString(fmt.Sprintf("  want-have  %s\n", c))
dirkmc's avatar
dirkmc committed
445 446
		}
		for _, c := range ws.wantBlocks.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
447
			b.WriteString(fmt.Sprintf("  want-block %s\n", c))
dirkmc's avatar
dirkmc committed
448 449 450 451
		}
	}
	return b.String()
}