peerwantmanager.go 9.44 KB
Newer Older
dirkmc's avatar
dirkmc committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Gauge can be used to keep track of a metric that increases and decreases
// incrementally. It is used by the peerWantManager to track the number of
// want-blocks that are active (ie sent but no response received)
type Gauge interface {
	Inc()
	Dec()
}

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
22 23 24
	// peerWants maps peers to outstanding wants.
	// A peer's wants is the _union_ of the broadcast wants and the wants in
	// this list.
dirkmc's avatar
dirkmc committed
25
	peerWants map[peer.ID]*peerWant
26 27

	// Reverse index of all wants in peerWants.
28
	wantPeers map[cid.Cid]map[peer.ID]struct{}
29 30 31 32

	// broadcastWants tracks all the current broadcast wants.
	broadcastWants *cid.Set

dirkmc's avatar
dirkmc committed
33 34 35 36 37 38 39
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

type peerWant struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
40
	peerQueue  PeerQueue
dirkmc's avatar
dirkmc committed
41 42 43 44 45 46
}

// New creates a new peerWantManager with a Gauge that keeps track of the
// number of active want-blocks (ie sent but no response received)
func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
	return &peerWantManager{
47
		broadcastWants: cid.NewSet(),
dirkmc's avatar
dirkmc committed
48
		peerWants:      make(map[peer.ID]*peerWant),
49
		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
dirkmc's avatar
dirkmc committed
50 51 52 53
		wantBlockGauge: wantBlockGauge,
	}
}

54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// addPeer adds a peer whose wants we need to keep track of. It sends the
// current list of broadcast wants to the peer.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
	if _, ok := pwm.peerWants[p]; ok {
		return
	}

	pwm.peerWants[p] = &peerWant{
		wantBlocks: cid.NewSet(),
		wantHaves:  cid.NewSet(),
		peerQueue:  peerQueue,
	}

	// Broadcast any live want-haves to the newly connected peer
	if pwm.broadcastWants.Len() > 0 {
		wants := pwm.broadcastWants.Keys()
		peerQueue.AddBroadcastWantHaves(wants)
dirkmc's avatar
dirkmc committed
71 72 73 74
	}
}

// RemovePeer removes a peer and its associated wants from tracking
75
func (pwm *peerWantManager) removePeer(p peer.ID) {
76 77 78 79 80
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

81
	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
82
		// Decrement the gauge by the number of pending want-blocks to the peer
83
		pwm.wantBlockGauge.Dec()
84 85 86 87 88 89
		// Clean up want-blocks from the reverse index
		pwm.reverseIndexRemove(c, p)
		return nil
	})

	// Clean up want-haves from the reverse index
90
	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
91 92 93
		pwm.reverseIndexRemove(c, p)
		return nil
	})
94

dirkmc's avatar
dirkmc committed
95 96 97
	delete(pwm.peerWants, p)
}

98 99 100
// broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
	unsent := make([]cid.Cid, 0, len(wantHaves))
101 102 103 104 105 106
	for _, c := range wantHaves {
		if pwm.broadcastWants.Has(c) {
			// Already a broadcast want, skip it.
			continue
		}
		pwm.broadcastWants.Add(c)
107 108 109 110 111 112 113 114 115
		unsent = append(unsent, c)
	}

	if len(unsent) == 0 {
		return
	}

	// Allocate a single buffer to filter broadcast wants for each peer
	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
116

117 118 119 120
	// Send broadcast wants to each peer
	for _, pws := range pwm.peerWants {
		peerUnsent := bcstWantsBuffer[:0]
		for _, c := range unsent {
121
			// If we've already sent a want to this peer, skip them.
122 123
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				peerUnsent = append(peerUnsent, c)
124
			}
125
		}
dirkmc's avatar
dirkmc committed
126

127 128
		if len(peerUnsent) > 0 {
			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
dirkmc's avatar
dirkmc committed
129 130 131 132
		}
	}
}

133 134 135 136 137
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
	fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
dirkmc's avatar
dirkmc committed
138 139

	// Get the existing want-blocks and want-haves for the peer
140 141
	pws, ok := pwm.peerWants[p]
	if !ok {
142 143 144
		// In practice this should never happen
		log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", string(p))
		return
145
	}
dirkmc's avatar
dirkmc committed
146

147 148 149 150 151 152
	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block hasn't been sent to the peer
		if !pws.wantBlocks.Has(c) {
			// Record that the CID was sent as a want-block
			pws.wantBlocks.Add(c)
dirkmc's avatar
dirkmc committed
153

154 155 156
			// Update the reverse index
			pwm.reverseIndexAdd(c, p)

157
			// Add the CID to the results
158
			fltWantBlks = append(fltWantBlks, c)
dirkmc's avatar
dirkmc committed
159

160 161 162 163 164
			// Make sure the CID is no longer recorded as a want-have
			pws.wantHaves.Remove(c)

			// Increment the count of want-blocks
			pwm.wantBlockGauge.Inc()
dirkmc's avatar
dirkmc committed
165
		}
166
	}
dirkmc's avatar
dirkmc committed
167

168 169
	// Iterate over the requested want-haves
	for _, c := range wantHaves {
170 171 172 173 174 175
		// If we've already broadcasted this want, don't bother with a
		// want-have.
		if pwm.broadcastWants.Has(c) {
			continue
		}

176 177 178 179
		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)
dirkmc's avatar
dirkmc committed
180

181 182 183
			// Update the reverse index
			pwm.reverseIndexAdd(c, p)

184
			// Add the CID to the results
185
			fltWantHvs = append(fltWantHvs, c)
