package decision import ( "sync" "time" peer "github.com/libp2p/go-libp2p-core/peer" ) const ( // the alpha for the EWMA used to track short term usefulness shortTermAlpha = 0.5 // the alpha for the EWMA used to track long term usefulness longTermAlpha = 0.05 // how frequently the engine should sample usefulness. Peers that // interact every shortTerm time period are considered "active". shortTerm = 10 * time.Second // long term ratio defines what "long term" means in terms of the // shortTerm duration. Peers that interact once every longTermRatio are // considered useful over the long term. longTermRatio = 10 // long/short term scores for tagging peers longTermScore = 10 // this is a high tag but it grows _very_ slowly. shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer. ) // Stores the data exchange relationship between two peers. type scoreledger struct { // Partner is the remote Peer. partner peer.ID // tracks bytes sent... bytesSent uint64 // ...and received. bytesRecv uint64 // lastExchange is the time of the last data exchange. lastExchange time.Time // These scores keep track of how useful we think this peer is. Short // tracks short-term usefulness and long tracks long-term usefulness. shortScore, longScore float64 // Score keeps track of the score used in the peer tagger. We track it // here to avoid unnecessarily updating the tags in the connection manager. score int // exchangeCount is the number of exchanges with this peer exchangeCount uint64 // the record lock lock sync.RWMutex } // Receipt is a summary of the ledger for a given peer // collecting various pieces of aggregated data for external // reporting purposes. type Receipt struct { Peer string Value float64 Sent uint64 Recv uint64 Exchanged uint64 } // Increments the sent counter. func (l *scoreledger) AddToSentBytes(n int) { l.lock.Lock() defer l.lock.Unlock() l.exchangeCount++ l.lastExchange = time.Now() l.bytesSent += uint64(n) } // Increments the received counter. func (l *scoreledger) AddToReceivedBytes(n int) { l.lock.Lock() defer l.lock.Unlock() l.exchangeCount++ l.lastExchange = time.Now() l.bytesRecv += uint64(n) } // Returns the Receipt for this ledger record. func (l *scoreledger) Receipt() *Receipt { l.lock.RLock() defer l.lock.RUnlock() return &Receipt{ Peer: l.partner.String(), Value: float64(l.bytesSent) / float64(l.bytesRecv+1), Sent: l.bytesSent, Recv: l.bytesRecv, Exchanged: l.exchangeCount, } } // DefaultScoreLedger is used by Engine as the default ScoreLedger. type DefaultScoreLedger struct { // the score func scorePeer ScorePeerFunc // is closed on Close closing chan struct{} // protects the fields immediatly below lock sync.RWMutex // ledgerMap lists score ledgers by their partner key. ledgerMap map[peer.ID]*scoreledger // how frequently the engine should sample peer usefulness peerSampleInterval time.Duration // used by the tests to detect when a sample is taken sampleCh chan struct{} } // scoreWorker keeps track of how "useful" our peers are, updating scores in the // connection manager. // // It does this by tracking two scores: short-term usefulness and long-term // usefulness. Short-term usefulness is sampled frequently and highly weights // new observations. Long-term usefulness is sampled less frequently and highly // weights on long-term trends. // // In practice, we do this by keeping two EWMAs. If we see an interaction // within the sampling period, we record the score, otherwise, we record a 0. // The short-term one has a high alpha and is sampled every shortTerm period. // The long-term one has a low alpha and is sampled every // longTermRatio*shortTerm period. // // To calculate the final score, we sum the short-term and long-term scores then // adjust it ±25% based on our debt ratio. Peers that have historically been // more useful to us than we are to them get the highest score. func (dsl *DefaultScoreLedger) scoreWorker() { ticker := time.NewTicker(dsl.peerSampleInterval) defer ticker.Stop() type update struct { peer peer.ID score int } var ( lastShortUpdate, lastLongUpdate time.Time updates []update ) for i := 0; ; i = (i + 1) % longTermRatio { var now time.Time select { case now = <-ticker.C: case <-dsl.closing: return } // The long term update ticks every `longTermRatio` short // intervals. updateLong := i == 0 dsl.lock.Lock() for _, l := range dsl.ledgerMap { l.lock.Lock() // Update the short-term score. if l.lastExchange.After(lastShortUpdate) { l.shortScore = ewma(l.shortScore, shortTermScore, shortTermAlpha) } else { l.shortScore = ewma(l.shortScore, 0, shortTermAlpha) } // Update the long-term score. if updateLong { if l.lastExchange.After(lastLongUpdate) { l.longScore = ewma(l.longScore, longTermScore, longTermAlpha) } else { l.longScore = ewma(l.longScore, 0, longTermAlpha) } } // Calculate the new score. // // The accounting score adjustment prefers peers _we_ // need over peers that need us. This doesn't help with // leeching. var lscore float64 if l.bytesRecv == 0 { lscore = 0 } else { lscore = float64(l.bytesRecv) / float64(l.bytesRecv+l.bytesSent) } score := int((l.shortScore + l.longScore) * (lscore*.5 + .75)) // Avoid updating the connection manager unless there's a change. This can be expensive. if l.score != score { // put these in a list so we can perform the updates outside _global_ the lock. updates = append(updates, update{l.partner, score}) l.score = score } l.lock.Unlock() } dsl.lock.Unlock() // record the times. lastShortUpdate = now if updateLong { lastLongUpdate = now } // apply the updates for _, update := range updates { dsl.scorePeer(update.peer, update.score) } // Keep the memory. It's not much and it saves us from having to allocate. updates = updates[:0] // Used by the tests if dsl.sampleCh != nil { dsl.sampleCh <- struct{}{} } } } // Returns the score ledger for the given peer or nil if that peer // is not on the ledger. func (dsl *DefaultScoreLedger) find(p peer.ID) *scoreledger { // Take a read lock (as it's less expensive) to check if we have // a ledger for the peer. dsl.lock.RLock() l, ok := dsl.ledgerMap[p] dsl.lock.RUnlock() if ok { return l } return nil } // Returns a new scoreledger. func newScoreLedger(p peer.ID) *scoreledger { return &scoreledger{ partner: p, } } // Lazily instantiates a ledger. func (dsl *DefaultScoreLedger) findOrCreate(p peer.ID) *scoreledger { l := dsl.find(p) if l != nil { return l } // There's no ledger, so take a write lock, then check again and // create the ledger if necessary. dsl.lock.Lock() defer dsl.lock.Unlock() l, ok := dsl.ledgerMap[p] if !ok { l = newScoreLedger(p) dsl.ledgerMap[p] = l } return l } // GetReceipt returns aggregated data communication with a given peer. func (dsl *DefaultScoreLedger) GetReceipt(p peer.ID) *Receipt { l := dsl.find(p) if l != nil { return l.Receipt() } // Return a blank receipt otherwise. return &Receipt{ Peer: p.String(), Value: 0, Sent: 0, Recv: 0, Exchanged: 0, } } // Starts the default ledger sampling process. func (dsl *DefaultScoreLedger) Start(scorePeer ScorePeerFunc) { dsl.init(scorePeer) go dsl.scoreWorker() } // Stops the sampling process. func (dsl *DefaultScoreLedger) Stop() { close(dsl.closing) } // Initializes the score ledger. func (dsl *DefaultScoreLedger) init(scorePeer ScorePeerFunc) { dsl.lock.Lock() defer dsl.lock.Unlock() dsl.scorePeer = scorePeer } // Increments the sent counter for the given peer. func (dsl *DefaultScoreLedger) AddToSentBytes(p peer.ID, n int) { l := dsl.findOrCreate(p) l.AddToSentBytes(n) } // Increments the received counter for the given peer. func (dsl *DefaultScoreLedger) AddToReceivedBytes(p peer.ID, n int) { l := dsl.findOrCreate(p) l.AddToReceivedBytes(n) } // PeerConnected should be called when a new peer connects, meaning // we should open accounting. func (dsl *DefaultScoreLedger) PeerConnected(p peer.ID) { dsl.lock.Lock() defer dsl.lock.Unlock() _, ok := dsl.ledgerMap[p] if !ok { dsl.ledgerMap[p] = newScoreLedger(p) } } // PeerDisconnected should be called when a peer disconnects to // clean up the accounting. func (dsl *DefaultScoreLedger) PeerDisconnected(p peer.ID) { dsl.lock.Lock() defer dsl.lock.Unlock() delete(dsl.ledgerMap, p) } // Creates a new instance of the default score ledger. func NewDefaultScoreLedger() *DefaultScoreLedger { return &DefaultScoreLedger{ ledgerMap: make(map[peer.ID]*scoreledger), closing: make(chan struct{}), peerSampleInterval: shortTerm, } } // Creates a new instance of the default score ledger with testing // parameters. func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}) *DefaultScoreLedger { dsl := NewDefaultScoreLedger() dsl.peerSampleInterval = peerSampleInterval dsl.sampleCh = sampleCh return dsl }