peerwantmanager.go 8.15 KB
Newer Older
dirkmc's avatar
dirkmc committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Gauge is a metric that moves up and down one step at a time. The
// peerWantManager uses it to count active want-blocks, ie want-blocks that
// have been sent but for which no response has been received.
type Gauge interface {
	// Inc is called once for every want-block that becomes active.
	Inc()
	// Dec is called once for every want-block that stops being active,
	// either because it was cancelled or because its peer was removed.
	Dec()
}

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
	peerWants map[peer.ID]*peerWant
23 24 25
	// Reverse index mapping wants to the peers that sent them. This is used
	// to speed up cancels
	wantPeers map[cid.Cid]map[peer.ID]struct{}
dirkmc's avatar
dirkmc committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

// peerWant holds the wants that have been sent to a single peer.
type peerWant struct {
	// CIDs sent to the peer as want-blocks
	wantBlocks *cid.Set

	// CIDs sent to the peer as want-haves
	wantHaves *cid.Set
}

// New creates a new peerWantManager with a Gauge that keeps track of the
// number of active want-blocks (ie sent but no response received)
func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
	return &peerWantManager{
		peerWants:      make(map[peer.ID]*peerWant),
40
		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
dirkmc's avatar
dirkmc committed
41 42 43 44 45
		wantBlockGauge: wantBlockGauge,
	}
}

// AddPeer adds a peer whose wants we need to keep track of
46
func (pwm *peerWantManager) addPeer(p peer.ID) {
dirkmc's avatar
dirkmc committed
47 48 49 50 51 52 53 54 55
	if _, ok := pwm.peerWants[p]; !ok {
		pwm.peerWants[p] = &peerWant{
			wantBlocks: cid.NewSet(),
			wantHaves:  cid.NewSet(),
		}
	}
}

// RemovePeer removes a peer and its associated wants from tracking
56
func (pwm *peerWantManager) removePeer(p peer.ID) {
57 58 59 60 61
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

62 63
	pws.wantBlocks.ForEach(func(c cid.Cid) error {
		// Decrement the gauge by the number of pending want-blocks to the peer
64
		pwm.wantBlockGauge.Dec()
65 66 67 68 69 70 71 72 73 74
		// Clean up want-blocks from the reverse index
		pwm.reverseIndexRemove(c, p)
		return nil
	})

	// Clean up want-haves from the reverse index
	pws.wantHaves.ForEach(func(c cid.Cid) error {
		pwm.reverseIndexRemove(c, p)
		return nil
	})
75

dirkmc's avatar
dirkmc committed
76 77 78 79 80
	delete(pwm.peerWants, p)
}

// PrepareBroadcastWantHaves filters the list of want-haves for each peer,
// returning a map of peers to the want-haves they have not yet been sent.
81
func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[peer.ID][]cid.Cid {
dirkmc's avatar
dirkmc committed
82 83 84 85 86 87 88 89 90 91 92
	res := make(map[peer.ID][]cid.Cid)

	// Iterate over all known peers
	for p, pws := range pwm.peerWants {
		// Iterate over all want-haves
		for _, c := range wantHaves {
			// If the CID has not been sent as a want-block or want-have
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				// Record that the CID has been sent as a want-have
				pws.wantHaves.Add(c)

93 94 95
				// Update the reverse index
				pwm.reverseIndexAdd(c, p)

dirkmc's avatar
dirkmc committed
96 97 98 99 100 101 102 103 104 105 106 107 108 109
				// Add the CID to the results
				if _, ok := res[p]; !ok {
					res[p] = make([]cid.Cid, 0, 1)
				}
				res[p] = append(res[p], c)
			}
		}
	}

	return res
}

// PrepareSendWants filters the list of want-blocks and want-haves such that
// it only contains wants that have not already been sent to the peer.
110
func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) ([]cid.Cid, []cid.Cid) {
dirkmc's avatar
dirkmc committed
111 112 113 114
	resWantBlks := make([]cid.Cid, 0)
	resWantHvs := make([]cid.Cid, 0)

	// Get the existing want-blocks and want-haves for the peer
115 116 117 118 119 120 121 122 123 124
	pws, ok := pwm.peerWants[p]

	if !ok {
		// In practice this should never happen:
		// - PeerManager calls addPeer() as soon as the peer connects
		// - PeerManager calls removePeer() as soon as the peer disconnects
		// - All calls to PeerWantManager are locked
		log.Errorf("prepareSendWants() called with peer %s but peer not found in peerWantManager", string(p))
		return resWantBlks, resWantHvs
	}
dirkmc's avatar
dirkmc committed
125

126 127 128 129 130 131
	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block hasn't been sent to the peer
		if !pws.wantBlocks.Has(c) {
			// Record that the CID was sent as a want-block
			pws.wantBlocks.Add(c)
dirkmc's avatar
dirkmc committed
132

133 134 135
			// Update the reverse index
			pwm.reverseIndexAdd(c, p)

136 137
			// Add the CID to the results
			resWantBlks = append(resWantBlks, c)
dirkmc's avatar
dirkmc committed
138

139 140 141 142 143
			// Make sure the CID is no longer recorded as a want-have
			pws.wantHaves.Remove(c)

			// Increment the count of want-blocks
			pwm.wantBlockGauge.Inc()
dirkmc's avatar
dirkmc committed
144
		}
145
	}
dirkmc's avatar
dirkmc committed
146

147 148 149 150 151 152
	// Iterate over the requested want-haves
	for _, c := range wantHaves {
		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)
dirkmc's avatar
dirkmc committed
153

154 155 156
			// Update the reverse index
			pwm.reverseIndexAdd(c, p)

157 158
			// Add the CID to the results
			resWantHvs = append(resWantHvs, c)
dirkmc's avatar
dirkmc committed
159 160 161 162 163 164 165 166 167
		}
	}

	return resWantBlks, resWantHvs
}

// PrepareSendCancels filters the list of cancels for each peer,
// returning a map of peers which only contains cancels for wants that have
// been sent to the peer.
168
func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
dirkmc's avatar
dirkmc committed
169 170
	res := make(map[peer.ID][]cid.Cid)

171 172 173 174 175 176 177 178 179 180 181
	// Iterate over all requested cancels
	for _, c := range cancelKs {
		// Iterate over peers that have sent a corresponding want
		for p := range pwm.wantPeers[c] {
			pws, ok := pwm.peerWants[p]
			if !ok {
				// Should never happen but check just in case
				log.Errorf("peerWantManager reverse index missing peer %s for key %s", p, c)
				continue
			}

dirkmc's avatar
dirkmc committed
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
			isWantBlock := pws.wantBlocks.Has(c)
			isWantHave := pws.wantHaves.Has(c)

			// If the CID was sent as a want-block, decrement the want-block count
			if isWantBlock {
				pwm.wantBlockGauge.Dec()
			}

			// If the CID was sent as a want-block or want-have
			if isWantBlock || isWantHave {
				// Remove the CID from the recorded want-blocks and want-haves
				pws.wantBlocks.Remove(c)
				pws.wantHaves.Remove(c)

				// Add the CID to the results
				if _, ok := res[p]; !ok {
					res[p] = make([]cid.Cid, 0, 1)
				}
				res[p] = append(res[p], c)
201 202 203

				// Update the reverse index
				pwm.reverseIndexRemove(c, p)
dirkmc's avatar
dirkmc committed
204 205 206 207 208 209 210
			}
		}
	}

	return res
}

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
// reverseIndexAdd records in the reverse index that a want for the cid has
// been sent to the peer, creating the cid's peer set if it does not exist.
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) {
	peers := pwm.wantPeers[c]
	if peers == nil {
		peers = make(map[peer.ID]struct{}, 1)
		pwm.wantPeers[c] = peers
	}
	peers[p] = struct{}{}
}

// reverseIndexRemove removes the peer from the reverse index entry for the
// cid. When the last peer is removed the entry itself is deleted, so the
// index only holds cids with at least one peer.
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
	peers, found := pwm.wantPeers[c]
	if !found {
		return
	}

	delete(peers, p)
	if len(peers) == 0 {
		delete(pwm.wantPeers, c)
	}
}

dirkmc's avatar
dirkmc committed
231
// GetWantBlocks returns the set of all want-blocks sent to all peers
232
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
dirkmc's avatar
dirkmc committed
233 234 235 236 237
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
238
		pws.wantBlocks.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
239 240
			// Add the CID to the results
			res.Add(c)
241 242
			return nil
		})
dirkmc's avatar
dirkmc committed
243 244 245 246 247 248
	}

	return res.Keys()
}

// GetWantHaves returns the set of all want-haves sent to all peers
249
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
dirkmc's avatar
dirkmc committed
250 251 252 253 254
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
255
		pws.wantHaves.ForEach(func(c cid.Cid) error {
dirkmc's avatar
dirkmc committed
256 257
			// Add the CID to the results
			res.Add(c)
258 259
			return nil
		})
dirkmc's avatar
dirkmc committed
260 261 262 263 264
	}

	return res.Keys()
}

265
// GetWants returns the set of all wants (both want-blocks and want-haves).
266
func (pwm *peerWantManager) getWants() []cid.Cid {
267 268 269 270 271
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
272
		pws.wantBlocks.ForEach(func(c cid.Cid) error {
273 274
			// Add the CID to the results
			res.Add(c)
275 276
			return nil
		})
277 278

		// Iterate over all want-haves
279
		pws.wantHaves.ForEach(func(c cid.Cid) error {
280 281
			// Add the CID to the results
			res.Add(c)
282 283
			return nil
		})
284 285 286 287 288
	}

	return res.Keys()
}

dirkmc's avatar
dirkmc committed
289 290 291
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
Dirk McCormick's avatar
Dirk McCormick committed
292
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
dirkmc's avatar
dirkmc committed
293
		for _, c := range ws.wantHaves.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
294
			b.WriteString(fmt.Sprintf("  want-have  %s\n", c))
dirkmc's avatar
dirkmc committed
295 296
		}
		for _, c := range ws.wantBlocks.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
297
			b.WriteString(fmt.Sprintf("  want-block %s\n", c))
dirkmc's avatar
dirkmc committed
298 299 300 301
		}
	}
	return b.String()
}