score.go 22 KB
Newer Older
vyzo's avatar
vyzo committed
1 2 3
package pubsub

import (
vyzo's avatar
vyzo committed
4
	"context"
vyzo's avatar
vyzo committed
5
	"fmt"
6
	"net"
vyzo's avatar
vyzo committed
7
	"sync"
vyzo's avatar
vyzo committed
8 9
	"time"

vyzo's avatar
vyzo committed
10
	"github.com/libp2p/go-libp2p-core/host"
vyzo's avatar
vyzo committed
11 12
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/protocol"
vyzo's avatar
vyzo committed
13

14
	manet "github.com/multiformats/go-multiaddr-net"
vyzo's avatar
vyzo committed
15 16
)

vyzo's avatar
vyzo committed
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
// peerStats holds the scoring state tracked for a single peer.
type peerStats struct {
	// true if the peer is currently connected
	connected bool

	// expiration time of the score stats for disconnected peers
	expire time.Time

	// per topic stats
	topics map[string]*topicStats

	// IP tracking; store as string for easy processing
	ips []string
}

// topicStats holds the per-topic counters that feed a peer's topic score.
type topicStats struct {
	// true if the peer is in the mesh
	inMesh bool

	// time when the peer was (last) GRAFTed; valid only when in mesh
	graftTime time.Time

	// time in mesh (updated during refresh/decay to avoid calling gettimeofday on
	// every score invocation)
	meshTime time.Duration

	// first message deliveries
	firstMessageDeliveries float64

	// mesh message deliveries
	meshMessageDeliveries float64

	// true if the peer has been enough time in the mesh to activate mesh message deliveries
	meshMessageDeliveriesActive bool

	// sticky mesh rate failure penalty counter
	meshFailurePenalty float64

	// invalid message counter
	invalidMessageDeliveries float64
}

vyzo's avatar
vyzo committed
58
type peerScore struct {
vyzo's avatar
vyzo committed
59 60 61 62 63 64 65 66
	sync.Mutex

	// the score parameters
	params *PeerScoreParams

	// per peer stats for score calculation
	peerStats map[peer.ID]*peerStats

Raúl Kripalani's avatar
Raúl Kripalani committed
67
	// IP colocation tracking; maps IP => set of peers.
vyzo's avatar
vyzo committed
68
	peerIPs map[string]map[peer.ID]struct{}
vyzo's avatar
vyzo committed
69 70 71 72 73

	// message delivery tracking
	deliveries *messageDeliveries

	msgID MsgIdFunction
vyzo's avatar
vyzo committed
74
	host  host.Host
75 76 77 78

	// debugging inspection
	inspect       PeerScoreInspectFn
	inspectPeriod time.Duration
vyzo's avatar
vyzo committed
79 80
}

Raúl Kripalani's avatar
Raúl Kripalani committed
81 82
// compile-time check that *peerScore implements the scoreTracer interface
var _ scoreTracer = (*peerScore)(nil)

vyzo's avatar
vyzo committed
83
type messageDeliveries struct {
84 85 86 87 88
	records map[string]*deliveryRecord

	// queue for cleaning up old delivery records
	head *deliveryEntry
	tail *deliveryEntry
vyzo's avatar
vyzo committed
89 90 91 92
}

type deliveryRecord struct {
	status    int
93
	firstSeen time.Time
94
	validated time.Time
95
	peers     map[peer.ID]struct{}
vyzo's avatar
vyzo committed
96 97
}

98 99 100 101 102 103
// deliveryEntry is a node in the expiry queue of messageDeliveries.
type deliveryEntry struct {
	// message ID of the record this entry expires
	id string
	// time after which the record may be garbage collected
	expire time.Time
	// next (newer) entry in the queue
	next *deliveryEntry
}

vyzo's avatar
vyzo committed
104 105
// delivery record status
const (
	deliveryUnknown   = iota // we don't know (yet) if the message is valid
	deliveryValid            // we know the message is valid
	deliveryInvalid          // we know the message is invalid
	deliveryIgnored          // we were instructed by the validator to ignore the message
	deliveryThrottled        // we can't tell if it is valid because validation throttled
)

113 114
// PeerScoreInspectFn is the callback type accepted by WithPeerScoreInspect; it
// receives a snapshot mapping each tracked peer to its current score.
type PeerScoreInspectFn func(map[peer.ID]float64)

