peerwantmanager.go 6.64 KB
Newer Older
dirkmc's avatar
dirkmc committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Gauge can be used to keep track of a metric that increases and decreases
// incrementally. It is used by the peerWantManager to track the number of
// want-blocks that are active (ie sent but no response received)
type Gauge interface {
	// Inc is called by the peerWantManager once for each want-block that is
	// newly recorded as sent to a peer.
	Inc()
	// Dec is called by the peerWantManager once for each recorded want-block
	// that is cancelled, or that is dropped because its peer was removed.
	Dec()
}

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
//
// The struct holds no lock of its own; the note in prepareSendWants states
// that all calls into it are made under a lock held by the caller.
type peerWantManager struct {
	// Wants recorded as sent, keyed by the peer they were sent to. A peer has
	// an entry from addPeer until removePeer.
	peerWants map[peer.ID]*peerWant
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

// peerWant holds the wants that have been recorded as sent to a single peer.
type peerWant struct {
	wantBlocks *cid.Set // CIDs sent to the peer as want-blocks
	wantHaves  *cid.Set // CIDs sent to the peer as want-haves
}

// newPeerWantManager returns a peerWantManager that is not yet tracking any
// peers. The supplied Gauge is used to report the number of active
// want-blocks (ie sent but no response received).
func newPeerWantManager(wantBlockGauge Gauge) *peerWantManager {
	pwm := &peerWantManager{wantBlockGauge: wantBlockGauge}
	pwm.peerWants = make(map[peer.ID]*peerWant)
	return pwm
}

// AddPeer adds a peer whose wants we need to keep track of
42
func (pwm *peerWantManager) addPeer(p peer.ID) {
dirkmc's avatar
dirkmc committed
43 44 45 46 47 48 49 50 51
	if _, ok := pwm.peerWants[p]; !ok {
		pwm.peerWants[p] = &peerWant{
			wantBlocks: cid.NewSet(),
			wantHaves:  cid.NewSet(),
		}
	}
}

// RemovePeer removes a peer and its associated wants from tracking
52
func (pwm *peerWantManager) removePeer(p peer.ID) {
53 54 55 56 57 58 59 60 61 62
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

	// Decrement the gauge by the number of pending want-blocks to the peer
	for range pws.wantBlocks.Keys() {
		pwm.wantBlockGauge.Dec()
	}

dirkmc's avatar
dirkmc committed
63 64 65 66 67
	delete(pwm.peerWants, p)
}

// PrepareBroadcastWantHaves filters the list of want-haves for each peer,
// returning a map of peers to the want-haves they have not yet been sent.
68
func (pwm *peerWantManager) prepareBroadcastWantHaves(wantHaves []cid.Cid) map[peer.ID][]cid.Cid {
dirkmc's avatar
dirkmc committed
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
	res := make(map[peer.ID][]cid.Cid)

	// Iterate over all known peers
	for p, pws := range pwm.peerWants {
		// Iterate over all want-haves
		for _, c := range wantHaves {
			// If the CID has not been sent as a want-block or want-have
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				// Record that the CID has been sent as a want-have
				pws.wantHaves.Add(c)

				// Add the CID to the results
				if _, ok := res[p]; !ok {
					res[p] = make([]cid.Cid, 0, 1)
				}
				res[p] = append(res[p], c)
			}
		}
	}

	return res
}

// PrepareSendWants filters the list of want-blocks and want-haves such that
// it only contains wants that have not already been sent to the peer.
94
func (pwm *peerWantManager) prepareSendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) ([]cid.Cid, []cid.Cid) {
dirkmc's avatar
dirkmc committed
95 96 97 98
	resWantBlks := make([]cid.Cid, 0)
	resWantHvs := make([]cid.Cid, 0)

	// Get the existing want-blocks and want-haves for the peer
99 100 101 102 103 104 105 106 107 108
	pws, ok := pwm.peerWants[p]

	if !ok {
		// In practice this should never happen:
		// - PeerManager calls addPeer() as soon as the peer connects
		// - PeerManager calls removePeer() as soon as the peer disconnects
		// - All calls to PeerWantManager are locked
		log.Errorf("prepareSendWants() called with peer %s but peer not found in peerWantManager", string(p))
		return resWantBlks, resWantHvs
	}
dirkmc's avatar
dirkmc committed
109

110 111 112 113 114 115
	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block hasn't been sent to the peer
		if !pws.wantBlocks.Has(c) {
			// Record that the CID was sent as a want-block
			pws.wantBlocks.Add(c)
dirkmc's avatar
dirkmc committed
116

117 118
			// Add the CID to the results
			resWantBlks = append(resWantBlks, c)
dirkmc's avatar
dirkmc committed
119

120 121 122 123 124
			// Make sure the CID is no longer recorded as a want-have
			pws.wantHaves.Remove(c)

			// Increment the count of want-blocks
			pwm.wantBlockGauge.Inc()
dirkmc's avatar
dirkmc committed
125
		}
126
	}
