Unverified Commit fef4be29 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #191 from ipfs/feat/tag-ewma

engine: tag peers based on usefulness
parents 5fa55e8a fcb13fc9
......@@ -57,11 +57,35 @@ const (
outboxChanBuffer = 0
// maxMessageSize is the maximum size of the batched payload
maxMessageSize = 512 * 1024
// tagPrefix is the tag given to peers associated with an engine
tagPrefix = "bs-engine-%s"
// tagFormat is the format of the tags given to peers associated with an engine
tagFormat = "bs-engine-%s-%s"
// tagWeight is the default weight for peers associated with an engine
tagWeight = 5
// queuedTagWeight is the default weight for peers that have work queued
// on their behalf.
queuedTagWeight = 10
// the alpha for the EWMA used to track short term usefulness
shortTermAlpha = 0.5
// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05
// longTermRatio defines what "long term" means as a multiple of the
// shortTerm duration. Peers that interact at least once every
// longTermRatio*shortTerm are considered useful over the long term.
longTermRatio = 10
// long/short term scores for tagging peers
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
)
var (
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
//
// this is only a variable to make testing easier.
shortTerm = 10 * time.Second
)
// Envelope contains a message for a Peer.
......@@ -105,7 +129,8 @@ type Engine struct {
peerTagger PeerTagger
tag string
tagQueued, tagUseful string
lock sync.Mutex // protects the fields immediately below
// ledgerMap lists Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger
......@@ -123,18 +148,118 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger)
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
}
e.tag = fmt.Sprintf(tagPrefix, uuid.New().String())
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
e.peerRequestQueue = peertaskqueue.New(peertaskqueue.OnPeerAddedHook(e.onPeerAdded), peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved))
go e.taskWorker(ctx)
go e.scoreWorker(ctx)
return e
}
// scoreWorker periodically re-evaluates how "useful" each known peer is and
// pushes any changed score to the peer tagger under the engine's "useful" tag.
// It runs until ctx is cancelled.
//
// Two exponentially weighted moving averages are kept per ledger:
//
//   - the short-term average is sampled on every tick (every shortTerm) with
//     shortTermAlpha, so it reacts quickly to recent activity;
//   - the long-term average is sampled once every longTermRatio ticks with
//     longTermAlpha, so it moves slowly and follows long-running trends.
//
// For each sample, a ledger that exchanged data since the previous sample of
// that kind contributes the corresponding full score (shortTermScore or
// longTermScore); otherwise it contributes 0.
//
// The tag value is the sum of both averages scaled by a factor between 0.75
// and 1.25 derived from the ledger's debt score, so peers that have given us
// proportionally more than we have given them rank highest.
func (e *Engine) scoreWorker(ctx context.Context) {
	ticker := time.NewTicker(shortTerm)
	defer ticker.Stop()

	// scoreChange is a tag change that still has to be sent to the tagger.
	type scoreChange struct {
		peer  peer.ID
		score int
	}

	var (
		// Tick times of the previous short-term and long-term samples.
		prevShort, prevLong time.Time
		// Scratch buffer, reused across ticks.
		pending []scoreChange
	)

	for tick := 0; ; tick = (tick + 1) % longTermRatio {
		var now time.Time
		select {
		case <-ctx.Done():
			return
		case now = <-ticker.C:
		}

		// The long-term average is only sampled on every
		// longTermRatio-th short-term tick.
		sampleLong := tick == 0

		e.lock.Lock()
		for _, l := range e.ledgerMap {
			l.lk.Lock()

			// Short-term sample: full score if there was an exchange
			// since the last short-term tick, zero otherwise.
			shortSample := 0.0
			if l.lastExchange.After(prevShort) {
				shortSample = shortTermScore
			}
			l.shortScore = ewma(l.shortScore, shortSample, shortTermAlpha)

			// Long-term sample, same idea over the longer window.
			if sampleLong {
				longSample := 0.0
				if l.lastExchange.After(prevLong) {
					longSample = longTermScore
				}
				l.longScore = ewma(l.longScore, longSample, longTermAlpha)
			}

			// The accounting adjustment favours peers _we_ need over
			// peers that need us. This doesn't help with leeching.
			adjustment := (l.Accounting.Score())*.5 + .75
			newScore := int((l.shortScore + l.longScore) * adjustment)

			// Updating the connection manager can be expensive, so only
			// queue an update when the integer score actually changed.
			if newScore != l.score {
				// Collected here and applied later so the tagger is
				// not called while holding the global lock.
				pending = append(pending, scoreChange{l.Partner, newScore})
				l.score = newScore
			}

			l.lk.Unlock()
		}
		e.lock.Unlock()

		// Remember when each kind of sample was last taken.
		prevShort = now
		if sampleLong {
			prevLong = now
		}

		// Apply the queued changes; a score of zero removes the tag.
		for _, change := range pending {
			if change.score == 0 {
				e.peerTagger.UntagPeer(change.peer, e.tagUseful)
				continue
			}
			e.peerTagger.TagPeer(change.peer, e.tagUseful, change.score)
		}

		// Keep the backing array; it is small and saves an allocation
		// on the next tick.
		pending = pending[:0]
	}
}
func (e *Engine) onPeerAdded(p peer.ID) {
e.peerTagger.TagPeer(p, e.tag, tagWeight)
e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}
func (e *Engine) onPeerRemoved(p peer.ID) {
e.peerTagger.UntagPeer(p, e.tag)
e.peerTagger.UntagPeer(p, e.tagQueued)
}
// WantlistForPeer returns the currently understood want list for a given peer
......
......@@ -19,38 +19,63 @@ import (
testutil "github.com/libp2p/go-libp2p-core/test"
)
// peerTag tracks the peers currently carrying one particular tag in the
// fake peer tagger used by the tests.
type peerTag struct {
// done is closed by UntagPeer once the last peer carrying the tag is removed.
done chan struct{}
// peers maps each tagged peer to the value it was last tagged with.
peers map[peer.ID]int
}
type fakePeerTagger struct {
lk sync.Mutex
wait sync.WaitGroup
taggedPeers []peer.ID
lk sync.Mutex
tags map[string]*peerTag
}
func (fpt *fakePeerTagger) TagPeer(p peer.ID, tag string, n int) {
fpt.wait.Add(1)
fpt.lk.Lock()
defer fpt.lk.Unlock()
fpt.taggedPeers = append(fpt.taggedPeers, p)
if fpt.tags == nil {
fpt.tags = make(map[string]*peerTag, 1)
}
pt, ok := fpt.tags[tag]
if !ok {
pt = &peerTag{peers: make(map[peer.ID]int, 1), done: make(chan struct{})}
fpt.tags[tag] = pt
}
pt.peers[p] = n
}
func (fpt *fakePeerTagger) UntagPeer(p peer.ID, tag string) {
defer fpt.wait.Done()
fpt.lk.Lock()
defer fpt.lk.Unlock()
for i := 0; i < len(fpt.taggedPeers); i++ {
if fpt.taggedPeers[i] == p {
fpt.taggedPeers[i] = fpt.taggedPeers[len(fpt.taggedPeers)-1]
fpt.taggedPeers = fpt.taggedPeers[:len(fpt.taggedPeers)-1]
return
}
pt := fpt.tags[tag]
if pt == nil {
return
}
delete(pt.peers, p)
if len(pt.peers) == 0 {
close(pt.done)
delete(fpt.tags, tag)
}
}
func (fpt *fakePeerTagger) count() int {
func (fpt *fakePeerTagger) count(tag string) int {
fpt.lk.Lock()
defer fpt.lk.Unlock()
return len(fpt.taggedPeers)
if pt, ok := fpt.tags[tag]; ok {
return len(pt.peers)
}
return 0
}
// wait blocks until the given tag's done channel is closed. It returns
// immediately if the tagger currently has no entry for the tag.
func (fpt *fakePeerTagger) wait(tag string) {
	// Look up the channel under the lock, but release the lock before
	// blocking so TagPeer/UntagPeer can make progress.
	done, tracked := func() (<-chan struct{}, bool) {
		fpt.lk.Lock()
		defer fpt.lk.Unlock()
		if entry := fpt.tags[tag]; entry != nil {
			return entry.done, true
		}
		return nil, false
	}()
	if tracked {
		<-done
	}
}
type engineSet struct {
......@@ -241,16 +266,56 @@ func TestTaggingPeers(t *testing.T) {
next := <-sanfrancisco.Engine.Outbox()
envelope := <-next
if sanfrancisco.PeerTagger.count() != 1 {
if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 1 {
t.Fatal("Incorrect number of peers tagged")
}
envelope.Sent()
<-sanfrancisco.Engine.Outbox()
sanfrancisco.PeerTagger.wait.Wait()
if sanfrancisco.PeerTagger.count() != 0 {
sanfrancisco.PeerTagger.wait(sanfrancisco.Engine.tagQueued)
if sanfrancisco.PeerTagger.count(sanfrancisco.Engine.tagQueued) != 0 {
t.Fatal("Peers should be untagged but weren't")
}
}
// TestTaggingUseful checks the lifecycle of the engine's "useful" tag: a peer
// gets tagged shortly after we send it a message, loses the tag again after a
// quiet spell, stays tagged for a while after repeated interactions, and is
// finally untagged once it has been idle long enough.
func TestTaggingUseful(t *testing.T) {
	// Shrink the sampling period so the test runs quickly, and restore the
	// previous value when the test finishes.
	defer func(saved time.Duration) { shortTerm = saved }(shortTerm)
	shortTerm = 1 * time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	me := newEngine(ctx, "engine")
	friend := peer.ID("friend")

	msg := message.New(false)
	msg.AddBlock(blocks.NewBlock([]byte("foobar")))

	// usefulPeers reports how many peers currently carry the "useful" tag.
	usefulPeers := func() int {
		return me.PeerTagger.count(me.Engine.tagUseful)
	}

	for round := 0; round < 3; round++ {
		if usefulPeers() != 0 {
			t.Fatal("Peers should be untagged but weren't")
		}
		me.Engine.MessageSent(friend, msg)

		// Give the score worker a couple of ticks to notice the exchange.
		time.Sleep(shortTerm * 2)
		if usefulPeers() != 1 {
			t.Fatal("Peers should be tagged but weren't")
		}

		// Stay idle for several ticks before the next round.
		time.Sleep(shortTerm * 8)
	}

	if usefulPeers() == 0 {
		t.Fatal("peers should still be tagged due to long-term usefulness")
	}

	time.Sleep(shortTerm * 2)
	if usefulPeers() == 0 {
		t.Fatal("peers should still be tagged due to long-term usefulness")
	}

	time.Sleep(shortTerm * 10)
	if usefulPeers() != 0 {
		t.Fatal("peers should finally be untagged")
	}
}
func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New(false)
for i, letter := range keys {
......
package decision
// ewma returns the next value of an exponentially weighted moving average.
//
// prev is the current average, sample is the new observation and alpha is
// the weight given to the observation: alpha == 1 yields sample, alpha == 0
// yields prev.
func ewma(prev, sample, alpha float64) float64 {
	fresh := sample * alpha
	carried := (1 - alpha) * prev
	return fresh + carried
}
......@@ -12,9 +12,8 @@ import (
func newLedger(p peer.ID) *ledger {
return &ledger{
wantList: wl.New(),
Partner: p,
sentToPeer: make(map[string]time.Time),
wantList: wl.New(),
Partner: p,
}
}
......@@ -30,16 +29,19 @@ type ledger struct {
// lastExchange is the time of the last data exchange.
lastExchange time.Time
// These scores keep track of how useful we think this peer is. Short
// tracks short-term usefulness and long tracks long-term usefulness.
shortScore, longScore float64
// Score keeps track of the score used in the peer tagger. We track it
// here to avoid unnecessarily updating the tags in the connection manager.
score int
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64
// wantList is a (bounded, small) set of keys that Partner desires.
wantList *wl.Wantlist
// sentToPeer is a set of keys to ensure we dont send duplicate blocks
// to a given peer
sentToPeer map[string]time.Time
// ref is the reference count for this ledger. It's used to ensure we
// don't drop the reference to this ledger in multi-connection scenarios
ref int
......@@ -63,10 +65,19 @@ type debtRatio struct {
BytesRecv uint64
}
// Value returns the debt ratio as bytes sent divided by bytes received plus
// one; the added one keeps the divisor non-zero when nothing has been
// received yet.
func (dr *debtRatio) Value() float64 {
	sent := float64(dr.BytesSent)
	divisor := float64(dr.BytesRecv + 1)
	return sent / divisor
}
// Score returns the debt _score_ on a 0-1 scale: the fraction of all bytes
// exchanged with the peer that we received from it. It is 0 when nothing has
// been received.
func (dr *debtRatio) Score() float64 {
	received := dr.BytesRecv
	if received == 0 {
		return 0
	}
	total := received + dr.BytesSent
	return float64(received) / float64(total)
}
func (l *ledger) SentBytes(n int) {
l.exchangeCount++
l.lastExchange = time.Now()
......
......@@ -7,7 +7,6 @@ require (
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1 // indirect
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.1
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.2
github.com/ipfs/go-datastore v0.0.5
......@@ -38,3 +37,5 @@ require (
golang.org/x/text v0.3.2 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
go 1.12
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment