peerwantmanager.go 12 KB
Newer Older
dirkmc's avatar
dirkmc committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Gauge can be used to keep track of a metric that increases and decreases
// incrementally. The peerWantManager holds two of them: one tracking the
// number of CIDs that have any active want (want-have, want-block or
// broadcast want) and one tracking the number of CIDs that have an active
// want-block (ie sent but no response received).
type Gauge interface {
	// Inc increments the metric.
	Inc()
	// Dec decrements the metric.
	Dec()
}

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
22 23 24
	// peerWants maps peers to outstanding wants.
	// A peer's wants is the _union_ of the broadcast wants and the wants in
	// this list.
dirkmc's avatar
dirkmc committed
25
	peerWants map[peer.ID]*peerWant
26 27

	// Reverse index of all wants in peerWants.
28
	wantPeers map[cid.Cid]map[peer.ID]struct{}
29 30 31 32

	// broadcastWants tracks all the current broadcast wants.
	broadcastWants *cid.Set

dirkmc's avatar
dirkmc committed
33 34
	// Keeps track of the number of active want-haves & want-blocks
	wantGauge Gauge
dirkmc's avatar
dirkmc committed
35 36 37 38 39 40 41
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

type peerWant struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
42
	peerQueue  PeerQueue
dirkmc's avatar
dirkmc committed
43 44 45 46
}

// New creates a new peerWantManager with a Gauge that keeps track of the
// number of active want-blocks (ie sent but no response received)
dirkmc's avatar
dirkmc committed
47
func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
dirkmc's avatar
dirkmc committed
48
	return &peerWantManager{
49
		broadcastWants: cid.NewSet(),
dirkmc's avatar
dirkmc committed
50
		peerWants:      make(map[peer.ID]*peerWant),
51
		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
dirkmc's avatar
dirkmc committed
52
		wantGauge:      wantGauge,
dirkmc's avatar
dirkmc committed
53 54 55 56
		wantBlockGauge: wantBlockGauge,
	}
}

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// addPeer adds a peer whose wants we need to keep track of. It sends the
// current list of broadcast wants to the peer.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
	if _, ok := pwm.peerWants[p]; ok {
		return
	}

	pwm.peerWants[p] = &peerWant{
		wantBlocks: cid.NewSet(),
		wantHaves:  cid.NewSet(),
		peerQueue:  peerQueue,
	}

	// Broadcast any live want-haves to the newly connected peer
	if pwm.broadcastWants.Len() > 0 {
		wants := pwm.broadcastWants.Keys()
		peerQueue.AddBroadcastWantHaves(wants)
dirkmc's avatar
dirkmc committed
74 75 76 77
	}
}

// RemovePeer removes a peer and its associated wants from tracking
78
func (pwm *peerWantManager) removePeer(p peer.ID) {
79 80 81 82 83
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

dirkmc's avatar
dirkmc committed
84
	// Clean up want-blocks
85
	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
86
		// Clean up want-blocks from the reverse index
Dirk McCormick's avatar
Dirk McCormick committed
87
		pwm.reverseIndexRemove(c, p)
dirkmc's avatar
dirkmc committed
88 89

		// Decrement the gauges by the number of pending want-blocks to the peer
90 91
		peerCounts := pwm.wantPeerCounts(c)
		if peerCounts.wantBlock == 0 {
dirkmc's avatar
dirkmc committed
92
			pwm.wantBlockGauge.Dec()
93 94 95
		}
		if !peerCounts.wanted() {
			pwm.wantGauge.Dec()
dirkmc's avatar
dirkmc committed
96
		}
Dirk McCormick's avatar
Dirk McCormick committed
97

98 99 100
		return nil
	})

dirkmc's avatar
dirkmc committed
101
	// Clean up want-haves
102
	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
103
		// Clean up want-haves from the reverse index
Dirk McCormick's avatar
Dirk McCormick committed
104
		pwm.reverseIndexRemove(c, p)
dirkmc's avatar
dirkmc committed
105 106

		// Decrement the gauge by the number of pending want-haves to the peer
107 108
		peerCounts := pwm.wantPeerCounts(c)
		if !peerCounts.wanted() {
dirkmc's avatar
dirkmc committed
109 110
			pwm.wantGauge.Dec()
		}
111 112
		return nil
	})
113

dirkmc's avatar
dirkmc committed
114 115 116
	delete(pwm.peerWants, p)
}

