peerwantmanager.go 10.9 KB
Newer Older
dirkmc's avatar
dirkmc committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Gauge can be used to keep track of a metric that increases and decreases
// incrementally. It is used by the peerWantManager to track the number of
// wants and the number of want-blocks that are active (ie sent but no
// response received).
type Gauge interface {
	// Inc records that the tracked metric went up by one step.
	Inc()
	// Dec records that the tracked metric went down by one step.
	Dec()
}

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
22 23 24
	// peerWants maps peers to outstanding wants.
	// A peer's wants is the _union_ of the broadcast wants and the wants in
	// this list.
dirkmc's avatar
dirkmc committed
25
	peerWants map[peer.ID]*peerWant
26 27

	// Reverse index of all wants in peerWants.
28
	wantPeers map[cid.Cid]map[peer.ID]struct{}
29 30 31 32

	// broadcastWants tracks all the current broadcast wants.
	broadcastWants *cid.Set

dirkmc's avatar
dirkmc committed
33 34
	// Keeps track of the number of active want-haves & want-blocks
	wantGauge Gauge
dirkmc's avatar
dirkmc committed
35 36 37 38 39 40 41
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

type peerWant struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
42
	peerQueue  PeerQueue
dirkmc's avatar
dirkmc committed
43 44 45 46
}

// New creates a new peerWantManager with a Gauge that keeps track of the
// number of active want-blocks (ie sent but no response received)
dirkmc's avatar
dirkmc committed
47
func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
dirkmc's avatar
dirkmc committed
48
	return &peerWantManager{
49
		broadcastWants: cid.NewSet(),
dirkmc's avatar
dirkmc committed
50
		peerWants:      make(map[peer.ID]*peerWant),
51
		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
dirkmc's avatar
dirkmc committed
52
		wantGauge:      wantGauge,
dirkmc's avatar
dirkmc committed
53 54 55 56
		wantBlockGauge: wantBlockGauge,
	}
}

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// addPeer adds a peer whose wants we need to keep track of. It sends the
// current list of broadcast wants to the peer.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
	if _, ok := pwm.peerWants[p]; ok {
		return
	}

	pwm.peerWants[p] = &peerWant{
		wantBlocks: cid.NewSet(),
		wantHaves:  cid.NewSet(),
		peerQueue:  peerQueue,
	}

	// Broadcast any live want-haves to the newly connected peer
	if pwm.broadcastWants.Len() > 0 {
		wants := pwm.broadcastWants.Keys()
		peerQueue.AddBroadcastWantHaves(wants)
dirkmc's avatar
dirkmc committed
74 75 76 77
	}
}

// RemovePeer removes a peer and its associated wants from tracking
78
func (pwm *peerWantManager) removePeer(p peer.ID) {
79 80 81 82 83
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

dirkmc's avatar
dirkmc committed
84
	// Clean up want-blocks
85
	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
86
		// Clean up want-blocks from the reverse index
dirkmc's avatar
dirkmc committed
87 88 89 90 91 92 93 94 95
		removedLastPeer := pwm.reverseIndexRemove(c, p)

		// Decrement the gauges by the number of pending want-blocks to the peer
		if removedLastPeer {
			pwm.wantBlockGauge.Dec()
			if !pwm.broadcastWants.Has(c) {
				pwm.wantGauge.Dec()
			}
		}
96 97 98
		return nil
	})

dirkmc's avatar
dirkmc committed
99
	// Clean up want-haves
100
	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
101 102 103 104 105 106 107
		// Clean up want-haves from the reverse index
		removedLastPeer := pwm.reverseIndexRemove(c, p)

		// Decrement the gauge by the number of pending want-haves to the peer
		if removedLastPeer && !pwm.broadcastWants.Has(c) {
			pwm.wantGauge.Dec()
		}
108 109
		return nil
	})
110

dirkmc's avatar
dirkmc committed
111 112 113
	delete(pwm.peerWants, p)
}

114 115 116
// broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
	unsent := make([]cid.Cid, 0, len(wantHaves))
117 118 119 120 121 122
	for _, c := range wantHaves {
		if pwm.broadcastWants.Has(c) {
			// Already a broadcast want, skip it.
			continue
		}
		pwm.broadcastWants.Add(c)
123
		unsent = append(unsent, c)
dirkmc's avatar
dirkmc committed
124 125 126 127 128

		// Increment the total wants gauge
		if _, ok := pwm.wantPeers[c]; !ok {
			pwm.wantGauge.Inc()
		}
129 130 131 132 133 134 135 136
	}

	if len(unsent) == 0 {
		return
	}

	// Allocate a single buffer to filter broadcast wants for each peer
	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
137

138 139 140 141
	// Send broadcast wants to each peer
	for _, pws := range pwm.peerWants {
		peerUnsent := bcstWantsBuffer[:0]
		for _, c := range unsent {
142
			// If we've already sent a want to this peer, skip them.
143 144
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				peerUnsent = append(peerUnsent, c)
145
			}
146
		}
