filter.go 7.94 KB
Newer Older
1 2 3 4 5 6 7 8 9
package peerdiversity

import (
	"errors"
	"fmt"
	"net"
	"sort"
	"sync"

tavit ohanian's avatar
tavit ohanian committed
10
	"gitlab.dms3.io/p2p/go-p2p-core/peer"
Aarsh Shah's avatar
Aarsh Shah committed
11

tavit ohanian's avatar
tavit ohanian committed
12 13
	"gitlab.dms3.io/p2p/go-cidranger"
	asnutil "gitlab.dms3.io/p2p/go-p2p-asn-util"
Aarsh Shah's avatar
Aarsh Shah committed
14

15
	ma "github.com/multiformats/go-multiaddr"
16
	manet "github.com/multiformats/go-multiaddr/net"
tavit ohanian's avatar
tavit ohanian committed
17
	logging "gitlab.dms3.io/dms3/public/go-log"
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
)

var dfLog = logging.Logger("diversityFilter")

type asnStore interface {
	AsnForIPv6(ip net.IP) (string, error)
}

// PeerIPGroupKey is a unique key that represents ONE of the IP Groups the peer belongs to.
// A peer has one PeerIPGroupKey per address. Thus, a peer can belong to MULTIPLE Groups if it has
// multiple addresses.
// For now, given a peer address, our grouping mechanism is as follows:
// 1. For IPv6 addresses, we group by the ASN of the IP address.
// 2. For IPv4 addresses, all addresses that belong to same legacy (Class A)/8 allocations
//    OR share the same /16 prefix are in the same group.
type PeerIPGroupKey string

// https://en.wikipedia.org/wiki/List_of_assigned_/8_IPv4_address_blocks
var legacyClassA = []string{"12.0.0.0/8", "17.0.0.0/8", "19.0.0.0/8", "38.0.0.0/8", "48.0.0.0/8", "56.0.0.0/8", "73.0.0.0/8", "53.0.0.0/8"}

// PeerGroupInfo represents the grouping info for a Peer.
type PeerGroupInfo struct {
Aarsh Shah's avatar
Aarsh Shah committed
40 41 42
	Id         peer.ID
	Cpl        int
	IPGroupKey PeerIPGroupKey
43 44 45 46 47 48 49 50 51
}

// PeerIPGroupFilter is the interface that must be implemented by callers who want to
// instantiate a `peerdiversity.Filter`. This interface provides the function hooks
// that are used/called by the `peerdiversity.Filter`.
type PeerIPGroupFilter interface {
	// Allow is called by the Filter to test if a peer with the given
	// grouping info should be allowed/rejected by the Filter. This will be called ONLY
	// AFTER the peer has successfully passed all of the Filter's internal checks.
Aarsh Shah's avatar
Aarsh Shah committed
52
	// Note: If the peer is whitelisted on the Filter, the peer will be allowed by the Filter without calling this function.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	Allow(PeerGroupInfo) (allow bool)

	// Increment is called by the Filter when a peer with the given Grouping Info.
	// is added to the Filter state. This will happen after the peer has passed
	// all of the Filter's internal checks and the Allow function defined above for all of it's Groups.
	Increment(PeerGroupInfo)

	// Decrement is called by the Filter when a peer with the given
	// Grouping Info is removed from the Filter. This will happen when the caller/user of the Filter
	// no longer wants the peer and the IP groups it belongs to to count towards the Filter state.
	Decrement(PeerGroupInfo)

	// PeerAddresses is called by the Filter to determine the addresses of the given peer
	// it should use to determine the IP groups it belongs to.
	PeerAddresses(peer.ID) []ma.Multiaddr
}

Aarsh Shah's avatar
Aarsh Shah committed
70 71
// Filter is a peer diversity filter that accepts or rejects peers based on the whitelisting rules configured
// AND the diversity policies defined by the implementation of the PeerIPGroupFilter interface
72 73 74 75 76 77 78
// passed to it.
type Filter struct {
	mu sync.Mutex
	// An implementation of the `PeerIPGroupFilter` interface defined above.
	pgm        PeerIPGroupFilter
	peerGroups map[peer.ID][]PeerGroupInfo

Aarsh Shah's avatar
Aarsh Shah committed
79
	// whitelisted peers
80
	wlpeers map[peer.ID]struct{}
Aarsh Shah's avatar
Aarsh Shah committed
81

82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
	// legacy IPv4 Class A networks.
	legacyCidrs cidranger.Ranger

	logKey string

	cplFnc func(peer.ID) int

	cplPeerGroups map[int]map[peer.ID][]PeerIPGroupKey

	asnStore asnStore
}

// NewFilter creates a Filter for Peer Diversity.
func NewFilter(pgm PeerIPGroupFilter, logKey string, cplFnc func(peer.ID) int) (*Filter, error) {
	if pgm == nil {
		return nil, errors.New("peergroup implementation can not be nil")
	}

	// Crate a Trie for legacy Class N networks
	legacyCidrs := cidranger.NewPCTrieRanger()
	for _, cidr := range legacyClassA {
		_, nn, err := net.ParseCIDR(cidr)
		if err != nil {
			return nil, err
		}
		if err := legacyCidrs.Insert(cidranger.NewBasicRangerEntry(*nn)); err != nil {
			return nil, err
		}
	}

	return &Filter{
		pgm:           pgm,
		peerGroups:    make(map[peer.ID][]PeerGroupInfo),
		wlpeers:       make(map[peer.ID]struct{}),
		legacyCidrs:   legacyCidrs,
		logKey:        logKey,
		cplFnc:        cplFnc,
		cplPeerGroups: make(map[int]map[peer.ID][]PeerIPGroupKey),
		asnStore:      asnutil.Store,
	}, nil
}