117 118 119
// broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
	unsent := make([]cid.Cid, 0, len(wantHaves))
120 121 122 123 124 125
	for _, c := range wantHaves {
		if pwm.broadcastWants.Has(c) {
			// Already a broadcast want, skip it.
			continue
		}
		pwm.broadcastWants.Add(c)
126
		unsent = append(unsent, c)
dirkmc's avatar
dirkmc committed
127

Dirk McCormick's avatar
Dirk McCormick committed
128
		// If no peer has a pending want for the key
dirkmc's avatar
dirkmc committed
129
		if _, ok := pwm.wantPeers[c]; !ok {
Dirk McCormick's avatar
Dirk McCormick committed
130
			// Increment the total wants gauge
dirkmc's avatar
dirkmc committed
131 132
			pwm.wantGauge.Inc()
		}
133 134 135 136 137 138 139 140
	}

	if len(unsent) == 0 {
		return
	}

	// Allocate a single buffer to filter broadcast wants for each peer
	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
141

142 143 144 145
	// Send broadcast wants to each peer
	for _, pws := range pwm.peerWants {
		peerUnsent := bcstWantsBuffer[:0]
		for _, c := range unsent {
146
			// If we've already sent a want to this peer, skip them.
147 148
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				peerUnsent = append(peerUnsent, c)
149
			}
150
		}
dirkmc's avatar
dirkmc committed
151

152 153
		if len(peerUnsent) > 0 {
			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
dirkmc's avatar
dirkmc committed
154 155 156 157
		}
	}
}

158 159 160 161 162
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
	fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
dirkmc's avatar
dirkmc committed
163 164

	// Get the existing want-blocks and want-haves for the peer
165 166
	pws, ok := pwm.peerWants[p]
	if !ok {
167 168 169
		// In practice this should never happen
		log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", string(p))
		return
170
	}
dirkmc's avatar
dirkmc committed
171

172 173 174
	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block hasn't been sent to the peer
Dirk McCormick's avatar
Dirk McCormick committed
175 176 177
		if pws.wantBlocks.Has(c) {
			continue
		}
178

Dirk McCormick's avatar
Dirk McCormick committed
179
		// Increment the want gauges
180 181
		peerCounts := pwm.wantPeerCounts(c)
		if peerCounts.wantBlock == 0 {
Dirk McCormick's avatar
Dirk McCormick committed
182
			pwm.wantBlockGauge.Inc()
183 184 185
		}
		if !peerCounts.wanted() {
			pwm.wantGauge.Inc()
dirkmc's avatar
dirkmc committed
186
		}
Dirk McCormick's avatar
Dirk McCormick committed
187 188 189 190 191 192 193 194 195 196 197 198

		// Make sure the CID is no longer recorded as a want-have
		pws.wantHaves.Remove(c)

		// Record that the CID was sent as a want-block
		pws.wantBlocks.Add(c)

		// Add the CID to the results
		fltWantBlks = append(fltWantBlks, c)

		// Update the reverse index
		pwm.reverseIndexAdd(c, p)
199
	}
dirkmc's avatar
dirkmc committed
200

201 202
	// Iterate over the requested want-haves
	for _, c := range wantHaves {
203 204 205 206 207 208
		// If we've already broadcasted this want, don't bother with a
		// want-have.
		if pwm.broadcastWants.Has(c) {
			continue
		}

209 210
		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
Dirk McCormick's avatar
Dirk McCormick committed
211
			// Increment the total wants gauge
212 213
			peerCounts := pwm.wantPeerCounts(c)
			if !peerCounts.wanted() {
Dirk McCormick's avatar
Dirk McCormick committed
214 215 216
				pwm.wantGauge.Inc()
			}

217 218
			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)
dirkmc's avatar
dirkmc committed
219

220
			// Add the CID to the results
221
			fltWantHvs = append(fltWantHvs, c)
dirkmc's avatar
dirkmc committed
222 223

			// Update the reverse index
Dirk McCormick's avatar
Dirk McCormick committed
224
			pwm.reverseIndexAdd(c, p)
dirkmc's avatar
dirkmc committed
225 226 227
		}
	}

228 229
	// Send the want-blocks and want-haves to the peer
	pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
dirkmc's avatar
dirkmc committed
230 231
}

232 233 234
// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
235
	if len(cancelKs) == 0 {
236
		return
237 238
	}

Dirk McCormick's avatar
Dirk McCormick committed
239 240
	// Record how many peers have a pending want-block and want-have for each
	// key to be cancelled
241
	peerCounts := make(map[cid.Cid]wantPeerCnts, len(cancelKs))
Dirk McCormick's avatar
Dirk McCormick committed
242
	for _, c := range cancelKs {
243
		peerCounts[c] = pwm.wantPeerCounts(c)
Dirk McCormick's avatar
Dirk McCormick committed
244 245
	}

246 247 248
	// Create a buffer to use for filtering cancels per peer, with the
	// broadcast wants at the front of the buffer (broadcast wants are sent to
	// all peers)
249
	broadcastCancels := make([]cid.Cid, 0, len(cancelKs))
250
	for _, c := range cancelKs {
251
		if pwm.broadcastWants.Has(c) {
252
			broadcastCancels = append(broadcastCancels, c)
253
		}
254 255 256 257
	}

	// Send cancels to a particular peer
	send := func(p peer.ID, pws *peerWant) {
258 259
		// Start from the broadcast cancels
		toCancel := broadcastCancels
260 261

		// For each key to be cancelled
262
		for _, c := range cancelKs {
263
			// Check if a want was sent for the key
Dirk McCormick's avatar
Dirk McCormick committed
264
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
dirkmc's avatar
dirkmc committed
265
				continue
dirkmc's avatar
dirkmc committed
266 267
			}

268 269 270
			// Unconditionally remove from the want lists.
			pws.wantBlocks.Remove(c)
			pws.wantHaves.Remove(c)
dirkmc's avatar
dirkmc committed
271

272
			// If it's a broadcast want, we've already added it to
273 274
			// the peer cancels.
			if !pwm.broadcastWants.Has(c) {
275
				toCancel = append(toCancel, c)
276 277 278
			}
		}

279
		// Send cancels to the peer
280 281
		if len(toCancel) > 0 {
			pws.peerQueue.AddCancels(toCancel)
282
		}
283 284
	}

285
	if len(broadcastCancels) > 0 {
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304
		// If a broadcast want is being cancelled, send the cancel to all
		// peers
		for p, pws := range pwm.peerWants {
			send(p, pws)
		}
	} else {
		// Only send cancels to peers that received a corresponding want
		cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
		for _, c := range cancelKs {
			for p := range pwm.wantPeers[c] {
				cancelPeers[p] = struct{}{}
			}
		}
		for p := range cancelPeers {
			pws, ok := pwm.peerWants[p]
			if !ok {
				// Should never happen but check just in case
				log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
				continue
dirkmc's avatar
dirkmc committed
305
			}
306 307

			send(p, pws)
dirkmc's avatar
dirkmc committed
308 309 310
		}
	}

Dirk McCormick's avatar
Dirk McCormick committed
311 312
	// Decrement the wants gauges
	for _, c := range cancelKs {
313
		peerCnts := peerCounts[c]
dirkmc's avatar
dirkmc committed
314

Dirk McCormick's avatar
Dirk McCormick committed
315
		// If there were any peers that had a pending want-block for the key
316
		if peerCnts.wantBlock > 0 {
Dirk McCormick's avatar
Dirk McCormick committed
317 318 319 320 321
			// Decrement the want-block gauge
			pwm.wantBlockGauge.Dec()
		}

		// If there was a peer that had a pending want or it was a broadcast want
322
		if peerCnts.wanted() {
Dirk McCormick's avatar
Dirk McCormick committed
323
			// Decrement the total wants gauge
dirkmc's avatar
dirkmc committed
324 325
			pwm.wantGauge.Dec()
		}
326 327
	}

Dirk McCormick's avatar
Dirk McCormick committed
328 329 330 331
	// Remove cancelled broadcast wants
	for _, c := range broadcastCancels {
		pwm.broadcastWants.Remove(c)
	}
dirkmc's avatar
dirkmc committed
332

Dirk McCormick's avatar
Dirk McCormick committed
333 334
	// Batch-remove the reverse-index. There's no need to clear this index
	// peer-by-peer.
335 336 337
	for _, c := range cancelKs {
		delete(pwm.wantPeers, c)
	}
Dirk McCormick's avatar
Dirk McCormick committed
338 339
}

340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
// wantPeerCnts summarizes the pending wants for a single CID.
type wantPeerCnts struct {
	// wantBlock is the number of peers with a pending want-block for the CID.
	wantBlock int
	// wantHave is the number of peers with a pending want-have for the CID.
	wantHave int
	// isBroadcast reports whether the CID is a broadcast want.
	isBroadcast bool
}