dirkmc's avatar
dirkmc committed
147

148 149
		if len(peerUnsent) > 0 {
			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
dirkmc's avatar
dirkmc committed
150 151 152 153
		}
	}
}

154 155 156 157 158
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
	fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
dirkmc's avatar
dirkmc committed
159 160

	// Get the existing want-blocks and want-haves for the peer
161 162
	pws, ok := pwm.peerWants[p]
	if !ok {
163 164 165
		// In practice this should never happen
		log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", string(p))
		return
166
	}
dirkmc's avatar
dirkmc committed
167

168 169 170 171 172 173
	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block hasn't been sent to the peer
		if !pws.wantBlocks.Has(c) {
			// Record that the CID was sent as a want-block
			pws.wantBlocks.Add(c)
dirkmc's avatar
dirkmc committed
174

175
			// Add the CID to the results
176
			fltWantBlks = append(fltWantBlks, c)
dirkmc's avatar
dirkmc committed
177

178 179 180
			// Make sure the CID is no longer recorded as a want-have
			pws.wantHaves.Remove(c)

dirkmc's avatar
dirkmc committed
181 182 183 184 185 186 187 188 189 190
			// Update the reverse index
			isNew := pwm.reverseIndexAdd(c, p)

			// Increment the want gauges
			if isNew {
				pwm.wantBlockGauge.Inc()
				if !pwm.broadcastWants.Has(c) {
					pwm.wantGauge.Inc()
				}
			}
dirkmc's avatar
dirkmc committed
191
		}
192
	}
dirkmc's avatar
dirkmc committed
193

194 195
	// Iterate over the requested want-haves
	for _, c := range wantHaves {
196 197 198 199 200 201
		// If we've already broadcasted this want, don't bother with a
		// want-have.
		if pwm.broadcastWants.Has(c) {
			continue
		}

202 203 204 205
		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)
dirkmc's avatar
dirkmc committed
206

207
			// Add the CID to the results
208
			fltWantHvs = append(fltWantHvs, c)
dirkmc's avatar
dirkmc committed
209 210 211 212 213 214 215 216

			// Update the reverse index
			isNew := pwm.reverseIndexAdd(c, p)

			// Increment the total wants gauge
			if isNew && !pwm.broadcastWants.Has(c) {
				pwm.wantGauge.Inc()
			}
dirkmc's avatar
dirkmc committed
217 218 219
		}
	}

220 221
	// Send the want-blocks and want-haves to the peer
	pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
dirkmc's avatar
dirkmc committed
222 223
}

224 225 226
// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
227
	if len(cancelKs) == 0 {
228
		return
229 230
	}

231 232 233
	// Create a buffer to use for filtering cancels per peer, with the
	// broadcast wants at the front of the buffer (broadcast wants are sent to
	// all peers)
234
	broadcastCancels := make([]cid.Cid, 0, len(cancelKs))
235
	for _, c := range cancelKs {
236
		if pwm.broadcastWants.Has(c) {
237
			broadcastCancels = append(broadcastCancels, c)
238
		}
239 240
	}

dirkmc's avatar
dirkmc committed
241 242 243
	cancelledWantBlocks := cid.NewSet()
	cancelledWantHaves := cid.NewSet()

244 245
	// Send cancels to a particular peer
	send := func(p peer.ID, pws *peerWant) {
246 247
		// Start from the broadcast cancels
		toCancel := broadcastCancels
248 249

		// For each key to be cancelled
250
		for _, c := range cancelKs {
251
			// Check if a want was sent for the key
252
			wantBlock := pws.wantBlocks.Has(c)
dirkmc's avatar
dirkmc committed
253
			wantHave := pws.wantHaves.Has(c)
254

dirkmc's avatar
dirkmc committed
255
			// Update the want gauges
256
			if wantBlock {
dirkmc's avatar
dirkmc committed
257 258 259 260 261
				cancelledWantBlocks.Add(c)
			} else if wantHave {
				cancelledWantHaves.Add(c)
			} else {
				continue
dirkmc's avatar
dirkmc committed
262 263
			}

264 265 266
			// Unconditionally remove from the want lists.
			pws.wantBlocks.Remove(c)
			pws.wantHaves.Remove(c)
dirkmc's avatar
dirkmc committed
267

268
			// If it's a broadcast want, we've already added it to
269 270
			// the peer cancels.
			if !pwm.broadcastWants.Has(c) {
271
				toCancel = append(toCancel, c)
272 273 274
			}
		}

275
		// Send cancels to the peer
276 277
		if len(toCancel) > 0 {
			pws.peerQueue.AddCancels(toCancel)
278
		}
279 280
	}