115
// WithPeerScoreInspect is a gossipsub router option that enables peer score debugging.
116 117 118
// When this option is enabled, the supplied function will be invoked periodically to allow
// the application to inspec or dump the scores for connected peers.
// This option must be passed _after_ the WithPeerScore option.
119
func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Option {
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
	return func(ps *PubSub) error {
		gs, ok := ps.rt.(*GossipSubRouter)
		if !ok {
			return fmt.Errorf("pubsub router is not gossipsub")
		}

		if gs.score == nil {
			return fmt.Errorf("peer scoring is not enabled")
		}

		gs.score.inspect = inspect
		gs.score.inspectPeriod = period

		return nil
	}
}

// implementation
138
func newPeerScore(params *PeerScoreParams) *peerScore {
vyzo's avatar
vyzo committed
139 140 141 142 143 144 145
	return &peerScore{
		params:     params,
		peerStats:  make(map[peer.ID]*peerStats),
		peerIPs:    make(map[string]map[peer.ID]struct{}),
		deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)},
		msgID:      DefaultMsgIdFn,
	}
vyzo's avatar
vyzo committed
146 147 148
}

// router interface
149
func (ps *peerScore) Start(gs *GossipSubRouter) {
150 151 152 153
	if ps == nil {
		return
	}

vyzo's avatar
vyzo committed
154 155 156
	ps.msgID = gs.p.msgID
	ps.host = gs.p.host
	go ps.background(gs.p.ctx)
157 158
}

159
func (ps *peerScore) Score(p peer.ID) float64 {
vyzo's avatar
vyzo committed
160 161 162 163
	if ps == nil {
		return 0
	}

vyzo's avatar
vyzo committed
164 165 166
	ps.Lock()
	defer ps.Unlock()

167 168 169 170
	return ps.score(p)
}

func (ps *peerScore) score(p peer.ID) float64 {
vyzo's avatar
vyzo committed
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
	pstats, ok := ps.peerStats[p]
	if !ok {
		return 0
	}

	var score float64

	// topic scores
	for topic, tstats := range pstats.topics {
		// the topic parameters
		topicParams, ok := ps.params.Topics[topic]
		if !ok {
			// we are not scoring this topic
			continue
		}

Raúl Kripalani's avatar
Raúl Kripalani committed
187 188 189
		// the topic score
		var topicScore float64

vyzo's avatar
vyzo committed
190 191 192
		// P1: time in Mesh
		if tstats.inMesh {
			p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum)
vyzo's avatar
vyzo committed
193 194 195
			if p1 > topicParams.TimeInMeshCap {
				p1 = topicParams.TimeInMeshCap
			}
vyzo's avatar
vyzo committed
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
			topicScore += p1 * topicParams.TimeInMeshWeight
		}

		// P2: first message deliveries
		p2 := tstats.firstMessageDeliveries
		topicScore += p2 * topicParams.FirstMessageDeliveriesWeight

		// P3: mesh message deliveries
		if tstats.meshMessageDeliveriesActive {
			if tstats.meshMessageDeliveries < topicParams.MeshMessageDeliveriesThreshold {
				deficit := topicParams.MeshMessageDeliveriesThreshold - tstats.meshMessageDeliveries
				p3 := deficit * deficit
				topicScore += p3 * topicParams.MeshMessageDeliveriesWeight
			}
		}

vyzo's avatar
vyzo committed
212
		// P3b:
Raúl Kripalani's avatar
Raúl Kripalani committed
213
		// NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so this detracts.
vyzo's avatar
vyzo committed
214 215 216
		p3b := tstats.meshFailurePenalty
		topicScore += p3b * topicParams.MeshFailurePenaltyWeight

vyzo's avatar
vyzo committed
217
		// P4: invalid messages
Raúl Kripalani's avatar
Raúl Kripalani committed
218
		// NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so this detracts.
vyzo's avatar
vyzo committed
219 220 221 222 223 224 225
		p4 := tstats.invalidMessageDeliveries
		topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight

		// update score, mixing with topic weight
		score += topicScore * topicParams.TopicWeight
	}

vyzo's avatar
vyzo committed
226 227 228 229 230
	// apply the topic score cap, if any
	if ps.params.TopicScoreCap > 0 && score > ps.params.TopicScoreCap {
		score = ps.params.TopicScoreCap
	}

vyzo's avatar
vyzo committed
231 232 233 234 235 236
	// P5: application-specific score
	p5 := ps.params.AppSpecificScore(p)
	score += p5 * ps.params.AppSpecificWeight

	// P6: IP collocation factor
	for _, ip := range pstats.ips {
237 238 239 240 241
		_, whitelisted := ps.params.IPColocationFactorWhitelist[ip]
		if whitelisted {
			continue
		}

Raúl Kripalani's avatar
Raúl Kripalani committed
242 243 244 245
		// P6 has a cliff (IPColocationFactorThreshold); it's only applied iff
		// at least that many peers are connected to us from that source IP
		// addr. It is quadratic, and the weight is negative (validated by
		// PeerScoreParams.validate).
vyzo's avatar
vyzo committed
246 247 248 249 250 251 252 253 254
		peersInIP := len(ps.peerIPs[ip])
		if peersInIP > ps.params.IPColocationFactorThreshold {
			surpluss := float64(peersInIP - ps.params.IPColocationFactorThreshold)
			p6 := surpluss * surpluss
			score += p6 * ps.params.IPColocationFactorWeight
		}
	}

	return score
vyzo's avatar
vyzo committed
255 256
}

257
// periodic maintenance
vyzo's avatar
vyzo committed
258 259 260 261 262 263 264 265 266 267
func (ps *peerScore) background(ctx context.Context) {
	refreshScores := time.NewTicker(ps.params.DecayInterval)
	defer refreshScores.Stop()

	refreshIPs := time.NewTicker(time.Minute)
	defer refreshIPs.Stop()

	gcDeliveryRecords := time.NewTicker(time.Minute)
	defer gcDeliveryRecords.Stop()

268 269 270 271
	var inspectScores <-chan time.Time
	if ps.inspect != nil {
		ticker := time.NewTicker(ps.inspectPeriod)
		defer ticker.Stop()
272 273
		// also dump at exit for one final sample
		defer ps.inspectScores()
274 275 276
		inspectScores = ticker.C
	}

vyzo's avatar
vyzo committed
277 278 279 280 281 282 283 284 285 286 287
	for {
		select {
		case <-refreshScores.C:
			ps.refreshScores()

		case <-refreshIPs.C:
			ps.refreshIPs()

		case <-gcDeliveryRecords.C:
			ps.gcDeliveryRecords()

288 289 290
		case <-inspectScores:
			ps.inspectScores()

vyzo's avatar
vyzo committed
291 292 293 294 295 296
		case <-ctx.Done():
			return
		}
	}
}

Raúl Kripalani's avatar
Raúl Kripalani committed
297
// inspectScores dumps all tracked scores into the inspect function.
298 299 300 301 302 303 304 305
func (ps *peerScore) inspectScores() {
	ps.Lock()
	scores := make(map[peer.ID]float64, len(ps.peerStats))
	for p := range ps.peerStats {
		scores[p] = ps.score(p)
	}
	ps.Unlock()

Raúl Kripalani's avatar
Raúl Kripalani committed
306 307 308 309 310
	// Since this is a user-injected function, it could be performing I/O, and
	// we don't want to block the scorer's background loop. Therefore, we launch
	// it in a separate goroutine. If the function needs to synchronise, it
	// should do so locally.
	go ps.inspect(scores)
311 312
}

Raúl Kripalani's avatar
Raúl Kripalani committed
313 314
// refreshScores decays scores, and purges score records for disconnected peers,
// once their expiry has elapsed.
315 316 317 318 319 320 321 322 323
func (ps *peerScore) refreshScores() {
	ps.Lock()
	defer ps.Unlock()

	now := time.Now()
	for p, pstats := range ps.peerStats {
		if !pstats.connected {
			// has the retention period expired?
			if now.After(pstats.expire) {
vyzo's avatar
vyzo committed
324 325
				// yes, throw it away (but clean up the IP tracking first)
				ps.removeIPs(p, pstats.ips)
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
				delete(ps.peerStats, p)
			}

			// we don't decay retained scores, as the peer is not active.
			// this way the peer cannot reset a negative score by simply disconnecting and reconnecting,
			// unless the retention period has ellapsed.
			// similarly, a well behaved peer does not lose its score by getting disconnected.
			continue
		}

		for topic, tstats := range pstats.topics {
			// the topic parameters
			topicParams, ok := ps.params.Topics[topic]
			if !ok {
				// we are not scoring this topic
				continue
			}

			// decay counters
			tstats.firstMessageDeliveries *= topicParams.FirstMessageDeliveriesDecay
346 347 348
			if tstats.firstMessageDeliveries < ps.params.DecayToZero {
				tstats.firstMessageDeliveries = 0
			}
349
			tstats.meshMessageDeliveries *= topicParams.MeshMessageDeliveriesDecay
350 351 352
			if tstats.meshMessageDeliveries < ps.params.DecayToZero {
				tstats.meshMessageDeliveries = 0
			}
vyzo's avatar
vyzo committed
353 354 355 356
			tstats.meshFailurePenalty *= topicParams.MeshFailurePenaltyDecay
			if tstats.meshFailurePenalty < ps.params.DecayToZero {
				tstats.meshFailurePenalty = 0
			}
357
			tstats.invalidMessageDeliveries *= topicParams.InvalidMessageDeliveriesDecay
358 359 360
			if tstats.invalidMessageDeliveries < ps.params.DecayToZero {
				tstats.invalidMessageDeliveries = 0
			}
361 362 363 364 365 366 367 368 369 370 371
			// update mesh time and activate mesh message delivery parameter if need be
			if tstats.inMesh {
				tstats.meshTime = now.Sub(tstats.graftTime)
				if tstats.meshTime > topicParams.MeshMessageDeliveriesActivation {
					tstats.meshMessageDeliveriesActive = true
				}
			}
		}
	}
}

Raúl Kripalani's avatar
Raúl Kripalani committed
372
// refreshIPs refreshes IPs we know of peers we're tracking.
vyzo's avatar
vyzo committed
373 374 375 376
func (ps *peerScore) refreshIPs() {
	ps.Lock()
	defer ps.Unlock()

377
	// peer IPs may change, so we periodically refresh them
Raúl Kripalani's avatar
Raúl Kripalani committed
378 379 380 381 382
	//
	// TODO: it could be more efficient to collect connections for all peers
	// from the Network, populate a new map, and replace it in place. We are
	// incurring in those allocs anyway, and maybe even in more, in the form of
	// slices.
vyzo's avatar
vyzo committed
383 384 385 386 387 388 389
	for p, pstats := range ps.peerStats {
		if pstats.connected {
			ips := ps.getIPs(p)
			ps.setIPs(p, ips, pstats.ips)
			pstats.ips = ips
		}
	}
390 391
}

vyzo's avatar
vyzo committed
392 393 394 395 396 397 398
// gcDeliveryRecords garbage collects expired delivery records under the lock.
func (ps *peerScore) gcDeliveryRecords() {
	ps.Lock()
	defer ps.Unlock()

	ps.deliveries.gc()
}

vyzo's avatar
vyzo committed
399 400
// tracer interface
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
vyzo's avatar
vyzo committed
401 402 403 404 405 406 407 408 409 410 411
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		pstats = &peerStats{topics: make(map[string]*topicStats)}
		ps.peerStats[p] = pstats
	}

	pstats.connected = true
	ips := ps.getIPs(p)
vyzo's avatar
vyzo committed
412
	ps.setIPs(p, ips, pstats.ips)
vyzo's avatar
vyzo committed
413
	pstats.ips = ips
vyzo's avatar
vyzo committed
414 415 416
}

func (ps *peerScore) RemovePeer(p peer.ID) {
vyzo's avatar
vyzo committed
417 418 419 420 421 422 423 424
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

vyzo's avatar
vyzo committed
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446
	// decide whether to retain the score; this currently only retains non-positive scores
	// to dissuade attacks on the score function.
	if ps.score(p) > 0 {
		ps.removeIPs(p, pstats.ips)
		delete(ps.peerStats, p)
		return
	}

	// furthermore, when we decide to retain the score, the firstMessageDelivery counters are
	// reset to 0 and mesh delivery penalties applied.
	for topic, tstats := range pstats.topics {
		tstats.firstMessageDeliveries = 0

		threshold := ps.params.Topics[topic].MeshMessageDeliveriesThreshold
		if tstats.inMesh && tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold {
			deficit := threshold - tstats.meshMessageDeliveries
			tstats.meshFailurePenalty += deficit * deficit
		}

		tstats.inMesh = false
	}

vyzo's avatar
vyzo committed
447 448
	pstats.connected = false
	pstats.expire = time.Now().Add(ps.params.RetainScore)
vyzo's avatar
vyzo committed
449
}
vyzo's avatar
vyzo committed
450