func (f *Filter) Remove(p peer.ID) {
	f.mu.Lock()
	defer f.mu.Unlock()

	cpl := f.cplFnc(p)

	for _, info := range f.peerGroups[p] {
		f.pgm.Decrement(info)
	}
	f.peerGroups[p] = nil
	delete(f.peerGroups, p)
	delete(f.cplPeerGroups[cpl], p)

	if len(f.cplPeerGroups[cpl]) == 0 {
		delete(f.cplPeerGroups, cpl)
	}
}

// TryAdd attempts to add the peer to the Filter state and returns true if it's successful, false otherwise.
func (f *Filter) TryAdd(p peer.ID) bool {
	f.mu.Lock()
	defer f.mu.Unlock()

Aarsh Shah's avatar
Aarsh Shah committed
147 148 149 150
	if _, ok := f.wlpeers[p]; ok {
		return true
	}

151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
	cpl := f.cplFnc(p)

	// don't allow peers for which we can't determine addresses.
	addrs := f.pgm.PeerAddresses(p)
	if len(addrs) == 0 {
		dfLog.Debugw("no addresses found for peer", "appKey", f.logKey, "peer", p.Pretty())
		return false
	}

	peerGroups := make([]PeerGroupInfo, 0, len(addrs))
	for _, a := range addrs {
		ip, err := manet.ToIP(a)
		if err != nil {
			dfLog.Errorw("failed to parse IP from multiaddr", "appKey", f.logKey,
				"multiaddr", a.String(), "err", err)
			return false
		}

		// reject the peer if we can't determine a grouping for one of it's address.
		key, err := f.ipGroupKey(ip)
		if err != nil {
			dfLog.Errorw("failed to find Group Key", "appKey", f.logKey, "ip", ip.String(), "peer", p,
				"err", err)
			return false
		}
		if len(key) == 0 {
177
			dfLog.Errorw("group key is empty", "appKey", f.logKey, "ip", ip.String(), "peer", p)
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
			return false
		}
		group := PeerGroupInfo{Id: p, Cpl: cpl, IPGroupKey: key}

		if !f.pgm.Allow(group) {
			return false
		}

		peerGroups = append(peerGroups, group)
	}

	if _, ok := f.cplPeerGroups[cpl]; !ok {
		f.cplPeerGroups[cpl] = make(map[peer.ID][]PeerIPGroupKey)
	}

	for _, g := range peerGroups {
Aarsh Shah's avatar
Aarsh Shah committed
194 195
		f.pgm.Increment(g)

196 197 198 199 200 201 202
		f.peerGroups[p] = append(f.peerGroups[p], g)
		f.cplPeerGroups[cpl][p] = append(f.cplPeerGroups[cpl][p], g.IPGroupKey)
	}

	return true
}

Aarsh Shah's avatar
Aarsh Shah committed
203
// WhitelistPeers will always allow the given peers.
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
func (f *Filter) WhitelistPeers(peers ...peer.ID) {
	f.mu.Lock()
	defer f.mu.Unlock()

	for _, p := range peers {
		f.wlpeers[p] = struct{}{}
	}
}

// returns the PeerIPGroupKey to which the given IP belongs.
func (f *Filter) ipGroupKey(ip net.IP) (PeerIPGroupKey, error) {
	switch bz := ip.To4(); bz {
	case nil:
		// TODO Clean up the ASN codebase
		// ipv6 Address -> get ASN
		s, err := f.asnStore.AsnForIPv6(ip)
		if err != nil {
			return "", fmt.Errorf("failed to fetch ASN for IPv6 addr %s: %w", ip.String(), err)
		}
223 224 225 226 227 228 229

		// if no ASN found then fallback on using the /32 prefix
		if len(s) == 0 {
			dfLog.Debugw("ASN not known", "appKey", f.logKey, "ip", ip)
			s = fmt.Sprintf("unknown ASN: %s", net.CIDRMask(32, 128).String())
		}

230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
		return PeerIPGroupKey(s), nil
	default:
		// If it belongs to a legacy Class 8, we return the /8 prefix as the key
		rs, _ := f.legacyCidrs.ContainingNetworks(ip)
		if len(rs) != 0 {
			key := ip.Mask(net.IPv4Mask(255, 0, 0, 0)).String()
			return PeerIPGroupKey(key), nil
		}

		// otherwise -> /16 prefix
		key := ip.Mask(net.IPv4Mask(255, 255, 0, 0)).String()
		return PeerIPGroupKey(key), nil
	}
}

// CplDiversityStats contains the peer diversity stats for a Cpl.
type CplDiversityStats struct {
	Cpl   int
	Peers map[peer.ID][]PeerIPGroupKey
}

// GetDiversityStats returns the diversity stats for each CPL and is sorted by the CPL.
func (f *Filter) GetDiversityStats() []CplDiversityStats {
	f.mu.Lock()
	defer f.mu.Unlock()

	stats := make([]CplDiversityStats, 0, len(f.cplPeerGroups))

	var sortedCpls []int
	for cpl := range f.cplPeerGroups {
		sortedCpls = append(sortedCpls, cpl)
	}
	sort.Ints(sortedCpls)

	for _, cpl := range sortedCpls {
		ps := make(map[peer.ID][]PeerIPGroupKey, len(f.cplPeerGroups[cpl]))
		cd := CplDiversityStats{cpl, ps}

		for p, groups := range f.cplPeerGroups[cpl] {
			ps[p] = groups
		}
		stats = append(stats, cd)
	}

	return stats
}