Commit 9d580a65 authored by Steven Allen's avatar Steven Allen

engine: tag peers based on usefulness

This patch tracks two usefulness metrics: short-term usefulness and long-term
usefulness. Short-term usefulness is sampled frequently and highly weights new
observations. Long-term usefulness is sampled less frequently and highly weights
on long-term trends.

In practice, we do this by keeping two EWMAs. If we see an interaction within
the sampling period, we record the score, otherwise, we record a 0. The
short-term one has a high alpha and is sampled every shortTerm period. The
long-term one has a low alpha and is sampled every longTermRatio*shortTerm
period.

To calculate the final score, we sum the short-term and long-term scores then
adjust it ±25% based on our debt ratio. Peers that have historically been more
useful to us than we are to them get the highest score.
parent 565aa560
......@@ -57,11 +57,35 @@ const (
outboxChanBuffer = 0
// maxMessageSize is the maximum size of the batched payload
maxMessageSize = 512 * 1024
// tagPrefix is the tag given to peers associated an engine
tagPrefix = "bs-engine-%s"
// tagFormat is the tag given to peers associated an engine
tagFormat = "bs-engine-%s-%s"
// tagWeight is the default weight for peers associated with an engine
tagWeight = 5
// queuedTagWeight is the default weight for peers that have work queued
// on their behalf.
queuedTagWeight = 10
// the alpha for the EWMA used to track short term usefulness
shortTermAlpha = 0.5
// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05
// long term ratio defines what "long term" means in terms of the
// shortTerm duration. Peers that interact once every longTermRatio are
// considered useful over the long term.
longTermRatio = 10
// long/short term scores for tagging peers
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
)
var (
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
//
// this is only a variable to make testing easier.
shortTerm = 10 * time.Second
)
// Envelope contains a message for a Peer.
......@@ -105,7 +129,8 @@ type Engine struct {
peerTagger PeerTagger
tag string
tagQueued, tagUseful string
lock sync.Mutex // protects the fields immediatly below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger
......@@ -123,18 +148,113 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger)
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
}
e.tag = fmt.Sprintf(tagPrefix, uuid.New().String())
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
e.peerRequestQueue = peertaskqueue.New(peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
go e.taskWorker(ctx)
go e.scoreWorker(ctx)
return e
}
// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and highly weights
// new observations. Long-term usefulness is sampled less frequently and highly
// weights on long-term trends.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been more useful to us than we are to them get the highest score.
func (e *Engine) scoreWorker(ctx context.Context) {
ticker := time.NewTicker(shortTerm)
defer ticker.Stop()
type update struct {
peer peer.ID
score int
}
var (
lastShortUpdate, lastLongUpdate time.Time
updates []update
)
for i := 0; ; i = (i + 1) % longTermRatio {
var now time.Time
select {
case now = <-ticker.C:
case <-ctx.Done():
return
}
// The long term update ticks every `longTermRatio` short
// intervals.
updateLong := i == 0
e.lock.Lock()
for _, ledger := range e.ledgerMap {
ledger.lk.Lock()
// Update the short-term score.
if ledger.lastExchange.After(lastShortUpdate) {
ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
} else {
ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
}
// Update the long-term score.
if updateLong {
if ledger.lastExchange.After(lastLongUpdate) {
ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
} else {
ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
}
}
// Calculate the new score.
score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))
// Avoid updating the connection manager unless there's a change. This can be expensive.
if ledger.score != score {
// put these in a list so we can perform the updates outside _global_ the lock.
updates = append(updates, update{ledger.Partner, score})
ledger.score = score
}
ledger.lk.Unlock()
}
e.lock.Unlock()
// record the times.
lastShortUpdate = now
if updateLong {
lastLongUpdate = now
}
// apply the updates
for _, update := range updates {
if update.score == 0 {
e.peerTagger.UntagPeer(update.peer, e.tagUseful)
} else {
e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
}
}
// Keep the memory. It's not much and it saves us from having to allocate.
updates = updates[:0]
}
}
func (e *Engine) onPeerAdded(p peer.ID) {
e.peerTagger.TagPeer(p, e.tag, tagWeight)
e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}
func (e *Engine) onPeerRemoved(p peer.ID) {
e.peerTagger.UntagPeer(p, e.tag)
e.peerTagger.UntagPeer(p, e.tagQueued)
}
// WantlistForPeer returns the currently understood want list for a given peer
......
package decision
func ewma(old, new, alpha float64) float64 {
return new*alpha + (1-alpha)*old
}
......@@ -12,9 +12,8 @@ import (
func newLedger(p peer.ID) *ledger {
return &ledger{
wantList: wl.New(),
Partner: p,
sentToPeer: make(map[string]time.Time),
wantList: wl.New(),
Partner: p,
}
}
......@@ -30,16 +29,19 @@ type ledger struct {
// lastExchange is the time of the last data exchange.
lastExchange time.Time
// These scores keep track of how useful we think this peer is. Short
// tracks short-term usefulness and long tracks long-term usefulness.
shortScore, longScore float64
// Score keeps track of the score used in the peer tagger. We track it
// here to avoid unnecessarily updating the tags in the connection manager.
score int
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64
// wantList is a (bounded, small) set of keys that Partner desires.
wantList *wl.Wantlist
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer
sentToPeer map[string]time.Time
// ref is the reference count for this ledger, its used to ensure we
// don't drop the reference to this ledger in multi-connection scenarios
ref int
......@@ -63,10 +65,19 @@ type debtRatio struct {
BytesRecv uint64
}
// Value returns the debt ratio, sent:receive.
func (dr *debtRatio) Value() float64 {
return float64(dr.BytesSent) / float64(dr.BytesRecv+1)
}
// Score returns the debt _score_ on a 0-1 scale.
func (dr *debtRatio) Score() float64 {
if dr.BytesRecv == 0 {
return 0
}
return float64(dr.BytesRecv) / float64(dr.BytesRecv+dr.BytesSent)
}
func (l *ledger) SentBytes(n int) {
l.exchangeCount++
l.lastExchange = time.Now()
......
......@@ -7,7 +7,6 @@ require (
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1 // indirect
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.2
github.com/ipfs/go-datastore v0.0.5
......@@ -38,3 +37,5 @@ require (
golang.org/x/text v0.3.2 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
go 1.12
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment