// peerwantmanager.go

package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Gauge can be used to keep track of a metric that increases and decreases
// incrementally. It is used by the peerWantManager to track the number of
// want-blocks that are active (ie sent but no response received)
type Gauge interface {
	// Inc records an increase of the tracked metric. The peerWantManager
	// calls it once for every want-block newly sent to a peer.
	Inc()
	// Dec records a decrease of the tracked metric. The peerWantManager
	// calls it once for every want-block that is cancelled or dropped
	// because its peer was removed.
	Dec()
}

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
22 23 24
	// peerWants maps peers to outstanding wants.
	// A peer's wants is the _union_ of the broadcast wants and the wants in
	// this list.
dirkmc's avatar
dirkmc committed
25
	peerWants map[peer.ID]*peerWant
26 27

	// Reverse index of all wants in peerWants.
28
	wantPeers map[cid.Cid]map[peer.ID]struct{}
29 30 31 32

	// broadcastWants tracks all the current broadcast wants.
	broadcastWants *cid.Set

dirkmc's avatar
dirkmc committed
33 34 35 36 37 38 39
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

type peerWant struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
40
	peerQueue  PeerQueue
dirkmc's avatar
dirkmc committed
41 42 43 44 45 46
}

// New creates a new peerWantManager with a Gauge that keeps track of the
// number of active want-blocks (ie sent but no response received)
func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
	return &peerWantManager{
47
		broadcastWants: cid.NewSet(),
dirkmc's avatar
dirkmc committed
48
		peerWants:      make(map[peer.ID]*peerWant),
49
		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
dirkmc's avatar
dirkmc committed
50 51 52 53
		wantBlockGauge: wantBlockGauge,
	}
}

54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
// addPeer adds a peer whose wants we need to keep track of. It sends the
// current list of broadcast wants to the peer.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
	if _, ok := pwm.peerWants[p]; ok {
		return
	}

	pwm.peerWants[p] = &peerWant{
		wantBlocks: cid.NewSet(),
		wantHaves:  cid.NewSet(),
		peerQueue:  peerQueue,
	}

	// Broadcast any live want-haves to the newly connected peer
	if pwm.broadcastWants.Len() > 0 {
		wants := pwm.broadcastWants.Keys()
		peerQueue.AddBroadcastWantHaves(wants)
dirkmc's avatar
dirkmc committed
71 72 73 74
	}
}

// RemovePeer removes a peer and its associated wants from tracking
75
func (pwm *peerWantManager) removePeer(p peer.ID) {
76 77 78 79 80
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

81
	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
82
		// Decrement the gauge by the number of pending want-blocks to the peer
83
		pwm.wantBlockGauge.Dec()
84 85 86 87 88 89
		// Clean up want-blocks from the reverse index
		pwm.reverseIndexRemove(c, p)
		return nil
	})

	// Clean up want-haves from the reverse index
90
	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
91 92 93
		pwm.reverseIndexRemove(c, p)
		return nil
	})
94

dirkmc's avatar
dirkmc committed
95 96 97
	delete(pwm.peerWants, p)
}

98 99 100
// broadcastWantHaves sends want-haves to any peers that have not yet been sent them.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
	unsent := make([]cid.Cid, 0, len(wantHaves))
101 102 103 104 105 106
	for _, c := range wantHaves {
		if pwm.broadcastWants.Has(c) {
			// Already a broadcast want, skip it.
			continue
		}
		pwm.broadcastWants.Add(c)
107 108 109 110 111 112 113 114 115
		unsent = append(unsent, c)
	}

	if len(unsent) == 0 {
		return
	}

	// Allocate a single buffer to filter broadcast wants for each peer
	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))
116

117 118 119 120
	// Send broadcast wants to each peer
	for _, pws := range pwm.peerWants {
		peerUnsent := bcstWantsBuffer[:0]
		for _, c := range unsent {
121
			// If we've already sent a want to this peer, skip them.
122 123
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				peerUnsent = append(peerUnsent, c)
124
			}
125
		}