281
	if len(broadcastCancels) > 0 {
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
		// If a broadcast want is being cancelled, send the cancel to all
		// peers
		for p, pws := range pwm.peerWants {
			send(p, pws)
		}
	} else {
		// Only send cancels to peers that received a corresponding want
		cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
		for _, c := range cancelKs {
			for p := range pwm.wantPeers[c] {
				cancelPeers[p] = struct{}{}
			}
		}
		for p := range cancelPeers {
			pws, ok := pwm.peerWants[p]
			if !ok {
				// Should never happen but check just in case
				log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
				continue
dirkmc's avatar
dirkmc committed
301
			}
302 303

			send(p, pws)
dirkmc's avatar
dirkmc committed
304 305 306
		}
	}

307
	// Remove cancelled broadcast wants
308
	for _, c := range broadcastCancels {
309
		pwm.broadcastWants.Remove(c)
dirkmc's avatar
dirkmc committed
310 311 312 313 314

		// Decrement the total wants gauge for broadcast wants
		if !cancelledWantHaves.Has(c) && !cancelledWantBlocks.Has(c) {
			pwm.wantGauge.Dec()
		}
315 316
	}

dirkmc's avatar
dirkmc committed
317 318 319 320 321 322 323 324 325 326 327
	// Decrement the total wants gauge for peer wants
	_ = cancelledWantHaves.ForEach(func(c cid.Cid) error {
		pwm.wantGauge.Dec()
		return nil
	})
	_ = cancelledWantBlocks.ForEach(func(c cid.Cid) error {
		pwm.wantGauge.Dec()
		pwm.wantBlockGauge.Dec()
		return nil
	})

328 329 330 331 332
	// Finally, batch-remove the reverse-index. There's no need to
	// clear this index peer-by-peer.
	for _, c := range cancelKs {
		delete(pwm.wantPeers, c)
	}
dirkmc's avatar
dirkmc committed
333

dirkmc's avatar
dirkmc committed
334 335
}

336
// Add the peer to the list of peers that have sent a want with the cid
dirkmc's avatar
dirkmc committed
337
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
338 339
	peers, ok := pwm.wantPeers[c]
	if !ok {
340
		peers = make(map[peer.ID]struct{}, 10)
341 342 343
		pwm.wantPeers[c] = peers
	}
	peers[p] = struct{}{}
dirkmc's avatar
dirkmc committed
344
	return !ok
345 346 347
}

// Remove the peer from the list of peers that have sent a want with the cid
dirkmc's avatar
dirkmc committed
348
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) bool {
349 350 351 352
	if peers, ok := pwm.wantPeers[c]; ok {
		delete(peers, p)
		if len(peers) == 0 {
			delete(pwm.wantPeers, c)
dirkmc's avatar
dirkmc committed
353
			return true
354 355
		}
	}
dirkmc's avatar
dirkmc committed
356 357

	return false
358 359
}

dirkmc's avatar
dirkmc committed
360
// GetWantBlocks returns the set of all want-blocks sent to all peers
361
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
dirkmc's avatar
dirkmc committed
362 363 364 365 366
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
367
		_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
368 369
			// Add the CID to the results
			res.Add(c)
370 371
			return nil
		})
dirkmc's avatar
dirkmc committed
372 373 374 375 376 377
	}

	return res.Keys()
}

// GetWantHaves returns the set of all want-haves sent to all peers
378
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
dirkmc's avatar
dirkmc committed
379 380
	res := cid.NewSet()

381
	// Iterate over all peers with active wants.
dirkmc's avatar
dirkmc committed
382 383
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
384
		_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
385 386
			// Add the CID to the results
			res.Add(c)
387 388
			return nil
		})
dirkmc's avatar
dirkmc committed
389
	}
390 391 392 393
	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
		res.Add(c)
		return nil
	})
dirkmc's avatar
dirkmc committed
394 395 396 397

	return res.Keys()
}

398
// GetWants returns the set of all wants (both want-blocks and want-haves).
399
func (pwm *peerWantManager) getWants() []cid.Cid {
400
	res := pwm.broadcastWants.Keys()
401

402 403 404 405 406 407 408
	// Iterate over all targeted wants, removing ones that are also in the
	// broadcast list.
	for c := range pwm.wantPeers {
		if pwm.broadcastWants.Has(c) {
			continue
		}
		res = append(res, c)
409 410
	}

411
	return res
412 413
}

dirkmc's avatar
dirkmc committed
414 415 416
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
Dirk McCormick's avatar
Dirk McCormick committed
417
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
dirkmc's avatar
dirkmc committed
418
		for _, c := range ws.wantHaves.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
419
			b.WriteString(fmt.Sprintf("  want-have  %s\n", c))
dirkmc's avatar
dirkmc committed
420 421
		}
		for _, c := range ws.wantBlocks.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
422
			b.WriteString(fmt.Sprintf("  want-block %s\n", c))
dirkmc's avatar
dirkmc committed
423 424 425 426
		}
	}
	return b.String()
}