scoreledger.go 9.03 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
package decision

import (
	"sync"
	"time"

	peer "github.com/libp2p/go-libp2p-core/peer"
)

// Smoothing factors for the two usefulness EWMAs.
const (
	// shortTermAlpha is the alpha of the EWMA that tracks short-term
	// usefulness.
	shortTermAlpha = 0.5

	// longTermAlpha is the alpha of the EWMA that tracks long-term
	// usefulness.
	longTermAlpha = 0.05
)

// Sampling cadence.
const (
	// shortTerm is how frequently the engine should sample usefulness.
	// Peers that interact every shortTerm time period are considered
	// "active".
	shortTerm = time.Second * 10

	// longTermRatio defines what "long term" means as a multiple of the
	// shortTerm duration. Peers that interact once every longTermRatio
	// short-term periods are considered useful over the long term.
	longTermRatio = 10
)

// Long/short term scores used for tagging peers.
const (
	// longTermScore is a high tag, but it grows _very_ slowly.
	longTermScore = 10

	// shortTermScore is a high tag, but it goes away quickly if we aren't
	// using the peer.
	shortTermScore = 10
)

// scoreledger stores the data exchange relationship with a single remote
// peer, together with the usefulness scores derived from it.
type scoreledger struct {
	// partner is the remote peer this ledger is about.
	partner peer.ID

	// bytesSent is the running total of bytes sent to the partner.
	bytesSent uint64

	// bytesRecv is the running total of bytes received from the partner.
	bytesRecv uint64

	// lastExchange is the time of the most recent data exchange, in either
	// direction.
	lastExchange time.Time

	// shortScore tracks how useful we think this peer is in the short term.
	shortScore float64

	// longScore tracks how useful we think this peer is in the long term.
	longScore float64

	// score is the value last handed to the peer tagger. It is kept here so
	// the tags in the connection manager are only updated when it changes.
	score int

	// exchangeCount is the number of exchanges with this peer.
	exchangeCount uint64

	// lock is the per-record lock; it is held whenever the fields above are
	// read or written.
	lock sync.RWMutex
}

// Receipt is a snapshot of the ledger kept for one peer. It gathers the
// aggregated figures in a plain value suitable for external reporting.
type Receipt struct {
	// Peer is the string form of the partner's peer ID.
	Peer string

	// Value is the ratio of bytes sent to (bytes received + 1).
	Value float64

	// Sent and Recv are the byte totals in each direction; Exchanged is the
	// number of exchanges recorded with the peer.
	Sent, Recv, Exchanged uint64
}

// AddToSentBytes records an exchange in which n bytes were sent to the
// partner: it adds n to the sent total, counts the exchange and stamps
// lastExchange with the current time, all under the write lock.
func (l *scoreledger) AddToSentBytes(n int) {
	l.lock.Lock()
	defer l.lock.Unlock()

	l.bytesSent += uint64(n)
	l.lastExchange = time.Now()
	l.exchangeCount++
}

// AddToReceivedBytes records an exchange in which n bytes were received from
// the partner: it adds n to the received total, counts the exchange and
// stamps lastExchange with the current time, all under the write lock.
func (l *scoreledger) AddToReceivedBytes(n int) {
	l.lock.Lock()
	defer l.lock.Unlock()

	l.bytesRecv += uint64(n)
	l.lastExchange = time.Now()
	l.exchangeCount++
}

// Receipt builds a Receipt from the ledger's current counters while holding
// the read lock.
func (l *scoreledger) Receipt() *Receipt {
	l.lock.RLock()
	defer l.lock.RUnlock()

	summary := &Receipt{
		Peer:      l.partner.String(),
		Sent:      l.bytesSent,
		Recv:      l.bytesRecv,
		Exchanged: l.exchangeCount,
	}
	// The +1 keeps the denominator non-zero when nothing has been received.
	summary.Value = float64(summary.Sent) / float64(summary.Recv+1)
	return summary
}