dirkmc's avatar
dirkmc committed
126

127 128
		if len(peerUnsent) > 0 {
			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
dirkmc's avatar
dirkmc committed
129 130 131 132
		}
	}
}

133 134 135 136 137
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
	fltWantHvs := make([]cid.Cid, 0, len(wantHaves))
dirkmc's avatar
dirkmc committed
138 139

	// Get the existing want-blocks and want-haves for the peer
140 141
	pws, ok := pwm.peerWants[p]
	if !ok {
142 143 144
		// In practice this should never happen
		log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", string(p))
		return
145
	}
dirkmc's avatar
dirkmc committed
146

147 148 149 150 151 152
	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block hasn't been sent to the peer
		if !pws.wantBlocks.Has(c) {
			// Record that the CID was sent as a want-block
			pws.wantBlocks.Add(c)
dirkmc's avatar
dirkmc committed
153

154 155 156
			// Update the reverse index
			pwm.reverseIndexAdd(c, p)

157
			// Add the CID to the results
158
			fltWantBlks = append(fltWantBlks, c)
dirkmc's avatar
dirkmc committed
159

160 161 162 163 164
			// Make sure the CID is no longer recorded as a want-have
			pws.wantHaves.Remove(c)

			// Increment the count of want-blocks
			pwm.wantBlockGauge.Inc()
dirkmc's avatar
dirkmc committed
165
		}
166
	}
dirkmc's avatar
dirkmc committed
167

168 169
	// Iterate over the requested want-haves
	for _, c := range wantHaves {
170 171 172 173 174 175
		// If we've already broadcasted this want, don't bother with a
		// want-have.
		if pwm.broadcastWants.Has(c) {
			continue
		}

176 177 178 179
		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)
dirkmc's avatar
dirkmc committed
180

181 182 183
			// Update the reverse index
			pwm.reverseIndexAdd(c, p)

184
			// Add the CID to the results
185
			fltWantHvs = append(fltWantHvs, c)
dirkmc's avatar
dirkmc committed
186 187 188
		}
	}

189 190
	// Send the want-blocks and want-haves to the peer
	pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
dirkmc's avatar
dirkmc committed
191 192
}

193 194 195
// sendCancels sends a cancel to each peer to which a corresponding want was
// sent
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
196
	if len(cancelKs) == 0 {
197
		return
198 199
	}

200 201 202 203 204
	// Create a buffer to use for filtering cancels per peer, with the
	// broadcast wants at the front of the buffer (broadcast wants are sent to
	// all peers)
	i := 0
	cancelsBuff := make([]cid.Cid, len(cancelKs))
205
	for _, c := range cancelKs {
206
		if pwm.broadcastWants.Has(c) {
207 208
			cancelsBuff[i] = c
			i++
209
		}
210
	}
211
	broadcastKsCount := i
212 213 214

	// Send cancels to a particular peer
	send := func(p peer.ID, pws *peerWant) {
215 216 217 218
		// Start the index into the buffer after the broadcast wants
		i = broadcastKsCount

		// For each key to be cancelled
219
		for _, c := range cancelKs {
220
			// Check if a want was sent for the key
221 222
			wantBlock := pws.wantBlocks.Has(c)
			if !wantBlock && !pws.wantHaves.Has(c) {
223 224 225
				continue
			}

226
			// Update the want gauge.
227
			if wantBlock {
dirkmc's avatar
dirkmc committed
228 229 230
				pwm.wantBlockGauge.Dec()
			}

231 232 233
			// Unconditionally remove from the want lists.
			pws.wantBlocks.Remove(c)
			pws.wantHaves.Remove(c)
dirkmc's avatar
dirkmc committed
234

235
			// If it's a broadcast want, we've already added it to
236 237
			// the peer cancels.
			if !pwm.broadcastWants.Has(c) {
238 239
				cancelsBuff[i] = c
				i++
240 241 242
			}
		}

243
		// Send cancels to the peer
244 245
		if i > 0 {
			pws.peerQueue.AddCancels(cancelsBuff[:i])
246
		}
247 248
	}

