Unverified commit fd213932, authored by Paul Wolneykien and committed by GitHub

Added `WithScoreLedger` Bitswap option (#430)

* Separate the decision engine ledger into two parts: the score and the wantlist

This is the first step toward making external decision logic (tagging
peers with score values) possible.

The wantlist still resides in the original `ledger` struct, while
sent/received byte accounting and scores are extracted into the new
`scoreledger` struct, managed by the original `scoreWorker()` logic.
The accounting is integrated into the `Engine` via the `ScoreLedger`
interface, making it possible to replace the original `scoreWorker()`
with some other logic. The interface, however, doesn't allow the
score logic to touch peer tags directly: the logic may decide on
score values, while tagging itself remains under the `Engine`'s control.

Note: with this commit it is not yet possible to replace the original
score logic, because there are no public methods for that.

* Added "WithScoreLedger" Bitswap option

The new `WithScoreLedger(decision.ScoreLedger)` option in the `bitswap`
package connects a custom `ScoreLedger` implementation to the decision
engine. The `Engine` now has the corresponding
`UseScoreLedger(ScoreLedger)` method.
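
For orientation, here is a minimal wiring sketch. The helper name
`newBitswapWithLedger` and its parameters are invented for the example;
`bitswap.New` and the `WithScoreLedger` option are from this change:

```go
package custom

import (
	"context"

	bitswap "github.com/ipfs/go-bitswap"
	deciface "github.com/ipfs/go-bitswap/decision"
	bsnet "github.com/ipfs/go-bitswap/network"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
)

// newBitswapWithLedger constructs a Bitswap instance with a custom score
// ledger plugged in. The option runs after the engine is constructed but
// before its workers start, so the engine never sees a half-configured
// ledger.
func newBitswapWithLedger(ctx context.Context, net bsnet.BitSwapNetwork,
	bstore blockstore.Blockstore, ledger deciface.ScoreLedger) exchange.Interface {
	return bitswap.New(ctx, net, bstore, bitswap.WithScoreLedger(ledger))
}
```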

The `ScoreLedger` and `ScorePeerFunc` types are exposed from the
internal `decision` package to the public one.
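
As an illustration of what these public types enable (this sketch is not
part of the commit; the `flatScoreLedger` type, its constant score of 5,
and the 10-second period are all invented for the example), a custom
ledger only reports scores through the `ScorePeerFunc` it receives from
`Start()`, while the `Engine` keeps control of the actual peer tagging:

```go
package custom

import (
	"sync"
	"time"

	deciface "github.com/ipfs/go-bitswap/decision"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// flatScoreLedger is a hypothetical ScoreLedger that gives every
// connected peer the same constant score. It decides score values only;
// tagging stays with the Engine, reached through the ScorePeerFunc
// passed to Start.
type flatScoreLedger struct {
	lock    sync.Mutex
	peers   map[peer.ID]struct{}
	closing chan struct{}
}

func newFlatScoreLedger() *flatScoreLedger {
	return &flatScoreLedger{
		peers:   make(map[peer.ID]struct{}),
		closing: make(chan struct{}),
	}
}

// GetReceipt reports no byte accounting in this sketch.
func (l *flatScoreLedger) GetReceipt(p peer.ID) *deciface.Receipt {
	return &deciface.Receipt{Peer: p.String()}
}

func (l *flatScoreLedger) AddToSentBytes(p peer.ID, n int)     {}
func (l *flatScoreLedger) AddToReceivedBytes(p peer.ID, n int) {}

func (l *flatScoreLedger) PeerConnected(p peer.ID) {
	l.lock.Lock()
	defer l.lock.Unlock()
	l.peers[p] = struct{}{}
}

func (l *flatScoreLedger) PeerDisconnected(p peer.ID) {
	l.lock.Lock()
	defer l.lock.Unlock()
	delete(l.peers, p)
}

// Start periodically pushes the flat score for every known peer
// through the Engine-provided callback.
func (l *flatScoreLedger) Start(scorePeer deciface.ScorePeerFunc) {
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				l.lock.Lock()
				for p := range l.peers {
					scorePeer(p, 5) // the Engine turns this into a peer tag
				}
				l.lock.Unlock()
			case <-l.closing:
				return
			}
		}
	}()
}

func (l *flatScoreLedger) Stop() {
	close(l.closing)
}
```

Passing `newFlatScoreLedger()` to `WithScoreLedger` would route the
engine's byte accounting and receipt queries to this implementation
instead of the built-in `scoreWorker()` logic.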

Because `Bitswap` processes its options after its parts are constructed
but before the engine is started, the default `scoreLedger`
initialization is moved from `newEngine()` to `StartWorkers()`.

A new `TestWithScoreLedger` test is added. It checks that the testing
score ledger implementation specified via the `WithScoreLedger` option
is started and stopped.

* Combine score ledger start with initialization of the score function

Having a separate `Init(ScoreFunc)` method seems redundant (thanks to
@dirkmc for pointing that out). As a bonus, the two-step ledger
starting process is now enclosed in the `startScoreLedger()` function.

* Let's call Stop() to stop a ScoreLedger

The `Close()` method was there to stop the ledger. Let's call it
`Stop()` now.

* Move the return of the blank Receipt out of the conditional block

Explicitly form the blank `Receipt` as the fallback.
Co-authored-by: Paul Wolneykien <manowar@altlinux.org>
parent 72d351cb
@@ -11,6 +11,7 @@ import (
 	delay "github.com/ipfs/go-ipfs-delay"

+	deciface "github.com/ipfs/go-bitswap/decision"
 	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
 	decision "github.com/ipfs/go-bitswap/internal/decision"
 	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
@@ -95,6 +96,13 @@ func SetSendDontHaves(send bool) Option {
 	}
 }

+// Configures the engine to use the given score decision logic.
+func WithScoreLedger(scoreLedger deciface.ScoreLedger) Option {
+	return func(bs *Bitswap) {
+		bs.engine.UseScoreLedger(scoreLedger)
+	}
+}
+
 // New initializes a BitSwap instance that communicates over the provided
 // BitSwapNetwork. This function registers the returned instance as the network
 // delegate. Runs until context is cancelled or bitswap.Close is called.
...
@@ -9,11 +9,12 @@ import (
 	"time"

 	bitswap "github.com/ipfs/go-bitswap"
+	deciface "github.com/ipfs/go-bitswap/decision"
 	decision "github.com/ipfs/go-bitswap/internal/decision"
 	bssession "github.com/ipfs/go-bitswap/internal/session"
+	"github.com/ipfs/go-bitswap/message"
 	testinstance "github.com/ipfs/go-bitswap/testinstance"
 	tn "github.com/ipfs/go-bitswap/testnet"
-	"github.com/ipfs/go-bitswap/message"
 	blocks "github.com/ipfs/go-block-format"
 	cid "github.com/ipfs/go-cid"
 	detectrace "github.com/ipfs/go-detect-race"
@@ -803,3 +804,59 @@ func TestBitswapLedgerTwoWay(t *testing.T) {
 		}
 	}
 }
+
+type testingScoreLedger struct {
+	scorePeer deciface.ScorePeerFunc
+	started   chan struct{}
+	closed    chan struct{}
+}
+
+func newTestingScoreLedger() *testingScoreLedger {
+	return &testingScoreLedger{
+		nil,
+		make(chan struct{}),
+		make(chan struct{}),
+	}
+}
+
+func (tsl *testingScoreLedger) GetReceipt(p peer.ID) *deciface.Receipt {
+	return nil
+}
+func (tsl *testingScoreLedger) AddToSentBytes(p peer.ID, n int)     {}
+func (tsl *testingScoreLedger) AddToReceivedBytes(p peer.ID, n int) {}
+func (tsl *testingScoreLedger) PeerConnected(p peer.ID)             {}
+func (tsl *testingScoreLedger) PeerDisconnected(p peer.ID)          {}
+func (tsl *testingScoreLedger) Start(scorePeer deciface.ScorePeerFunc) {
+	tsl.scorePeer = scorePeer
+	close(tsl.started)
+}
+func (tsl *testingScoreLedger) Stop() {
+	close(tsl.closed)
+}
+
+// Tests start and stop of a custom decision logic
+func TestWithScoreLedger(t *testing.T) {
+	tsl := newTestingScoreLedger()
+	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
+	bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)}
+	ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
+	defer ig.Close()
+	i := ig.Next()
+	defer i.Exchange.Close()
+
+	select {
+	case <-tsl.started:
+		if tsl.scorePeer == nil {
+			t.Fatal("Expected the score function to be initialized")
+		}
+	case <-time.After(time.Second * 5):
+		t.Fatal("Expected the score ledger to be started within 5s")
+	}
+
+	i.Exchange.Close()
+	select {
+	case <-tsl.closed:
+	case <-time.After(time.Second * 5):
+		t.Fatal("Expected the score ledger to be closed within 5s")
+	}
+}
...
@@ -2,5 +2,11 @@ package decision

 import intdec "github.com/ipfs/go-bitswap/internal/decision"

-// Expose type externally
+// Expose Receipt externally
 type Receipt = intdec.Receipt
+
+// Expose ScoreLedger externally
+type ScoreLedger = intdec.ScoreLedger
+
+// Expose ScorePeerFunc externally
+type ScorePeerFunc = intdec.ScorePeerFunc
...
@@ -70,25 +70,6 @@ const (
 	// on their behalf.
 	queuedTagWeight = 10

-	// the alpha for the EWMA used to track short term usefulness
-	shortTermAlpha = 0.5
-
-	// the alpha for the EWMA used to track long term usefulness
-	longTermAlpha = 0.05
-
-	// how frequently the engine should sample usefulness. Peers that
-	// interact every shortTerm time period are considered "active".
-	shortTerm = 10 * time.Second
-
-	// long term ratio defines what "long term" means in terms of the
-	// shortTerm duration. Peers that interact once every longTermRatio are
-	// considered useful over the long term.
-	longTermRatio = 10
-
-	// long/short term scores for tagging peers
-	longTermScore  = 10 // this is a high tag but it grows _very_ slowly.
-	shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
-
 	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
 	// bytes up to which we will replace a want-have with a want-block
 	maxBlockSizeReplaceHasWithBlock = 1024
@@ -119,6 +100,29 @@ type PeerTagger interface {
 	UntagPeer(p peer.ID, tag string)
 }

+// Assigns a specific score to a peer
+type ScorePeerFunc func(peer.ID, int)
+
+// ScoreLedger is an external ledger dealing with peer scores.
+type ScoreLedger interface {
+	// Returns aggregated data about communication with a given peer.
+	GetReceipt(p peer.ID) *Receipt
+	// Increments the sent counter for the given peer.
+	AddToSentBytes(p peer.ID, n int)
+	// Increments the received counter for the given peer.
+	AddToReceivedBytes(p peer.ID, n int)
+	// PeerConnected should be called when a new peer connects,
+	// meaning the ledger should open accounting.
+	PeerConnected(p peer.ID)
+	// PeerDisconnected should be called when a peer disconnects to
+	// clean up the accounting.
+	PeerDisconnected(p peer.ID)
+	// Starts the ledger sampling process.
+	Start(scorePeer ScorePeerFunc)
+	// Stops the sampling process.
+	Stop()
+}
+
 // Engine manages sending requested blocks to peers.
 type Engine struct {
 	// peerRequestQueue is a priority queue of requests received from peers.
@@ -145,9 +149,12 @@ type Engine struct {
 	lock sync.RWMutex // protects the fields immediately below

-	// ledgerMap lists Ledgers by their Partner key.
+	// ledgerMap lists block-related Ledgers by their Partner key.
 	ledgerMap map[peer.ID]*ledger

+	// an external ledger dealing with peer scores
+	scoreLedger ScoreLedger
+
 	ticker *time.Ticker

 	taskWorkerLock sync.Mutex
@@ -157,11 +164,6 @@ type Engine struct {
 	// bytes up to which we will replace a want-have with a want-block
 	maxBlockSizeReplaceHasWithBlock int

-	// how frequently the engine should sample peer usefulness
-	peerSampleInterval time.Duration
-	// used by the tests to detect when a sample is taken
-	sampleCh chan struct{}
-
 	sendDontHaves bool

 	self peer.ID
@@ -169,23 +171,22 @@ type Engine struct {
 // NewEngine creates a new block sending engine for the given block store
 func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
-	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm, nil)
+	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, nil)
 }

 // This constructor is used by the tests
 func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
-	maxReplaceSize int, peerSampleInterval time.Duration, sampleCh chan struct{}) *Engine {
+	maxReplaceSize int, scoreLedger ScoreLedger) *Engine {

 	e := &Engine{
 		ledgerMap:                       make(map[peer.ID]*ledger),
+		scoreLedger:                     scoreLedger,
 		bsm:                             newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
 		peerTagger:                      peerTagger,
 		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
 		workSignal:                      make(chan struct{}, 1),
 		ticker:                          time.NewTicker(time.Millisecond * 100),
 		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
-		peerSampleInterval:              peerSampleInterval,
-		sampleCh:                        sampleCh,
 		taskWorkerCount:                 taskWorkerCount,
 		sendDontHaves:                   true,
 		self:                            self,
@@ -210,11 +211,37 @@ func (e *Engine) SetSendDontHaves(send bool) {
 	e.sendDontHaves = send
 }

+// Sets the scoreLedger to the given implementation. Should be called
+// before StartWorkers().
+func (e *Engine) UseScoreLedger(scoreLedger ScoreLedger) {
+	e.scoreLedger = scoreLedger
+}
+
+// Starts the score ledger. If the scoreLedger is unset, it is first
+// initialized with the default implementation.
+func (e *Engine) startScoreLedger(px process.Process) {
+	if e.scoreLedger == nil {
+		e.scoreLedger = NewDefaultScoreLedger()
+	}
+	e.scoreLedger.Start(func(p peer.ID, score int) {
+		if score == 0 {
+			e.peerTagger.UntagPeer(p, e.tagUseful)
+		} else {
+			e.peerTagger.TagPeer(p, e.tagUseful, score)
+		}
+	})
+	px.Go(func(ppx process.Process) {
+		<-ppx.Closing()
+		e.scoreLedger.Stop()
+	})
+}
+
 // Start up workers to handle requests from other nodes for the data on this node
 func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
 	// Start up blockstore manager
 	e.bsm.start(px)
-	px.Go(e.scoreWorker)
+	e.startScoreLedger(px)

 	for i := 0; i < e.taskWorkerCount; i++ {
 		px.Go(func(px process.Process) {
@@ -223,109 +250,6 @@ func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
 	}
 }

-// scoreWorker keeps track of how "useful" our peers are, updating scores in the
-// connection manager.
-//
-// It does this by tracking two scores: short-term usefulness and long-term
-// usefulness. Short-term usefulness is sampled frequently and highly weights
-// new observations. Long-term usefulness is sampled less frequently and highly
-// weights on long-term trends.
-//
-// In practice, we do this by keeping two EWMAs. If we see an interaction
-// within the sampling period, we record the score, otherwise, we record a 0.
-// The short-term one has a high alpha and is sampled every shortTerm period.
-// The long-term one has a low alpha and is sampled every
-// longTermRatio*shortTerm period.
-//
-// To calculate the final score, we sum the short-term and long-term scores then
-// adjust it ±25% based on our debt ratio. Peers that have historically been
-// more useful to us than we are to them get the highest score.
-func (e *Engine) scoreWorker(px process.Process) {
-	ticker := time.NewTicker(e.peerSampleInterval)
-	defer ticker.Stop()
-
-	type update struct {
-		peer  peer.ID
-		score int
-	}
-	var (
-		lastShortUpdate, lastLongUpdate time.Time
-		updates                         []update
-	)
-
-	for i := 0; ; i = (i + 1) % longTermRatio {
-		var now time.Time
-		select {
-		case now = <-ticker.C:
-		case <-px.Closing():
-			return
-		}
-
-		// The long term update ticks every `longTermRatio` short
-		// intervals.
-		updateLong := i == 0
-
-		e.lock.Lock()
-		for _, ledger := range e.ledgerMap {
-			ledger.lk.Lock()
-
-			// Update the short-term score.
-			if ledger.lastExchange.After(lastShortUpdate) {
-				ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
-			} else {
-				ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
-			}
-
-			// Update the long-term score.
-			if updateLong {
-				if ledger.lastExchange.After(lastLongUpdate) {
-					ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
-				} else {
-					ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
-				}
-			}
-
-			// Calculate the new score.
-			//
-			// The accounting score adjustment prefers peers _we_
-			// need over peers that need us. This doesn't help with
-			// leeching.
-			score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))
-
-			// Avoid updating the connection manager unless there's a change. This can be expensive.
-			if ledger.score != score {
-				// put these in a list so we can perform the updates outside the _global_ lock.
-				updates = append(updates, update{ledger.Partner, score})
-				ledger.score = score
-			}
-			ledger.lk.Unlock()
-		}
-		e.lock.Unlock()
-
-		// record the times.
-		lastShortUpdate = now
-		if updateLong {
-			lastLongUpdate = now
-		}
-
-		// apply the updates
-		for _, update := range updates {
-			if update.score == 0 {
-				e.peerTagger.UntagPeer(update.peer, e.tagUseful)
-			} else {
-				e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
-			}
-		}
-		// Keep the memory. It's not much and it saves us from having to allocate.
-		updates = updates[:0]
-
-		// Used by the tests
-		if e.sampleCh != nil {
-			e.sampleCh <- struct{}{}
-		}
-	}
-}
-
 func (e *Engine) onPeerAdded(p peer.ID) {
 	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
 }
@@ -347,21 +271,9 @@ func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
 	return entries
 }

-// LedgerForPeer returns aggregated data about blocks swapped and communication
-// with a given peer.
+// LedgerForPeer returns aggregated data about communication with a given peer.
 func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
-	ledger := e.findOrCreate(p)
-
-	ledger.lk.Lock()
-	defer ledger.lk.Unlock()
-
-	return &Receipt{
-		Peer:      ledger.Partner.String(),
-		Value:     ledger.Accounting.Value(),
-		Sent:      ledger.Accounting.BytesSent,
-		Recv:      ledger.Accounting.BytesRecv,
-		Exchanged: ledger.ExchangeCount(),
-	}
+	return e.scoreLedger.GetReceipt(p)
 }

 // Each taskWorker pulls items off the request queue up to the maximum size
@@ -671,7 +583,7 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid)
 	// Record how many bytes were received in the ledger
 	for _, blk := range blks {
 		log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
-		l.ReceivedBytes(len(blk.RawData()))
+		e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData()))
 	}

 	l.lk.Unlock()
@@ -741,7 +653,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
 	// Remove sent blocks from the want list for the peer
 	for _, block := range m.Blocks() {
-		l.SentBytes(len(block.RawData()))
+		e.scoreLedger.AddToSentBytes(l.Partner, len(block.RawData()))
 		l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
 	}
@@ -764,6 +676,8 @@ func (e *Engine) PeerConnected(p peer.ID) {
 	if !ok {
 		e.ledgerMap[p] = newLedger(p)
 	}
+
+	e.scoreLedger.PeerConnected(p)
 }

 // PeerDisconnected is called when a peer disconnects.
@@ -772,6 +686,8 @@ func (e *Engine) PeerDisconnected(p peer.ID) {
 	defer e.lock.Unlock()

 	delete(e.ledgerMap, p)
+
+	e.scoreLedger.PeerDisconnected(p)
 }

 // If the want is a want-have, and it's below a certain size, send the full
@@ -782,13 +698,11 @@ func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
 }

 func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
-	// NB not threadsafe
-	return e.findOrCreate(p).Accounting.BytesSent
+	return e.LedgerForPeer(p).Sent
 }

 func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
-	// NB not threadsafe
-	return e.findOrCreate(p).Accounting.BytesRecv
+	return e.LedgerForPeer(p).Recv
 }

 // ledger lazily instantiates a ledger
...
@@ -97,7 +97,7 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
 func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
 	fpt := &fakePeerTagger{}
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
-	e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval, sampleCh)
+	e := newEngine(ctx, bs, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
 	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 	return engineSet{
 		Peer: peer.ID(idStr),
@@ -185,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
 func TestOutboxClosedWhenEngineClosed(t *testing.T) {
 	ctx := context.Background()
 	t.SkipNow() // TODO implement *Engine.Close
-	e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -513,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
 		testCases = onlyTestCases
 	}
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 	for i, testCase := range testCases {
 		t.Logf("Test case %d:", i)
@@ -669,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
 		testCases = onlyTestCases
 	}
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 	var next envChan
@@ -854,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
 	ctx := context.Background()
 	for i := 0; i < numRounds; i++ {
 		expected := make([][]string, 0, len(testcases))
-		e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+		e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 		e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 		for _, testcase := range testcases {
 			set := testcase[0]
@@ -879,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -923,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -987,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
...
@@ -2,7 +2,6 @@ package decision

 import (
 	"sync"
-	"time"

 	pb "github.com/ipfs/go-bitswap/message/pb"
 	wl "github.com/ipfs/go-bitswap/wantlist"
@@ -18,75 +17,17 @@ func newLedger(p peer.ID) *ledger {
 	}
 }

-// ledger stores the data exchange relationship between two peers.
-// NOT threadsafe
+// Keeps the wantlist for the partner. NOT threadsafe!
 type ledger struct {
 	// Partner is the remote Peer.
 	Partner peer.ID

-	// Accounting tracks bytes sent and received.
-	Accounting debtRatio
-
-	// lastExchange is the time of the last data exchange.
-	lastExchange time.Time
-
-	// These scores keep track of how useful we think this peer is. Short
-	// tracks short-term usefulness and long tracks long-term usefulness.
-	shortScore, longScore float64
-
-	// Score keeps track of the score used in the peer tagger. We track it
-	// here to avoid unnecessarily updating the tags in the connection manager.
-	score int
-
-	// exchangeCount is the number of exchanges with this peer
-	exchangeCount uint64
-
 	// wantList is a (bounded, small) set of keys that Partner desires.
 	wantList *wl.Wantlist

 	lk sync.RWMutex
 }

-// Receipt is a summary of the ledger for a given peer
-// collecting various pieces of aggregated data for external
-// reporting purposes.
-type Receipt struct {
-	Peer      string
-	Value     float64
-	Sent      uint64
-	Recv      uint64
-	Exchanged uint64
-}
-
-type debtRatio struct {
-	BytesSent uint64
-	BytesRecv uint64
-}
-
-// Value returns the debt ratio, sent:receive.
-func (dr *debtRatio) Value() float64 {
-	return float64(dr.BytesSent) / float64(dr.BytesRecv+1)
-}
-
-// Score returns the debt _score_ on a 0-1 scale.
-func (dr *debtRatio) Score() float64 {
-	if dr.BytesRecv == 0 {
-		return 0
-	}
-	return float64(dr.BytesRecv) / float64(dr.BytesRecv+dr.BytesSent)
-}
-
-func (l *ledger) SentBytes(n int) {
-	l.exchangeCount++
-	l.lastExchange = time.Now()
-	l.Accounting.BytesSent += uint64(n)
-}
-
-func (l *ledger) ReceivedBytes(n int) {
-	l.exchangeCount++
-	l.lastExchange = time.Now()
-	l.Accounting.BytesRecv += uint64(n)
-}
-
 func (l *ledger) Wants(k cid.Cid, priority int32, wantType pb.Message_Wantlist_WantType) {
 	log.Debugf("peer %s wants %s", l.Partner, k)
 	l.wantList.Add(k, priority, wantType)
@@ -99,7 +40,3 @@ func (l *ledger) CancelWant(k cid.Cid) bool {
 func (l *ledger) WantListContains(k cid.Cid) (wl.Entry, bool) {
 	return l.wantList.Contains(k)
 }
-
-func (l *ledger) ExchangeCount() uint64 {
-	return l.exchangeCount
-}
...
package decision
import (
"sync"
"time"
peer "github.com/libp2p/go-libp2p-core/peer"
)
const (
// the alpha for the EWMA used to track short term usefulness
shortTermAlpha = 0.5
// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05
// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
shortTerm = 10 * time.Second
// long term ratio defines what "long term" means in terms of the
// shortTerm duration. Peers that interact once every longTermRatio are
// considered useful over the long term.
longTermRatio = 10
// long/short term scores for tagging peers
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
)
// Stores the data exchange relationship between two peers.
type scoreledger struct {
// Partner is the remote Peer.
partner peer.ID
// tracks bytes sent...
bytesSent uint64
// ...and received.
bytesRecv uint64
// lastExchange is the time of the last data exchange.
lastExchange time.Time
// These scores keep track of how useful we think this peer is. Short
// tracks short-term usefulness and long tracks long-term usefulness.
shortScore, longScore float64
// Score keeps track of the score used in the peer tagger. We track it
// here to avoid unnecessarily updating the tags in the connection manager.
score int
// exchangeCount is the number of exchanges with this peer
exchangeCount uint64
// the record lock
lock sync.RWMutex
}
// Receipt is a summary of the ledger for a given peer
// collecting various pieces of aggregated data for external
// reporting purposes.
type Receipt struct {
Peer string
Value float64
Sent uint64
Recv uint64
Exchanged uint64
}
// Increments the sent counter.
func (l *scoreledger) AddToSentBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.bytesSent += uint64(n)
}
// Increments the received counter.
func (l *scoreledger) AddToReceivedBytes(n int) {
l.lock.Lock()
defer l.lock.Unlock()
l.exchangeCount++
l.lastExchange = time.Now()
l.bytesRecv += uint64(n)
}
// Returns the Receipt for this ledger record.
func (l *scoreledger) Receipt() *Receipt {
l.lock.RLock()
defer l.lock.RUnlock()
return &Receipt{
Peer: l.partner.String(),
Value: float64(l.bytesSent) / float64(l.bytesRecv+1),
Sent: l.bytesSent,
Recv: l.bytesRecv,
Exchanged: l.exchangeCount,
}
}
// DefaultScoreLedger is used by Engine as the default ScoreLedger.
type DefaultScoreLedger struct {
// a sample counting ticker
ticker *time.Ticker
// the score func
scorePeer ScorePeerFunc
// is closed on Stop
closing chan struct{}
// protects the fields immediately below
lock sync.RWMutex
// ledgerMap lists score ledgers by their partner key.
ledgerMap map[peer.ID]*scoreledger
// how frequently the engine should sample peer usefulness
peerSampleInterval time.Duration
// used by the tests to detect when a sample is taken
sampleCh chan struct{}
}
// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and highly weights
// new observations. Long-term usefulness is sampled less frequently and highly
// weights on long-term trends.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score, otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores then
// adjust it ±25% based on our debt ratio. Peers that have historically been
// more useful to us than we are to them get the highest score.
func (dsl *DefaultScoreLedger) scoreWorker() {
ticker := time.NewTicker(dsl.peerSampleInterval)
defer ticker.Stop()
type update struct {
peer peer.ID
score int
}
var (
lastShortUpdate, lastLongUpdate time.Time
updates []update
)
for i := 0; ; i = (i + 1) % longTermRatio {
var now time.Time
select {
case now = <-ticker.C:
case <-dsl.closing:
return
}
// The long term update ticks every `longTermRatio` short
// intervals.
updateLong := i == 0
dsl.lock.Lock()
for _, l := range dsl.ledgerMap {
l.lock.Lock()
// Update the short-term score.
if l.lastExchange.After(lastShortUpdate) {
l.shortScore = ewma(l.shortScore, shortTermScore, shortTermAlpha)
} else {
l.shortScore = ewma(l.shortScore, 0, shortTermAlpha)
}
// Update the long-term score.
if updateLong {
if l.lastExchange.After(lastLongUpdate) {
l.longScore = ewma(l.longScore, longTermScore, longTermAlpha)
} else {
l.longScore = ewma(l.longScore, 0, longTermAlpha)
}
}
// Calculate the new score.
//
// The accounting score adjustment prefers peers _we_
// need over peers that need us. This doesn't help with
// leeching.
var lscore float64
if l.bytesRecv == 0 {
lscore = 0
} else {
lscore = float64(l.bytesRecv) / float64(l.bytesRecv+l.bytesSent)
}
score := int((l.shortScore + l.longScore) * (lscore*.5 + .75))
// Avoid updating the connection manager unless there's a change. This can be expensive.
if l.score != score {
// put these in a list so we can perform the updates outside the _global_ lock.
updates = append(updates, update{l.partner, score})
l.score = score
}
l.lock.Unlock()
}
dsl.lock.Unlock()
// record the times.
lastShortUpdate = now
if updateLong {
lastLongUpdate = now
}
// apply the updates
for _, update := range updates {
dsl.scorePeer(update.peer, update.score)
}
// Keep the memory. It's not much and it saves us from having to allocate.
updates = updates[:0]
// Used by the tests
if dsl.sampleCh != nil {
dsl.sampleCh <- struct{}{}
}
}
}
// Returns the score ledger for the given peer or nil if that peer
// is not on the ledger.
func (dsl *DefaultScoreLedger) find(p peer.ID) *scoreledger {
// Take a read lock (as it's less expensive) to check if we have
// a ledger for the peer.
dsl.lock.RLock()
l, ok := dsl.ledgerMap[p]
dsl.lock.RUnlock()
if ok {
return l
}
return nil
}
// Returns a new scoreledger.
func newScoreLedger(p peer.ID) *scoreledger {
return &scoreledger{
partner: p,
}
}
// Lazily instantiates a ledger.
func (dsl *DefaultScoreLedger) findOrCreate(p peer.ID) *scoreledger {
l := dsl.find(p)
if l != nil {
return l
}
// There's no ledger, so take a write lock, then check again and
// create the ledger if necessary.
dsl.lock.Lock()
defer dsl.lock.Unlock()
l, ok := dsl.ledgerMap[p]
if !ok {
l = newScoreLedger(p)
dsl.ledgerMap[p] = l
}
return l
}
// GetReceipt returns aggregated data about communication with a given peer.
func (dsl *DefaultScoreLedger) GetReceipt(p peer.ID) *Receipt {
l := dsl.find(p)
if l != nil {
return l.Receipt()
}
// Return a blank receipt otherwise.
return &Receipt{
Peer: p.String(),
Value: 0,
Sent: 0,
Recv: 0,
Exchanged: 0,
}
}
// Starts the default ledger sampling process.
func (dsl *DefaultScoreLedger) Start(scorePeer ScorePeerFunc) {
dsl.init(scorePeer)
go dsl.scoreWorker()
}
// Stops the sampling process.
func (dsl *DefaultScoreLedger) Stop() {
close(dsl.closing)
}
// Initializes the score ledger.
func (dsl *DefaultScoreLedger) init(scorePeer ScorePeerFunc) {
dsl.lock.Lock()
defer dsl.lock.Unlock()
dsl.ledgerMap = make(map[peer.ID]*scoreledger)
dsl.scorePeer = scorePeer
}
// Increments the sent counter for the given peer.
func (dsl *DefaultScoreLedger) AddToSentBytes(p peer.ID, n int) {
l := dsl.findOrCreate(p)
l.AddToSentBytes(n)
}
// Increments the received counter for the given peer.
func (dsl *DefaultScoreLedger) AddToReceivedBytes(p peer.ID, n int) {
l := dsl.findOrCreate(p)
l.AddToReceivedBytes(n)
}
// PeerConnected should be called when a new peer connects, meaning
// we should open accounting.
func (dsl *DefaultScoreLedger) PeerConnected(p peer.ID) {
dsl.lock.Lock()
defer dsl.lock.Unlock()
_, ok := dsl.ledgerMap[p]
if !ok {
dsl.ledgerMap[p] = newScoreLedger(p)
}
}
// PeerDisconnected should be called when a peer disconnects to
// clean up the accounting.
func (dsl *DefaultScoreLedger) PeerDisconnected(p peer.ID) {
dsl.lock.Lock()
defer dsl.lock.Unlock()
delete(dsl.ledgerMap, p)
}
// Creates a new instance of the default score ledger.
func NewDefaultScoreLedger() *DefaultScoreLedger {
return &DefaultScoreLedger{
ledgerMap: make(map[peer.ID]*scoreledger),
ticker: time.NewTicker(time.Millisecond * 100),
closing: make(chan struct{}),
peerSampleInterval: shortTerm,
}
}
// Creates a new instance of the default score ledger with testing
// parameters.
func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}) *DefaultScoreLedger {
dsl := NewDefaultScoreLedger()
dsl.peerSampleInterval = peerSampleInterval
dsl.sampleCh = sampleCh
return dsl
}