vyzo's avatar
vyzo committed
451 452 453 454
// Join is a no-op; the scorer keeps no state for joining a topic.
func (ps *peerScore) Join(topic string)  {}
// Leave is a no-op; the scorer keeps no state for leaving a topic.
func (ps *peerScore) Leave(topic string) {}

func (ps *peerScore) Graft(p peer.ID, topic string) {
vyzo's avatar
vyzo committed
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	tstats, ok := pstats.getTopicStats(topic, ps.params)
	if !ok {
		return
	}

	tstats.inMesh = true
	tstats.graftTime = time.Now()
	tstats.meshTime = 0
vyzo's avatar
vyzo committed
471
	tstats.meshMessageDeliveriesActive = false
vyzo's avatar
vyzo committed
472 473
}

vyzo's avatar
vyzo committed
474
func (ps *peerScore) Prune(p peer.ID, topic string) {
vyzo's avatar
vyzo committed
475 476 477 478 479 480 481 482 483 484 485 486 487
	ps.Lock()
	defer ps.Unlock()

	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	tstats, ok := pstats.getTopicStats(topic, ps.params)
	if !ok {
		return
	}

488 489 490 491 492
	// sticky mesh delivery rate failure penalty
	threshold := ps.params.Topics[topic].MeshMessageDeliveriesThreshold
	if tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold {
		deficit := threshold - tstats.meshMessageDeliveries
		tstats.meshFailurePenalty += deficit * deficit
vyzo's avatar
vyzo committed
493 494
	}

vyzo's avatar
vyzo committed
495
	tstats.inMesh = false
vyzo's avatar
vyzo committed
496
}
vyzo's avatar
vyzo committed
497

vyzo's avatar
vyzo committed
498 499 500 501 502 503 504 505 506
// ValidateMessage is invoked when the pubsub subsystem begins validating msg.
// It makes sure a delivery record exists for the message, so that the record's
// firstSeen time reflects when the message entered the validation pipeline.
func (ps *peerScore) ValidateMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	// getRecord creates the record on first sight; the record itself is not
	// needed here.
	id := ps.msgID(msg.Message)
	ps.deliveries.getRecord(id)
}

vyzo's avatar
vyzo committed
507 508 509 510 511 512 513 514
func (ps *peerScore) DeliverMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	ps.markFirstMessageDelivery(msg.ReceivedFrom, msg)

	drec := ps.deliveries.getRecord(ps.msgID(msg.Message))

515 516
	// defensive check that this is the first delivery trace -- delivery status should be unknown
	if drec.status != deliveryUnknown {
vyzo's avatar
vyzo committed
517
		log.Warnf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status)
518 519 520
		return
	}

521
	// mark the message as valid and reward mesh peers that have already forwarded it to us
522
	drec.status = deliveryValid
523
	drec.validated = time.Now()
524
	for p := range drec.peers {
525 526 527 528 529
		// this check is to make sure a peer can't send us a message twice and get a double count
		// if it is a first delivery.
		if p != msg.ReceivedFrom {
			ps.markDuplicateMessageDelivery(p, msg, time.Time{})
		}
vyzo's avatar
vyzo committed
530
	}
vyzo's avatar
vyzo committed
531 532
}

vyzo's avatar
vyzo committed
533 534 535 536
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
	ps.Lock()
	defer ps.Unlock()

537 538
	switch reason {
	// we don't track those messages, but we penalize the peer as they are clearly invalid
539
	case rejectMissingSignature:
540
		fallthrough
541
	case rejectInvalidSignature:
542 543
		fallthrough
	case rejectSelfOrigin:
vyzo's avatar
vyzo committed
544
		ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
545 546 547
		return

		// we ignore those messages, so do nothing.
548
	case rejectBlacklstedPeer:
549
		fallthrough
550
	case rejectBlacklistedSource:
vyzo's avatar
vyzo committed
551
		return
552

553
	case rejectValidationQueueFull:
554 555 556 557
		// the message was rejected before it entered the validation pipeline;
		// we don't know if this message has a valid signature, and thus we also don't know if
		// it has a valid message ID; all we can do is ignore it.
		return
vyzo's avatar
vyzo committed
558 559 560
	}

	drec := ps.deliveries.getRecord(ps.msgID(msg.Message))