249
	if broadcastKsCount > 0 {
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
		// If a broadcast want is being cancelled, send the cancel to all
		// peers
		for p, pws := range pwm.peerWants {
			send(p, pws)
		}
	} else {
		// Only send cancels to peers that received a corresponding want
		cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
		for _, c := range cancelKs {
			for p := range pwm.wantPeers[c] {
				cancelPeers[p] = struct{}{}
			}
		}
		for p := range cancelPeers {
			pws, ok := pwm.peerWants[p]
			if !ok {
				// Should never happen but check just in case
				log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
				continue
dirkmc's avatar
dirkmc committed
269
			}
270 271

			send(p, pws)
dirkmc's avatar
dirkmc committed
272 273 274
		}
	}

275 276 277 278 279
	// Remove cancelled broadcast wants
	for _, c := range cancelsBuff[:broadcastKsCount] {
		pwm.broadcastWants.Remove(c)
	}

280 281 282 283 284
	// Finally, batch-remove the reverse-index. There's no need to
	// clear this index peer-by-peer.
	for _, c := range cancelKs {
		delete(pwm.wantPeers, c)
	}
dirkmc's avatar
dirkmc committed
285 286
}

287 288 289 290
// Add the peer to the list of peers that have sent a want with the cid
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
	peers, ok := pwm.wantPeers[c]
	if !ok {
291
		peers = make(map[peer.ID]struct{}, 10)
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
		pwm.wantPeers[c] = peers
	}
	peers[p] = struct{}{}
}

// reverseIndexRemove deletes the peer from the reverse-index entry for the
// CID, and deletes the entry itself once no peers are left in it.
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
	peers, found := pwm.wantPeers[c]
	if !found {
		return
	}

	delete(peers, p)
	if len(peers) == 0 {
		delete(pwm.wantPeers, c)
	}
}

dirkmc's avatar
dirkmc committed
307
// GetWantBlocks returns the set of all want-blocks sent to all peers
308
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
dirkmc's avatar
dirkmc committed
309 310 311 312 313
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
314
		_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
315 316
			// Add the CID to the results
			res.Add(c)
317 318
			return nil
		})
dirkmc's avatar
dirkmc committed
319 320 321 322 323 324
	}

	return res.Keys()
}

// GetWantHaves returns the set of all want-haves sent to all peers
325
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
dirkmc's avatar
dirkmc committed
326 327
	res := cid.NewSet()

328
	// Iterate over all peers with active wants.
dirkmc's avatar
dirkmc committed
329 330
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
331
		_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
332 333
			// Add the CID to the results
			res.Add(c)
334 335
			return nil
		})
dirkmc's avatar
dirkmc committed
336
	}
337 338 339 340
	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
		res.Add(c)
		return nil
	})
dirkmc's avatar
dirkmc committed
341 342 343 344

	return res.Keys()
}

345
// GetWants returns the set of all wants (both want-blocks and want-haves).
346
func (pwm *peerWantManager) getWants() []cid.Cid {
347
	res := pwm.broadcastWants.Keys()
348

349 350 351 352 353 354 355
	// Iterate over all targeted wants, removing ones that are also in the
	// broadcast list.
	for c := range pwm.wantPeers {
		if pwm.broadcastWants.Has(c) {
			continue
		}
		res = append(res, c)
356 357
	}

358
	return res
359 360
}

dirkmc's avatar
dirkmc committed
361 362 363
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
Dirk McCormick's avatar
Dirk McCormick committed
364
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
dirkmc's avatar
dirkmc committed
365
		for _, c := range ws.wantHaves.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
366
			b.WriteString(fmt.Sprintf("  want-have  %s\n", c))
dirkmc's avatar
dirkmc committed
367 368
		}
		for _, c := range ws.wantBlocks.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
369
			b.WriteString(fmt.Sprintf("  want-block %s\n", c))
dirkmc's avatar
dirkmc committed
370 371 372 373
		}
	}
	return b.String()
}