dirkmc's avatar
dirkmc committed
186 187 188
		}
	}

189 190
	// Send the want-blocks and want-haves to the peer
	pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
dirkmc's avatar
dirkmc committed
191 192
}

193 194 195
// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
196
	if len(cancelKs) == 0 {
197
		return
198 199
	}

200
	// Handle broadcast wants up-front
201
	broadcastKs := make([]cid.Cid, 0, len(cancelKs))
202
	for _, c := range cancelKs {
203
		if pwm.broadcastWants.Has(c) {
204 205 206
			broadcastKs = append(broadcastKs, c)
			pwm.broadcastWants.Remove(c)
		}
207
	}
208

209 210 211 212 213 214 215 216 217 218
	// Allocate a single buffer to filter the cancels to send to each peer
	cancelsBuff := make([]cid.Cid, 0, len(cancelKs))

	// Send cancels to a particular peer
	send := func(p peer.ID, pws *peerWant) {
		// Include broadcast cancels
		peerCancels := append(cancelsBuff[:0], broadcastKs...)
		for _, c := range cancelKs {
			wantBlock := pws.wantBlocks.Has(c)
			if !wantBlock && !pws.wantHaves.Has(c) {
219 220 221
				continue
			}

222
			// Update the want gauge.
223
			if wantBlock {
dirkmc's avatar
dirkmc committed
224 225 226
				pwm.wantBlockGauge.Dec()
			}

227 228 229
			// Unconditionally remove from the want lists.
			pws.wantBlocks.Remove(c)
			pws.wantHaves.Remove(c)
dirkmc's avatar
dirkmc committed
230

231
			// If it's a broadcast want, we've already added it to
232 233 234
			// the peer cancels.
			if !pwm.broadcastWants.Has(c) {
				peerCancels = append(peerCancels, c)
235 236 237
			}
		}

238 239 240 241
		// Send cancels to the peer
		if len(peerCancels) > 0 {
			pws.peerQueue.AddCancels(peerCancels)
		}
242 243 244
	}

	if len(broadcastKs) > 0 {
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
		// If a broadcast want is being cancelled, send the cancel to all
		// peers
		for p, pws := range pwm.peerWants {
			send(p, pws)
		}
	} else {
		// Only send cancels to peers that received a corresponding want
		cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
		for _, c := range cancelKs {
			for p := range pwm.wantPeers[c] {
				cancelPeers[p] = struct{}{}
			}
		}
		for p := range cancelPeers {
			pws, ok := pwm.peerWants[p]
			if !ok {
				// Should never happen but check just in case
				log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
				continue
dirkmc's avatar
dirkmc committed
264
			}
265 266

			send(p, pws)
dirkmc's avatar
dirkmc committed
267 268 269
		}
	}

270 271 272 273 274
	// Finally, batch-remove the reverse-index. There's no need to
	// clear this index peer-by-peer.
	for _, c := range cancelKs {
		delete(pwm.wantPeers, c)
	}
dirkmc's avatar
dirkmc committed
275 276
}

277 278 279 280
// Add the peer to the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
	peers, ok := pwm.wantPeers[c]
	if !ok {
281
		peers = make(map[peer.ID]struct{}, 10)
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
		pwm.wantPeers[c] = peers
	}
	peers[p] = struct{}{}
}

// Remove the peer from the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
	if peers, ok := pwm.wantPeers[c]; ok {
		delete(peers, p)
		if len(peers) == 0 {
			delete(pwm.wantPeers, c)
		}
	}
}

dirkmc's avatar
dirkmc committed
297
// GetWantBlocks returns the set of all want-blocks sent to all peers
298
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
dirkmc's avatar
dirkmc committed
299 300 301 302 303
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
304
		_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
305 306
			// Add the CID to the results
			res.Add(c)
307 308
			return nil
		})
dirkmc's avatar
dirkmc committed
309 310 311 312 313 314
	}

	return res.Keys()
}

// GetWantHaves returns the set of all want-haves sent to all peers
315
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
dirkmc's avatar
dirkmc committed
316 317
	res := cid.NewSet()

318
	// Iterate over all peers with active wants.
dirkmc's avatar
dirkmc committed
319 320
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
321
		_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
322 323
			// Add the CID to the results
			res.Add(c)
324 325
			return nil
		})
dirkmc's avatar
dirkmc committed
326
	}
327 328 329 330
	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
		res.Add(c)
		return nil
	})
dirkmc's avatar
dirkmc committed
331 332 333 334

	return res.Keys()
}

335
// GetWants returns the set of all wants (both want-blocks and want-haves).
336
func (pwm *peerWantManager) getWants() []cid.Cid {
337
	res := pwm.broadcastWants.Keys()
338

339 340 341 342 343 344 345
	// Iterate over all targeted wants, removing ones that are also in the
	// broadcast list.
	for c := range pwm.wantPeers {
		if pwm.broadcastWants.Has(c) {
			continue
		}
		res = append(res, c)
346 347
	}

348
	return res
349 350
}

dirkmc's avatar
dirkmc committed
351 352 353
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
Dirk McCormick's avatar
Dirk McCormick committed
354
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
dirkmc's avatar
dirkmc committed
355
		for _, c := range ws.wantHaves.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
356
			b.WriteString(fmt.Sprintf("  want-have  %s\n", c))
dirkmc's avatar
dirkmc committed
357 358
		}
		for _, c := range ws.wantBlocks.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
359
			b.WriteString(fmt.Sprintf("  want-block %s\n", c))
dirkmc's avatar
dirkmc committed
360 361 362 363
		}
	}
	return b.String()
}