vyzo's avatar
vyzo committed
561

562 563
	// defensive check that this is the first rejection trace -- delivery status should be unknown
	if drec.status != deliveryUnknown {
vyzo's avatar
vyzo committed
564
		log.Warnf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Now().Sub(drec.firstSeen), drec.status)
565 566 567
		return
	}

vyzo's avatar
vyzo committed
568 569
	switch reason {
	case rejectValidationThrottled:
vyzo's avatar
vyzo committed
570 571
		// if we reject with "validation throttled" we don't penalize the peer(s) that forward it
		// because we don't know if it was valid.
572
		drec.status = deliveryThrottled
vyzo's avatar
vyzo committed
573 574 575
		// release the delivery time tracking map to free some memory early
		drec.peers = nil
		return
vyzo's avatar
vyzo committed
576 577 578 579 580 581
	case rejectValidationIgnored:
		// we were explicitly instructed by the validator to ignore the message but not penalize
		// the peer
		drec.status = deliveryIgnored
		drec.peers = nil
		return
vyzo's avatar
vyzo committed
582 583 584
	}

	// mark the message as invalid and penalize peers that have already forwarded it.
585
	drec.status = deliveryInvalid
586 587

	ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
vyzo's avatar
vyzo committed
588 589 590 591 592 593
	for p := range drec.peers {
		ps.markInvalidMessageDelivery(p, msg)
	}

	// release the delivery time tracking map to free some memory early
	drec.peers = nil
vyzo's avatar
vyzo committed
594 595
}

vyzo's avatar
vyzo committed
596 597 598 599 600 601 602 603 604 605 606
func (ps *peerScore) DuplicateMessage(msg *Message) {
	ps.Lock()
	defer ps.Unlock()

	drec := ps.deliveries.getRecord(ps.msgID(msg.Message))

	_, ok := drec.peers[msg.ReceivedFrom]
	if ok {
		// we have already seen this duplicate!
		return
	}
vyzo's avatar
vyzo committed
607

vyzo's avatar
vyzo committed
608
	switch drec.status {
609
	case deliveryUnknown:
610
		// the message is being validated; track the peer delivery and wait for
vyzo's avatar
vyzo committed
611
		// the Deliver/Reject notification.
612
		drec.peers[msg.ReceivedFrom] = struct{}{}
vyzo's avatar
vyzo committed
613

614
	case deliveryValid:
vyzo's avatar
vyzo committed
615
		// mark the peer delivery time to only count a duplicate delivery once.
616
		drec.peers[msg.ReceivedFrom] = struct{}{}
617
		ps.markDuplicateMessageDelivery(msg.ReceivedFrom, msg, drec.validated)
vyzo's avatar
vyzo committed
618

619
	case deliveryInvalid:
vyzo's avatar
vyzo committed
620 621 622
		// we no longer track delivery time
		ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)

623
	case deliveryThrottled:
624
		// the message was throttled; do nothing (we don't know if it was valid)
vyzo's avatar
vyzo committed
625 626
	case deliveryIgnored:
		// the message was ignored; do nothing
vyzo's avatar
vyzo committed
627
	}
vyzo's avatar
vyzo committed
628 629
}

vyzo's avatar
vyzo committed
630 631
// message delivery records
func (d *messageDeliveries) getRecord(id string) *deliveryRecord {
632 633 634 635 636
	rec, ok := d.records[id]
	if ok {
		return rec
	}

637 638 639
	now := time.Now()

	rec = &deliveryRecord{peers: make(map[peer.ID]struct{}), firstSeen: now}
640 641
	d.records[id] = rec

642
	entry := &deliveryEntry{id: id, expire: now.Add(TimeCacheDuration)}
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667
	if d.tail != nil {
		d.tail.next = entry
		d.tail = entry
	} else {
		d.head = entry
		d.tail = entry
	}

	return rec
}

func (d *messageDeliveries) gc() {
	if d.head == nil {
		return
	}

	now := time.Now()
	for d.head != nil && now.After(d.head.expire) {
		delete(d.records, d.head.id)
		d.head = d.head.next
	}

	if d.head == nil {
		d.tail = nil
	}
vyzo's avatar
vyzo committed
668 669
}