// DefaultScoreLedger is used by Engine as the default ScoreLedger.
type DefaultScoreLedger struct {
	// scorePeer is the score func; scoreWorker calls it with each peer whose
	// score changed. It is set by init.
	scorePeer ScorePeerFunc
	// closing is closed by Stop; scoreWorker returns once it observes that.
	closing chan struct{}
	// lock protects the fields immediately below.
	lock sync.RWMutex
	// ledgerMap lists score ledgers by their partner key.
	ledgerMap map[peer.ID]*scoreledger
	// peerSampleInterval is how frequently the engine should sample peer
	// usefulness; it is the period of the ticker in scoreWorker.
	peerSampleInterval time.Duration
	// sampleCh is used by the tests to detect when a sample is taken. When
	// it is non-nil, scoreWorker sends on it after every sample.
	sampleCh chan struct{}
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and weights new
// observations highly. Long-term usefulness is sampled less frequently and
// weights long-term trends highly.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
//
// The worker runs until dsl.closing is closed. It checks for that both while
// waiting for the next tick and while handing a sample notification to the
// tests, so a missing sampleCh reader cannot keep the goroutine (and its
// ticker) alive after shutdown.
func (dsl *DefaultScoreLedger) scoreWorker() {
	ticker := time.NewTicker(dsl.peerSampleInterval)
	defer ticker.Stop()

	type update struct {
		peer  peer.ID
		score int
	}
	var (
		lastShortUpdate, lastLongUpdate time.Time
		updates                         []update
	)

	for i := 0; ; i = (i + 1) % longTermRatio {
		var now time.Time
		select {
		case now = <-ticker.C:
		case <-dsl.closing:
			return
		}

		// The long term update ticks every `longTermRatio` short
		// intervals.
		updateLong := i == 0

		dsl.lock.Lock()
		for _, l := range dsl.ledgerMap {
			l.lock.Lock()

			// Update the short-term score.
			if l.lastExchange.After(lastShortUpdate) {
				l.shortScore = ewma(l.shortScore, shortTermScore, shortTermAlpha)
			} else {
				l.shortScore = ewma(l.shortScore, 0, shortTermAlpha)
			}

			// Update the long-term score.
			if updateLong {
				if l.lastExchange.After(lastLongUpdate) {
					l.longScore = ewma(l.longScore, longTermScore, longTermAlpha)
				} else {
					l.longScore = ewma(l.longScore, 0, longTermAlpha)
				}
			}

			// Calculate the new score.
			//
			// The accounting score adjustment prefers peers _we_
			// need over peers that need us. This doesn't help with
			// leeching.
			var lscore float64
			if l.bytesRecv == 0 {
				lscore = 0
			} else {
				lscore = float64(l.bytesRecv) / float64(l.bytesRecv+l.bytesSent)
			}
			// lscore is in [0, 1], so the multiplier is in [0.75, 1.25].
			score := int((l.shortScore + l.longScore) * (lscore*.5 + .75))

			// Avoid updating the connection manager unless there's a change. This can be expensive.
			if l.score != score {
				// put these in a list so we can perform the updates outside the _global_ lock.
				updates = append(updates, update{l.partner, score})
				l.score = score
			}
			l.lock.Unlock()
		}
		dsl.lock.Unlock()

		// record the times.
		lastShortUpdate = now
		if updateLong {
			lastLongUpdate = now
		}

		// apply the updates
		for _, u := range updates {
			dsl.scorePeer(u.peer, u.score)
		}
		// Keep the memory. It's not much and it saves us from having to allocate.
		updates = updates[:0]

		// Used by the tests. Give up on the notification if we are shut
		// down while nobody is receiving, instead of blocking forever.
		if dsl.sampleCh != nil {
			select {
			case dsl.sampleCh <- struct{}{}:
			case <-dsl.closing:
				return
			}
		}
	}
}

// find looks up the score ledger for p under the read lock (cheaper than the
// write lock). It returns nil if p is not on the ledger.
func (dsl *DefaultScoreLedger) find(p peer.ID) *scoreledger {
	dsl.lock.RLock()
	defer dsl.lock.RUnlock()

	// Indexing a missing key yields the zero value, which is a nil pointer.
	return dsl.ledgerMap[p]
}

// newScoreLedger allocates an empty scoreledger whose partner is p; every
// other field starts at its zero value.
func newScoreLedger(p peer.ID) *scoreledger {
	l := new(scoreledger)
	l.partner = p
	return l
}

// findOrCreate returns the ledger for p, lazily creating and registering one
// if none exists yet.
func (dsl *DefaultScoreLedger) findOrCreate(p peer.ID) *scoreledger {
	// Fast path: the lookup in find only needs the read lock.
	if existing := dsl.find(p); existing != nil {
		return existing
	}

	// Slow path: take the write lock and look again, since another
	// goroutine may have added the ledger after the read lock was released.
	dsl.lock.Lock()
	defer dsl.lock.Unlock()

	if existing, ok := dsl.ledgerMap[p]; ok {
		return existing
	}
	created := newScoreLedger(p)
	dsl.ledgerMap[p] = created
	return created
}

// GetReceipt returns the aggregated data exchanged with peer p. If p has no
// ledger, the receipt carries only the peer's string ID and zero counters.
func (dsl *DefaultScoreLedger) GetReceipt(p peer.ID) *Receipt {
	if l := dsl.find(p); l != nil {
		return l.Receipt()
	}

	// Blank receipt: every numeric field is left at its zero value.
	return &Receipt{Peer: p.String()}
}

// Start begins the default ledger sampling process: it stores scorePeer via
// init and then runs scoreWorker in a new goroutine.
func (dsl *DefaultScoreLedger) Start(scorePeer ScorePeerFunc) {
	dsl.init(scorePeer)
	go dsl.scoreWorker()
}

// Stop ends the sampling process by closing the closing channel, which
// scoreWorker selects on while waiting for its next tick.
//
// Stop must be called at most once: closing an already-closed channel
// panics.
func (dsl *DefaultScoreLedger) Stop() {
	close(dsl.closing)
}

// init prepares the score ledger for use by recording, under the write
// lock, the function used to publish peer scores.
func (dsl *DefaultScoreLedger) init(scorePeer ScorePeerFunc) {
	dsl.lock.Lock()
	dsl.scorePeer = scorePeer
	dsl.lock.Unlock()
}

// AddToSentBytes adds n to the sent counter of peer p, creating the peer's
// ledger first if it does not exist.
func (dsl *DefaultScoreLedger) AddToSentBytes(p peer.ID, n int) {
	dsl.findOrCreate(p).AddToSentBytes(n)
}

// AddToReceivedBytes adds n to the received counter of peer p, creating the
// peer's ledger first if it does not exist.
func (dsl *DefaultScoreLedger) AddToReceivedBytes(p peer.ID, n int) {
	dsl.findOrCreate(p).AddToReceivedBytes(n)
}

// PeerConnected should be called when a new peer connects, meaning we
// should open accounting for it. A peer that already has a ledger keeps it.
func (dsl *DefaultScoreLedger) PeerConnected(p peer.ID) {
	dsl.lock.Lock()
	defer dsl.lock.Unlock()

	if _, known := dsl.ledgerMap[p]; known {
		return
	}
	dsl.ledgerMap[p] = newScoreLedger(p)
}

// PeerDisconnected should be called when a peer disconnects. It cleans up
// the accounting by removing the peer's ledger from the map, if present.
func (dsl *DefaultScoreLedger) PeerDisconnected(p peer.ID) {
	dsl.lock.Lock()
	delete(dsl.ledgerMap, p)
	dsl.lock.Unlock()
}

// NewDefaultScoreLedger creates a new instance of the default score ledger
// with an empty ledger map, an open closing channel and a sampling interval
// of shortTerm.
func NewDefaultScoreLedger() *DefaultScoreLedger {
	dsl := new(DefaultScoreLedger)
	dsl.peerSampleInterval = shortTerm
	dsl.closing = make(chan struct{})
	dsl.ledgerMap = make(map[peer.ID]*scoreledger)
	return dsl
}

// NewTestScoreLedger creates a default score ledger with testing
// parameters: the sampling interval is overridden with peerSampleInterval
// and sampleCh is installed as the channel signalled after each sample.
func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}) *DefaultScoreLedger {
	ledger := NewDefaultScoreLedger()
	ledger.sampleCh = sampleCh
	ledger.peerSampleInterval = peerSampleInterval
	return ledger
}