// wanted reports whether the CID is wanted at all: either it is a broadcast
// want or at least one peer has a pending want-block or want-have for it.
func (cnts *wantPeerCnts) wanted() bool {
	if cnts.isBroadcast {
		return true
	}
	return cnts.wantBlock+cnts.wantHave > 0 && (cnts.wantBlock > 0 || cnts.wantHave > 0)
}

// wantPeerCounts counts how many peers have a pending want-block and want-have
Dirk McCormick's avatar
Dirk McCormick committed
356
// for the given CID
357
func (pwm *peerWantManager) wantPeerCounts(c cid.Cid) wantPeerCnts {
Dirk McCormick's avatar
Dirk McCormick committed
358 359 360 361 362
	blockCount := 0
	haveCount := 0
	for p := range pwm.wantPeers[c] {
		pws, ok := pwm.peerWants[p]
		if !ok {
363
			log.Errorf("reverse index has extra peer %s for key %s in peerWantManager", string(p), c)
Dirk McCormick's avatar
Dirk McCormick committed
364 365 366 367 368 369 370 371 372
			continue
		}

		if pws.wantBlocks.Has(c) {
			blockCount++
		} else if pws.wantHaves.Has(c) {
			haveCount++
		}
	}
dirkmc's avatar
dirkmc committed
373

374
	return wantPeerCnts{blockCount, haveCount, pwm.broadcastWants.Has(c)}
dirkmc's avatar
dirkmc committed
375 376
}

377
// Add the peer to the list of peers that have sent a want with the cid
dirkmc's avatar
dirkmc committed
378
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
379 380
	peers, ok := pwm.wantPeers[c]
	if !ok {
381
		peers = make(map[peer.ID]struct{}, 10)
382 383 384
		pwm.wantPeers[c] = peers
	}
	peers[p] = struct{}{}
dirkmc's avatar
dirkmc committed
385
	return !ok
386 387 388
}

// Remove the peer from the list of peers that have sent a want with the cid
Dirk McCormick's avatar
Dirk McCormick committed
389
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
390 391 392 393 394 395 396 397
	if peers, ok := pwm.wantPeers[c]; ok {
		delete(peers, p)
		if len(peers) == 0 {
			delete(pwm.wantPeers, c)
		}
	}
}

dirkmc's avatar
dirkmc committed
398
// GetWantBlocks returns the set of all want-blocks sent to all peers
399
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
dirkmc's avatar
dirkmc committed
400 401 402 403 404
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
405
		_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
406 407
			// Add the CID to the results
			res.Add(c)
408 409
			return nil
		})
dirkmc's avatar
dirkmc committed
410 411 412 413 414 415
	}

	return res.Keys()
}

// GetWantHaves returns the set of all want-haves sent to all peers
416
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
dirkmc's avatar
dirkmc committed
417 418
	res := cid.NewSet()

419
	// Iterate over all peers with active wants.
dirkmc's avatar
dirkmc committed
420 421
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
422
		_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
423 424
			// Add the CID to the results
			res.Add(c)
425 426
			return nil
		})
dirkmc's avatar
dirkmc committed
427
	}
428 429 430 431
	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
		res.Add(c)
		return nil
	})
dirkmc's avatar
dirkmc committed
432 433 434 435

	return res.Keys()
}

436
// GetWants returns the set of all wants (both want-blocks and want-haves).
437
func (pwm *peerWantManager) getWants() []cid.Cid {
438
	res := pwm.broadcastWants.Keys()
439

440 441 442 443 444 445 446
	// Iterate over all targeted wants, removing ones that are also in the
	// broadcast list.
	for c := range pwm.wantPeers {
		if pwm.broadcastWants.Has(c) {
			continue
		}
		res = append(res, c)
447 448
	}

449
	return res
450 451
}

dirkmc's avatar
dirkmc committed
452 453 454
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
Dirk McCormick's avatar
Dirk McCormick committed
455
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
dirkmc's avatar
dirkmc committed
456
		for _, c := range ws.wantHaves.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
457
			b.WriteString(fmt.Sprintf("  want-have  %s\n", c))
dirkmc's avatar
dirkmc committed
458 459
		}
		for _, c := range ws.wantBlocks.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
460
			b.WriteString(fmt.Sprintf("  want-block %s\n", c))
dirkmc's avatar
dirkmc committed
461 462 463 464
		}
	}
	return b.String()
}