Raúl Kripalani's avatar
Raúl Kripalani committed
670 671 672
// getTopicStats returns existing topic stats for a given a given (peer, topic)
// tuple, or initialises a new topicStats object and inserts it in the
// peerStats, iff the topic is scored.
vyzo's avatar
vyzo committed
673 674 675 676 677 678 679 680 681 682
func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) {
	tstats, ok := pstats.topics[topic]
	if ok {
		return tstats, true
	}

	_, scoredTopic := params.Topics[topic]
	if !scoredTopic {
		return nil, false
	}
vyzo's avatar
vyzo committed
683

vyzo's avatar
vyzo committed
684 685 686 687
	tstats = &topicStats{}
	pstats.topics[topic] = tstats

	return tstats, true
vyzo's avatar
vyzo committed
688 689
}

Raúl Kripalani's avatar
Raúl Kripalani committed
690 691
// markInvalidMessageDelivery increments the "invalid message deliveries"
// counter for all scored topics the message is published in.
vyzo's avatar
vyzo committed
692 693 694 695 696 697 698 699 700 701 702
func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	for _, topic := range msg.GetTopicIDs() {
		tstats, ok := pstats.getTopicStats(topic, ps.params)
		if !ok {
			continue
		}
vyzo's avatar
vyzo committed
703

vyzo's avatar
vyzo committed
704 705
		tstats.invalidMessageDeliveries += 1
	}
vyzo's avatar
vyzo committed
706 707
}

Raúl Kripalani's avatar
Raúl Kripalani committed
708 709 710
// markFirstMessageDelivery increments the "first message deliveries" counter
// for all scored topics the message is published in, as well as the "mesh
// message deliveries" counter, if the peer is in the mesh for the topic.
vyzo's avatar
vyzo committed
711 712 713 714 715 716 717 718 719 720 721 722
func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

	for _, topic := range msg.GetTopicIDs() {
		tstats, ok := pstats.getTopicStats(topic, ps.params)
		if !ok {
			continue
		}

vyzo's avatar
vyzo committed
723
		cap := ps.params.Topics[topic].FirstMessageDeliveriesCap
vyzo's avatar
vyzo committed
724
		tstats.firstMessageDeliveries += 1
vyzo's avatar
vyzo committed
725 726
		if tstats.firstMessageDeliveries > cap {
			tstats.firstMessageDeliveries = cap
vyzo's avatar
vyzo committed
727 728 729 730 731
		}

		if !tstats.inMesh {
			continue
		}
vyzo's avatar
vyzo committed
732

vyzo's avatar
vyzo committed
733
		cap = ps.params.Topics[topic].MeshMessageDeliveriesCap
vyzo's avatar
vyzo committed
734
		tstats.meshMessageDeliveries += 1
vyzo's avatar
vyzo committed
735 736
		if tstats.meshMessageDeliveries > cap {
			tstats.meshMessageDeliveries = cap
vyzo's avatar
vyzo committed
737 738
		}
	}
vyzo's avatar
vyzo committed
739 740
}

Raúl Kripalani's avatar
Raúl Kripalani committed
741 742 743
// markDuplicateMessageDelivery increments the "mesh message deliveries" counter
// for messages we've seen before, as long the message was received within the
// P3 window.
744
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) {
745 746
	var now time.Time

vyzo's avatar
vyzo committed
747 748 749 750 751
	pstats, ok := ps.peerStats[p]
	if !ok {
		return
	}

752
	if !validated.IsZero() {
753 754 755
		now = time.Now()
	}

vyzo's avatar
vyzo committed
756 757 758 759 760 761 762 763 764
	for _, topic := range msg.GetTopicIDs() {
		tstats, ok := pstats.getTopicStats(topic, ps.params)
		if !ok {
			continue
		}

		if !tstats.inMesh {
			continue
		}
vyzo's avatar
vyzo committed
765

Raúl Kripalani's avatar
Raúl Kripalani committed
766 767
		tparams := ps.params.Topics[topic]

768 769 770
		// check against the mesh delivery window -- if the validated time is passed as 0, then
		// the message was received before we finished validation and thus falls within the mesh
		// delivery window.
Raúl Kripalani's avatar
Raúl Kripalani committed
771
		if !validated.IsZero() && now.After(validated.Add(tparams.MeshMessageDeliveriesWindow)) {
vyzo's avatar
vyzo committed
772 773 774
			continue
		}

Raúl Kripalani's avatar
Raúl Kripalani committed
775
		cap := tparams.MeshMessageDeliveriesCap
vyzo's avatar
vyzo committed
776
		tstats.meshMessageDeliveries += 1
vyzo's avatar
vyzo committed
777 778
		if tstats.meshMessageDeliveries > cap {
			tstats.meshMessageDeliveries = cap
vyzo's avatar
vyzo committed
779 780
		}
	}
vyzo's avatar
vyzo committed
781
}
vyzo's avatar
vyzo committed
782