dirkmc's avatar
dirkmc committed
127

128 129 130 131 132 133
	// Iterate over the requested want-haves
	for _, c := range wantHaves {
		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)
dirkmc's avatar
dirkmc committed
134

135 136
			// Add the CID to the results
			resWantHvs = append(resWantHvs, c)
dirkmc's avatar
dirkmc committed
137 138 139 140 141 142 143 144 145
		}
	}

	return resWantBlks, resWantHvs
}

// PrepareSendCancels filters the list of cancels for each peer,
// returning a map of peers which only contains cancels for wants that have
// been sent to the peer.
146
func (pwm *peerWantManager) prepareSendCancels(cancelKs []cid.Cid) map[peer.ID][]cid.Cid {
dirkmc's avatar
dirkmc committed
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
	res := make(map[peer.ID][]cid.Cid)

	// Iterate over all known peers
	for p, pws := range pwm.peerWants {
		// Iterate over all requested cancels
		for _, c := range cancelKs {
			isWantBlock := pws.wantBlocks.Has(c)
			isWantHave := pws.wantHaves.Has(c)

			// If the CID was sent as a want-block, decrement the want-block count
			if isWantBlock {
				pwm.wantBlockGauge.Dec()
			}

			// If the CID was sent as a want-block or want-have
			if isWantBlock || isWantHave {
				// Remove the CID from the recorded want-blocks and want-haves
				pws.wantBlocks.Remove(c)
				pws.wantHaves.Remove(c)

				// Add the CID to the results
				if _, ok := res[p]; !ok {
					res[p] = make([]cid.Cid, 0, 1)
				}
				res[p] = append(res[p], c)
			}
		}
	}

	return res
}

// GetWantBlocks returns the set of all want-blocks sent to all peers
180
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
dirkmc's avatar
dirkmc committed
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
		for _, c := range pws.wantBlocks.Keys() {
			// Add the CID to the results
			res.Add(c)
		}
	}

	return res.Keys()
}

// GetWantHaves returns the set of all want-haves sent to all peers
196
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
dirkmc's avatar
dirkmc committed
197 198 199 200 201 202 203 204 205 206 207 208 209 210
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
		for _, c := range pws.wantHaves.Keys() {
			// Add the CID to the results
			res.Add(c)
		}
	}

	return res.Keys()
}

211
// GetWants returns the set of all wants (both want-blocks and want-haves).
212
func (pwm *peerWantManager) getWants() []cid.Cid {
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
		for _, c := range pws.wantBlocks.Keys() {
			// Add the CID to the results
			res.Add(c)
		}

		// Iterate over all want-haves
		for _, c := range pws.wantHaves.Keys() {
			// Add the CID to the results
			res.Add(c)
		}
	}

	return res.Keys()
}

dirkmc's avatar
dirkmc committed
233 234 235
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
Dirk McCormick's avatar
Dirk McCormick committed
236
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
dirkmc's avatar
dirkmc committed
237
		for _, c := range ws.wantHaves.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
238
			b.WriteString(fmt.Sprintf("  want-have  %s\n", c))
dirkmc's avatar
dirkmc committed
239 240
		}
		for _, c := range ws.wantBlocks.Keys() {
Dirk McCormick's avatar
Dirk McCormick committed
241
			b.WriteString(fmt.Sprintf("  want-block %s\n", c))
dirkmc's avatar
dirkmc committed
242 243 244 245
		}
	}
	return b.String()
}