Raúl Kripalani's avatar
Raúl Kripalani committed
783
// getIPs gets the current IPs for a peer.
vyzo's avatar
vyzo committed
784
func (ps *peerScore) getIPs(p peer.ID) []string {
vyzo's avatar
vyzo committed
785 786 787 788 789 790
	// in unit tests this can be nil
	if ps.host == nil {
		return nil
	}

	conns := ps.host.Network().ConnsToPeer(p)
791
	res := make([]string, 0, 1)
vyzo's avatar
vyzo committed
792 793
	for _, c := range conns {
		remote := c.RemoteMultiaddr()
794 795 796 797
		ip, err := manet.ToIP(remote)
		if err != nil {
			continue
		}
vyzo's avatar
vyzo committed
798

799 800
		// ignore those; loopback is used for unit testing
		if ip.IsLoopback() {
801 802 803
			continue
		}

804
		if len(ip.To4()) == 4 {
805 806
			// IPv4 address
			ip4 := ip.String()
vyzo's avatar
vyzo committed
807
			res = append(res, ip4)
808
		} else {
809 810
			// IPv6 address -- we add both the actual address and the /64 subnet
			ip6 := ip.String()
vyzo's avatar
vyzo committed
811
			res = append(res, ip6)
812

813 814
			ip6mask := ip.Mask(net.CIDRMask(64, 128)).String()
			res = append(res, ip6mask)
vyzo's avatar
vyzo committed
815 816 817 818
		}
	}

	return res
vyzo's avatar
vyzo committed
819 820
}

Raúl Kripalani's avatar
Raúl Kripalani committed
821 822
// setIPs adds tracking for the new IPs in the list, and removes tracking from
// the obsolete IPs.
vyzo's avatar
vyzo committed
823
func (ps *peerScore) setIPs(p peer.ID, newips, oldips []string) {
vyzo's avatar
vyzo committed
824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849
addNewIPs:
	// add the new IPs to the tracking
	for _, ip := range newips {
		// check if it is in the old ips list
		for _, xip := range oldips {
			if ip == xip {
				continue addNewIPs
			}
		}
		// no, it's a new one -- add it to the tracker
		peers, ok := ps.peerIPs[ip]
		if !ok {
			peers = make(map[peer.ID]struct{})
			ps.peerIPs[ip] = peers
		}
		peers[p] = struct{}{}
	}

removeOldIPs:
	// remove the obsolete old IPs from the tracking
	for _, ip := range oldips {
		// check if it is in the new ips list
		for _, xip := range newips {
			if ip == xip {
				continue removeOldIPs
			}
vyzo's avatar
vyzo committed
850 851 852 853 854 855 856 857 858
		}
		// no, it's obsolete -- remove it from the tracker
		peers, ok := ps.peerIPs[ip]
		if !ok {
			continue
		}
		delete(peers, p)
		if len(peers) == 0 {
			delete(ps.peerIPs, ip)
vyzo's avatar
vyzo committed
859 860 861 862
		}
	}
}

Raúl Kripalani's avatar
Raúl Kripalani committed
863
// removeIPs removes an IP list from the tracking list for a peer.
vyzo's avatar
vyzo committed
864 865 866 867 868 869 870 871 872 873 874 875 876
func (ps *peerScore) removeIPs(p peer.ID, ips []string) {
	for _, ip := range ips {
		peers, ok := ps.peerIPs[ip]
		if !ok {
			continue
		}

		delete(peers, p)
		if len(peers) == 0 {
			delete(ps.peerIPs, ip)
		}
	}
}