diff --git a/bitswap.go b/bitswap.go
index 9afe5d2750c0e39a26b32985f500953cdeb1a955..8af786a80dcb481eb131494feb3ab8ece3acc925 100644
--- a/bitswap.go
+++ b/bitswap.go
@@ -11,6 +11,7 @@ import (
 	delay "github.com/ipfs/go-ipfs-delay"
 
+	deciface "github.com/ipfs/go-bitswap/decision"
 	bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
 	decision "github.com/ipfs/go-bitswap/internal/decision"
 	bsgetter "github.com/ipfs/go-bitswap/internal/getter"
@@ -95,6 +96,13 @@ func SetSendDontHaves(send bool) Option {
 	}
 }
 
+// Configures the engine to use the given score decision logic.
+func WithScoreLedger(scoreLedger deciface.ScoreLedger) Option {
+	return func(bs *Bitswap) {
+		bs.engine.UseScoreLedger(scoreLedger)
+	}
+}
+
 // New initializes a BitSwap instance that communicates over the provided
 // BitSwapNetwork. This function registers the returned instance as the network
 // delegate. Runs until context is cancelled or bitswap.Close is called.
diff --git a/bitswap_test.go b/bitswap_test.go
index ba89e038d19a97265b708522b9fdf83ce5f7cfce..b95faa30d5d4f6f45857f2c1ff75b7ea5f1af3b3 100644
--- a/bitswap_test.go
+++ b/bitswap_test.go
@@ -9,11 +9,12 @@ import (
 	"time"
 
 	bitswap "github.com/ipfs/go-bitswap"
+	deciface "github.com/ipfs/go-bitswap/decision"
 	decision "github.com/ipfs/go-bitswap/internal/decision"
 	bssession "github.com/ipfs/go-bitswap/internal/session"
+	"github.com/ipfs/go-bitswap/message"
 	testinstance "github.com/ipfs/go-bitswap/testinstance"
 	tn "github.com/ipfs/go-bitswap/testnet"
-	"github.com/ipfs/go-bitswap/message"
 	blocks "github.com/ipfs/go-block-format"
 	cid "github.com/ipfs/go-cid"
 	detectrace "github.com/ipfs/go-detect-race"
@@ -803,3 +804,59 @@ func TestBitswapLedgerTwoWay(t *testing.T) {
 		}
 	}
 }
+
+type testingScoreLedger struct {
+	scorePeer deciface.ScorePeerFunc
+	started   chan struct{}
+	closed    chan struct{}
+}
+
+func newTestingScoreLedger() *testingScoreLedger {
+	return &testingScoreLedger{
+		nil,
+		make(chan struct{}),
+		make(chan struct{}),
+	}
+}
+
+func (tsl *testingScoreLedger) GetReceipt(p peer.ID) *deciface.Receipt {
+	return nil
+}
+func (tsl *testingScoreLedger) AddToSentBytes(p peer.ID, n int)     {}
+func (tsl *testingScoreLedger) AddToReceivedBytes(p peer.ID, n int) {}
+func (tsl *testingScoreLedger) PeerConnected(p peer.ID)             {}
+func (tsl *testingScoreLedger) PeerDisconnected(p peer.ID)          {}
+func (tsl *testingScoreLedger) Start(scorePeer deciface.ScorePeerFunc) {
+	tsl.scorePeer = scorePeer
+	close(tsl.started)
+}
+func (tsl *testingScoreLedger) Stop() {
+	close(tsl.closed)
+}
+
+// Tests start and stop of custom decision logic
+func TestWithScoreLedger(t *testing.T) {
+	tsl := newTestingScoreLedger()
+	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
+	bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)}
+	ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
+	defer ig.Close()
+	i := ig.Next()
+	defer i.Exchange.Close()
+
+	select {
+	case <-tsl.started:
+		if tsl.scorePeer == nil {
+			t.Fatal("Expected the score function to be initialized")
+		}
+	case <-time.After(time.Second * 5):
+		t.Fatal("Expected the score ledger to be started within 5s")
+	}
+
+	i.Exchange.Close()
+	select {
+	case <-tsl.closed:
+	case <-time.After(time.Second * 5):
+		t.Fatal("Expected the score ledger to be closed within 5s")
+	}
+}
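Usage sketch (not part of the patch): any implementation of the exported decision.ScoreLedger interface can be handed to the constructor through the new option. The identifiers ctx, network, bstore and myScoreLedger below are assumed to come from the caller's own setup.

	ledger := &myScoreLedger{} // any type satisfying deciface.ScoreLedger
	exch := bitswap.New(ctx, network, bstore, bitswap.WithScoreLedger(ledger))
	defer exch.Close()

The option has to be supplied at construction time; otherwise the engine falls back to the default score ledger when its workers start.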
diff --git a/decision/decision.go b/decision/decision.go
index 8dd310f69ea8e09421607f24d3bf0b734d68c8d1..4afc463ec08db9bb344cc758729c5b3cc4f2134a 100644
--- a/decision/decision.go
+++ b/decision/decision.go
@@ -2,5 +2,11 @@ package decision
 
 import intdec "github.com/ipfs/go-bitswap/internal/decision"
 
-// Expose type externally
+// Expose Receipt externally
 type Receipt = intdec.Receipt
+
+// Expose ScoreLedger externally
+type ScoreLedger = intdec.ScoreLedger
+
+// Expose ScorePeerFunc externally
+type ScorePeerFunc = intdec.ScorePeerFunc
diff --git a/internal/decision/engine.go b/internal/decision/engine.go
index 2a6dc60f6d1744f67209a408ea2ab11862f49705..28584fb100e7468c3a84114c1962c32cd9f92d71 100644
--- a/internal/decision/engine.go
+++ b/internal/decision/engine.go
@@ -70,25 +70,6 @@ const (
 	// on their behalf.
 	queuedTagWeight = 10
 
-	// the alpha for the EWMA used to track short term usefulness
-	shortTermAlpha = 0.5
-
-	// the alpha for the EWMA used to track long term usefulness
-	longTermAlpha = 0.05
-
-	// how frequently the engine should sample usefulness. Peers that
-	// interact every shortTerm time period are considered "active".
-	shortTerm = 10 * time.Second
-
-	// long term ratio defines what "long term" means in terms of the
-	// shortTerm duration. Peers that interact once every longTermRatio are
-	// considered useful over the long term.
-	longTermRatio = 10
-
-	// long/short term scores for tagging peers
-	longTermScore  = 10 // this is a high tag but it grows _very_ slowly.
-	shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
-
 	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
 	// bytes up to which we will replace a want-have with a want-block
 	maxBlockSizeReplaceHasWithBlock = 1024
@@ -119,6 +100,29 @@ type PeerTagger interface {
 	UntagPeer(p peer.ID, tag string)
 }
 
+// Assigns a specific score to a peer
+type ScorePeerFunc func(peer.ID, int)
+
+// ScoreLedger is an external ledger dealing with peer scores.
+type ScoreLedger interface {
+	// Returns aggregated data about communication with a given peer.
+	GetReceipt(p peer.ID) *Receipt
+	// Increments the sent counter for the given peer.
+	AddToSentBytes(p peer.ID, n int)
+	// Increments the received counter for the given peer.
+	AddToReceivedBytes(p peer.ID, n int)
+	// PeerConnected should be called when a new peer connects,
+	// meaning the ledger should open accounting.
+	PeerConnected(p peer.ID)
+	// PeerDisconnected should be called when a peer disconnects to
+	// clean up the accounting.
+	PeerDisconnected(p peer.ID)
+	// Starts the ledger sampling process.
+	Start(scorePeer ScorePeerFunc)
+	// Stops the sampling process.
+	Stop()
+}
+
 // Engine manages sending requested blocks to peers.
 type Engine struct {
 	// peerRequestQueue is a priority queue of requests received from peers.
@@ -145,9 +149,12 @@ type Engine struct {
 
 	lock sync.RWMutex // protects the fields immediatly below
 
-	// ledgerMap lists Ledgers by their Partner key.
+	// ledgerMap lists block-related Ledgers by their Partner key.
 	ledgerMap map[peer.ID]*ledger
 
+	// an external ledger dealing with peer scores
+	scoreLedger ScoreLedger
+
 	ticker *time.Ticker
 
 	taskWorkerLock sync.Mutex
@@ -157,11 +164,6 @@ type Engine struct {
 	// bytes up to which we will replace a want-have with a want-block
 	maxBlockSizeReplaceHasWithBlock int
 
-	// how frequently the engine should sample peer usefulness
-	peerSampleInterval time.Duration
-	// used by the tests to detect when a sample is taken
-	sampleCh chan struct{}
-
 	sendDontHaves bool
 
 	self peer.ID
@@ -169,23 +171,22 @@ type Engine struct {
 
 // NewEngine creates a new block sending engine for the given block store
 func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
-	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm, nil)
+	return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, nil)
 }
 
 // This constructor is used by the tests
 func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
-	maxReplaceSize int, peerSampleInterval time.Duration, sampleCh chan struct{}) *Engine {
+	maxReplaceSize int, scoreLedger ScoreLedger) *Engine {
 
 	e := &Engine{
 		ledgerMap: make(map[peer.ID]*ledger),
+		scoreLedger: scoreLedger,
 		bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
 		peerTagger: peerTagger,
 		outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
 		workSignal: make(chan struct{}, 1),
 		ticker: time.NewTicker(time.Millisecond * 100),
 		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
-		peerSampleInterval: peerSampleInterval,
-		sampleCh: sampleCh,
 		taskWorkerCount: taskWorkerCount,
 		sendDontHaves: true,
 		self: self,
@@ -210,11 +211,37 @@ func (e *Engine) SetSendDontHaves(send bool) {
 	e.sendDontHaves = send
 }
 
+// Sets the scoreLedger to the given implementation. Should be called
+// before StartWorkers().
+func (e *Engine) UseScoreLedger(scoreLedger ScoreLedger) {
+	e.scoreLedger = scoreLedger
+}
+
+// Starts the score ledger. If the score ledger is unset, it is
+// initialized with the default implementation before the sampling
+// process starts.
+func (e *Engine) startScoreLedger(px process.Process) {
+	if e.scoreLedger == nil {
+		e.scoreLedger = NewDefaultScoreLedger()
+	}
+	e.scoreLedger.Start(func(p peer.ID, score int) {
+		if score == 0 {
+			e.peerTagger.UntagPeer(p, e.tagUseful)
+		} else {
+			e.peerTagger.TagPeer(p, e.tagUseful, score)
+		}
+	})
+	px.Go(func(ppx process.Process) {
+		<-ppx.Closing()
+		e.scoreLedger.Stop()
+	})
+}
+
 // Start up workers to handle requests from other nodes for the data on this node
 func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
 	// Start up blockstore manager
 	e.bsm.start(px)
-	px.Go(e.scoreWorker)
+	e.startScoreLedger(px)
 
 	for i := 0; i < e.taskWorkerCount; i++ {
 		px.Go(func(px process.Process) {
@@ -223,109 +250,6 @@ func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
 	}
 }
 
-// scoreWorker keeps track of how "useful" our peers are, updating scores in the
-// connection manager.
-//
-// It does this by tracking two scores: short-term usefulness and long-term
-// usefulness. Short-term usefulness is sampled frequently and highly weights
-// new observations. Long-term usefulness is sampled less frequently and highly
-// weights on long-term trends.
-//
-// In practice, we do this by keeping two EWMAs. If we see an interaction
-// within the sampling period, we record the score, otherwise, we record a 0.
-// The short-term one has a high alpha and is sampled every shortTerm period.
-// The long-term one has a low alpha and is sampled every
-// longTermRatio*shortTerm period.
-//
-// To calculate the final score, we sum the short-term and long-term scores then
-// adjust it ±25% based on our debt ratio. Peers that have historically been
-// more useful to us than we are to them get the highest score.
-func (e *Engine) scoreWorker(px process.Process) {
-	ticker := time.NewTicker(e.peerSampleInterval)
-	defer ticker.Stop()
-
-	type update struct {
-		peer  peer.ID
-		score int
-	}
-	var (
-		lastShortUpdate, lastLongUpdate time.Time
-		updates []update
-	)
-
-	for i := 0; ; i = (i + 1) % longTermRatio {
-		var now time.Time
-		select {
-		case now = <-ticker.C:
-		case <-px.Closing():
-			return
-		}
-
-		// The long term update ticks every `longTermRatio` short
-		// intervals.
-		updateLong := i == 0
-
-		e.lock.Lock()
-		for _, ledger := range e.ledgerMap {
-			ledger.lk.Lock()
-
-			// Update the short-term score.
-			if ledger.lastExchange.After(lastShortUpdate) {
-				ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
-			} else {
-				ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
-			}
-
-			// Update the long-term score.
-			if updateLong {
-				if ledger.lastExchange.After(lastLongUpdate) {
-					ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
-				} else {
-					ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
-				}
-			}
-
-			// Calculate the new score.
-			//
-			// The accounting score adjustment prefers peers _we_
-			// need over peers that need us. This doesn't help with
-			// leeching.
-			score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))
-
-			// Avoid updating the connection manager unless there's a change. This can be expensive.
-			if ledger.score != score {
-				// put these in a list so we can perform the updates outside _global_ the lock.
-				updates = append(updates, update{ledger.Partner, score})
-				ledger.score = score
-			}
-			ledger.lk.Unlock()
-		}
-		e.lock.Unlock()
-
-		// record the times.
-		lastShortUpdate = now
-		if updateLong {
-			lastLongUpdate = now
-		}
-
-		// apply the updates
-		for _, update := range updates {
-			if update.score == 0 {
-				e.peerTagger.UntagPeer(update.peer, e.tagUseful)
-			} else {
-				e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
-			}
-		}
-		// Keep the memory. It's not much and it saves us from having to allocate.
-		updates = updates[:0]
-
-		// Used by the tests
-		if e.sampleCh != nil {
-			e.sampleCh <- struct{}{}
-		}
-	}
-}
-
 func (e *Engine) onPeerAdded(p peer.ID) {
 	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
 }
@@ -347,21 +271,9 @@ func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
 	return entries
 }
 
-// LedgerForPeer returns aggregated data about blocks swapped and communication
-// with a given peer.
+// LedgerForPeer returns aggregated data about communication with a given peer.
 func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
-	ledger := e.findOrCreate(p)
-
-	ledger.lk.Lock()
-	defer ledger.lk.Unlock()
-
-	return &Receipt{
-		Peer:      ledger.Partner.String(),
-		Value:     ledger.Accounting.Value(),
-		Sent:      ledger.Accounting.BytesSent,
-		Recv:      ledger.Accounting.BytesRecv,
-		Exchanged: ledger.ExchangeCount(),
-	}
+	return e.scoreLedger.GetReceipt(p)
 }
 
 // Each taskWorker pulls items off the request queue up to the maximum size
@@ -671,7 +583,7 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid)
 	// Record how many bytes were received in the ledger
 	for _, blk := range blks {
 		log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
-		l.ReceivedBytes(len(blk.RawData()))
+		e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData()))
 	}
 
 	l.lk.Unlock()
@@ -741,7 +653,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
 
 	// Remove sent blocks from the want list for the peer
 	for _, block := range m.Blocks() {
-		l.SentBytes(len(block.RawData()))
+		e.scoreLedger.AddToSentBytes(l.Partner, len(block.RawData()))
 		l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
 	}
 
@@ -764,6 +676,8 @@ func (e *Engine) PeerConnected(p peer.ID) {
 	if !ok {
 		e.ledgerMap[p] = newLedger(p)
 	}
+
+	e.scoreLedger.PeerConnected(p)
 }
 
 // PeerDisconnected is called when a peer disconnects.
@@ -772,6 +686,8 @@ func (e *Engine) PeerDisconnected(p peer.ID) {
 	defer e.lock.Unlock()
 
 	delete(e.ledgerMap, p)
+
+	e.scoreLedger.PeerDisconnected(p)
 }
 
 // If the want is a want-have, and it's below a certain size, send the full
@@ -782,13 +698,11 @@ func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize in
 }
 
 func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
-	// NB not threadsafe
-	return e.findOrCreate(p).Accounting.BytesSent
+	return e.LedgerForPeer(p).Sent
 }
 
 func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
-	// NB not threadsafe
-	return e.findOrCreate(p).Accounting.BytesRecv
+	return e.LedgerForPeer(p).Recv
 }
 
 // ledger lazily instantiates a ledger
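To illustrate the extension point the new interface creates (a hypothetical sketch, not part of this change, assumed to live next to the engine so that Receipt and ScorePeerFunc are in scope): a ledger that tags every connected peer with one fixed score and keeps no byte accounting.

	// constantScoreLedger is a hypothetical ScoreLedger: every connected peer
	// gets the same fixed score and no byte accounting is kept.
	type constantScoreLedger struct {
		score   int
		scoreFn ScorePeerFunc
	}

	func (c *constantScoreLedger) GetReceipt(p peer.ID) *Receipt {
		return &Receipt{Peer: p.String()} // nothing recorded, so an empty receipt
	}
	func (c *constantScoreLedger) AddToSentBytes(p peer.ID, n int)     {}
	func (c *constantScoreLedger) AddToReceivedBytes(p peer.ID, n int) {}

	func (c *constantScoreLedger) PeerConnected(p peer.ID) {
		if c.scoreFn != nil {
			c.scoreFn(p, c.score) // tag the peer as soon as it shows up
		}
	}
	func (c *constantScoreLedger) PeerDisconnected(p peer.ID) {
		if c.scoreFn != nil {
			c.scoreFn(p, 0) // a zero score makes the engine untag the peer
		}
	}

	// Start receives the engine's scoring callback; Stop has nothing to tear down.
	func (c *constantScoreLedger) Start(scorePeer ScorePeerFunc) { c.scoreFn = scorePeer }
	func (c *constantScoreLedger) Stop()                         {}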
diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go
index 3cb76597377f3798b57925e9804d966ea5aa680d..3046dc0d1d5ecb28a15390816172f08c8dcad974 100644
--- a/internal/decision/engine_test.go
+++ b/internal/decision/engine_test.go
@@ -97,7 +97,7 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
 func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
 	fpt := &fakePeerTagger{}
 	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
-	e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval, sampleCh)
+	e := newEngine(ctx, bs, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
 	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 	return engineSet{
 		Peer: peer.ID(idStr),
@@ -185,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
 func TestOutboxClosedWhenEngineClosed(t *testing.T) {
 	ctx := context.Background()
 	t.SkipNow() // TODO implement *Engine.Close
-	e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -513,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
 		testCases = onlyTestCases
 	}
 
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 	for i, testCase := range testCases {
 		t.Logf("Test case %d:", i)
@@ -669,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
 		testCases = onlyTestCases
 	}
 
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 
 	var next envChan
@@ -854,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
 	ctx := context.Background()
 	for i := 0; i < numRounds; i++ {
 		expected := make([][]string, 0, len(testcases))
-		e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+		e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 		e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
 		for _, testcase := range testcases {
 			set := testcase[0]
@@ -879,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 
 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -923,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 
 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -987,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) {
 	partner := libp2ptest.RandPeerIDFatal(t)
 	otherPeer := libp2ptest.RandPeerIDFatal(t)
 
-	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
+	e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
 	e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
 
 	blks := testutil.GenerateBlocksOfSize(4, 8*1024)
diff --git a/internal/decision/ledger.go b/internal/decision/ledger.go
index 87fedc458c308ee7183c5465183ecca8835c7822..a607ff4f4e5a638d65775f68afbab21d57c215f3 100644
--- a/internal/decision/ledger.go
+++ b/internal/decision/ledger.go
@@ -2,7 +2,6 @@ package decision
 
 import (
 	"sync"
-	"time"
 
 	pb "github.com/ipfs/go-bitswap/message/pb"
 	wl "github.com/ipfs/go-bitswap/wantlist"
@@ -18,75 +17,17 @@ func newLedger(p peer.ID) *ledger {
 	}
 }
 
-// ledger stores the data exchange relationship between two peers.
-// NOT threadsafe
+// Keeps the wantlist for the partner. NOT threadsafe!
 type ledger struct {
 	// Partner is the remote Peer.
 	Partner peer.ID
 
-	// Accounting tracks bytes sent and received.
-	Accounting debtRatio
-
-	// lastExchange is the time of the last data exchange.
-	lastExchange time.Time
-
-	// These scores keep track of how useful we think this peer is. Short
-	// tracks short-term usefulness and long tracks long-term usefulness.
-	shortScore, longScore float64
-	// Score keeps track of the score used in the peer tagger. We track it
-	// here to avoid unnecessarily updating the tags in the connection manager.
-	score int
-
-	// exchangeCount is the number of exchanges with this peer
-	exchangeCount uint64
-
 	// wantList is a (bounded, small) set of keys that Partner desires.
 	wantList *wl.Wantlist
 
 	lk sync.RWMutex
 }
 
-// Receipt is a summary of the ledger for a given peer
-// collecting various pieces of aggregated data for external
-// reporting purposes.
-type Receipt struct {
-	Peer      string
-	Value     float64
-	Sent      uint64
-	Recv      uint64
-	Exchanged uint64
-}
-
-type debtRatio struct {
-	BytesSent uint64
-	BytesRecv uint64
-}
-
-// Value returns the debt ratio, sent:receive.
-func (dr *debtRatio) Value() float64 {
-	return float64(dr.BytesSent) / float64(dr.BytesRecv+1)
-}
-
-// Score returns the debt _score_ on a 0-1 scale.
-func (dr *debtRatio) Score() float64 {
-	if dr.BytesRecv == 0 {
-		return 0
-	}
-	return float64(dr.BytesRecv) / float64(dr.BytesRecv+dr.BytesSent)
-}
-
-func (l *ledger) SentBytes(n int) {
-	l.exchangeCount++
-	l.lastExchange = time.Now()
-	l.Accounting.BytesSent += uint64(n)
-}
-
-func (l *ledger) ReceivedBytes(n int) {
-	l.exchangeCount++
-	l.lastExchange = time.Now()
-	l.Accounting.BytesRecv += uint64(n)
-}
-
 func (l *ledger) Wants(k cid.Cid, priority int32, wantType pb.Message_Wantlist_WantType) {
 	log.Debugf("peer %s wants %s", l.Partner, k)
 	l.wantList.Add(k, priority, wantType)
@@ -99,7 +40,3 @@ func (l *ledger) CancelWant(k cid.Cid) bool {
 	return l.wantList.Contains(k)
 }
-
-func (l *ledger) ExchangeCount() uint64 {
-	return l.exchangeCount
-}
diff --git a/internal/decision/scoreledger.go b/internal/decision/scoreledger.go
new file mode 100644
index 0000000000000000000000000000000000000000..5ffd6bb8a7f4e141a188f9283670f1fc0f137897
--- /dev/null
+++ b/internal/decision/scoreledger.go
@@ -0,0 +1,350 @@
+package decision
+
+import (
+	"sync"
+	"time"
+
+	peer "github.com/libp2p/go-libp2p-core/peer"
+)
+
+const (
+	// the alpha for the EWMA used to track short term usefulness
+	shortTermAlpha = 0.5
+
+	// the alpha for the EWMA used to track long term usefulness
+	longTermAlpha = 0.05
+
+	// how frequently the engine should sample usefulness. Peers that
+	// interact every shortTerm time period are considered "active".
+	shortTerm = 10 * time.Second
+
+	// long term ratio defines what "long term" means in terms of the
+	// shortTerm duration. Peers that interact once every longTermRatio are
+	// considered useful over the long term.
+	longTermRatio = 10
+
+	// long/short term scores for tagging peers
+	longTermScore  = 10 // this is a high tag but it grows _very_ slowly.
+	shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.
+)
+
+// Stores the data exchange relationship between two peers.
+type scoreledger struct {
+	// Partner is the remote Peer.
+	partner peer.ID
+
+	// tracks bytes sent...
+	bytesSent uint64
+
+	// ...and received.
+	bytesRecv uint64
+
+	// lastExchange is the time of the last data exchange.
+	lastExchange time.Time
+
+	// These scores keep track of how useful we think this peer is. Short
+	// tracks short-term usefulness and long tracks long-term usefulness.
+	shortScore, longScore float64
+
+	// Score keeps track of the score used in the peer tagger. We track it
+	// here to avoid unnecessarily updating the tags in the connection manager.
+	score int
+
+	// exchangeCount is the number of exchanges with this peer
+	exchangeCount uint64
+
+	// the record lock
+	lock sync.RWMutex
+}
+
+// Receipt is a summary of the ledger for a given peer
+// collecting various pieces of aggregated data for external
+// reporting purposes.
+type Receipt struct {
+	Peer      string
+	Value     float64
+	Sent      uint64
+	Recv      uint64
+	Exchanged uint64
+}
+
+// Increments the sent counter.
+func (l *scoreledger) AddToSentBytes(n int) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	l.exchangeCount++
+	l.lastExchange = time.Now()
+	l.bytesSent += uint64(n)
+}
+
+// Increments the received counter.
+func (l *scoreledger) AddToReceivedBytes(n int) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	l.exchangeCount++
+	l.lastExchange = time.Now()
+	l.bytesRecv += uint64(n)
+}
+
+// Returns the Receipt for this ledger record.
+func (l *scoreledger) Receipt() *Receipt {
+	l.lock.RLock()
+	defer l.lock.RUnlock()
+
+	return &Receipt{
+		Peer:      l.partner.String(),
+		Value:     float64(l.bytesSent) / float64(l.bytesRecv+1),
+		Sent:      l.bytesSent,
+		Recv:      l.bytesRecv,
+		Exchanged: l.exchangeCount,
+	}
+}
+
+// DefaultScoreLedger is used by Engine as the default ScoreLedger.
+type DefaultScoreLedger struct {
+	// a sample counting ticker
+	ticker *time.Ticker
+	// the score func
+	scorePeer ScorePeerFunc
+	// is closed on Close
+	closing chan struct{}
+	// protects the fields immediately below
+	lock sync.RWMutex
+	// ledgerMap lists score ledgers by their partner key.
+	ledgerMap map[peer.ID]*scoreledger
+	// how frequently the engine should sample peer usefulness
+	peerSampleInterval time.Duration
+	// used by the tests to detect when a sample is taken
+	sampleCh chan struct{}
+}
+
+// scoreWorker keeps track of how "useful" our peers are, updating scores in the
+// connection manager.
+//
+// It does this by tracking two scores: short-term usefulness and long-term
+// usefulness. Short-term usefulness is sampled frequently and highly weights
+// new observations. Long-term usefulness is sampled less frequently and highly
+// weights on long-term trends.
+//
+// In practice, we do this by keeping two EWMAs. If we see an interaction
+// within the sampling period, we record the score, otherwise, we record a 0.
+// The short-term one has a high alpha and is sampled every shortTerm period.
+// The long-term one has a low alpha and is sampled every
+// longTermRatio*shortTerm period.
+//
+// To calculate the final score, we sum the short-term and long-term scores then
+// adjust it ±25% based on our debt ratio. Peers that have historically been
+// more useful to us than we are to them get the highest score.
+func (dsl *DefaultScoreLedger) scoreWorker() {
+	ticker := time.NewTicker(dsl.peerSampleInterval)
+	defer ticker.Stop()
+
+	type update struct {
+		peer  peer.ID
+		score int
+	}
+	var (
+		lastShortUpdate, lastLongUpdate time.Time
+		updates []update
+	)
+
+	for i := 0; ; i = (i + 1) % longTermRatio {
+		var now time.Time
+		select {
+		case now = <-ticker.C:
+		case <-dsl.closing:
+			return
+		}
+
+		// The long term update ticks every `longTermRatio` short
+		// intervals.
+		updateLong := i == 0
+
+		dsl.lock.Lock()
+		for _, l := range dsl.ledgerMap {
+			l.lock.Lock()
+
+			// Update the short-term score.
+			if l.lastExchange.After(lastShortUpdate) {
+				l.shortScore = ewma(l.shortScore, shortTermScore, shortTermAlpha)
+			} else {
+				l.shortScore = ewma(l.shortScore, 0, shortTermAlpha)
+			}
+
+			// Update the long-term score.
+			if updateLong {
+				if l.lastExchange.After(lastLongUpdate) {
+					l.longScore = ewma(l.longScore, longTermScore, longTermAlpha)
+				} else {
+					l.longScore = ewma(l.longScore, 0, longTermAlpha)
+				}
+			}
+
+			// Calculate the new score.
+			//
+			// The accounting score adjustment prefers peers _we_
+			// need over peers that need us. This doesn't help with
+			// leeching.
+			var lscore float64
+			if l.bytesRecv == 0 {
+				lscore = 0
+			} else {
+				lscore = float64(l.bytesRecv) / float64(l.bytesRecv+l.bytesSent)
+			}
+			score := int((l.shortScore + l.longScore) * (lscore*.5 + .75))
+
+			// Avoid updating the connection manager unless there's a change. This can be expensive.
+			if l.score != score {
+				// put these in a list so we can perform the updates outside the _global_ lock.
+				updates = append(updates, update{l.partner, score})
+				l.score = score
+			}
+			l.lock.Unlock()
+		}
+		dsl.lock.Unlock()
+
+		// record the times.
+		lastShortUpdate = now
+		if updateLong {
+			lastLongUpdate = now
+		}
+
+		// apply the updates
+		for _, update := range updates {
+			dsl.scorePeer(update.peer, update.score)
+		}
+		// Keep the memory. It's not much and it saves us from having to allocate.
+		updates = updates[:0]
+
+		// Used by the tests
+		if dsl.sampleCh != nil {
+			dsl.sampleCh <- struct{}{}
+		}
+	}
+}
+
+// Returns the score ledger for the given peer or nil if that peer
+// is not on the ledger.
+func (dsl *DefaultScoreLedger) find(p peer.ID) *scoreledger {
+	// Take a read lock (as it's less expensive) to check if we have
+	// a ledger for the peer.
+	dsl.lock.RLock()
+	l, ok := dsl.ledgerMap[p]
+	dsl.lock.RUnlock()
+	if ok {
+		return l
+	}
+	return nil
+}
+
+// Returns a new scoreledger.
+func newScoreLedger(p peer.ID) *scoreledger {
+	return &scoreledger{
+		partner: p,
+	}
+}
+
+// Lazily instantiates a ledger.
+func (dsl *DefaultScoreLedger) findOrCreate(p peer.ID) *scoreledger {
+	l := dsl.find(p)
+	if l != nil {
+		return l
+	}
+
+	// There's no ledger, so take a write lock, then check again and
+	// create the ledger if necessary.
+	dsl.lock.Lock()
+	defer dsl.lock.Unlock()
+	l, ok := dsl.ledgerMap[p]
+	if !ok {
+		l = newScoreLedger(p)
+		dsl.ledgerMap[p] = l
+	}
+	return l
+}
+
+// GetReceipt returns aggregated data about communication with a given peer.
+func (dsl *DefaultScoreLedger) GetReceipt(p peer.ID) *Receipt {
+	l := dsl.find(p)
+	if l != nil {
+		return l.Receipt()
+	}
+
+	// Return a blank receipt otherwise.
+	return &Receipt{
+		Peer:      p.String(),
+		Value:     0,
+		Sent:      0,
+		Recv:      0,
+		Exchanged: 0,
+	}
+}
+
+// Starts the default ledger sampling process.
+func (dsl *DefaultScoreLedger) Start(scorePeer ScorePeerFunc) {
+	dsl.init(scorePeer)
+	go dsl.scoreWorker()
+}
+
+// Stops the sampling process.
+func (dsl *DefaultScoreLedger) Stop() {
+	close(dsl.closing)
+}
+
+// Initializes the score ledger.
+func (dsl *DefaultScoreLedger) init(scorePeer ScorePeerFunc) {
+	dsl.lock.Lock()
+	defer dsl.lock.Unlock()
+	dsl.ledgerMap = make(map[peer.ID]*scoreledger)
+	dsl.scorePeer = scorePeer
+}
+
+// Increments the sent counter for the given peer.
+func (dsl *DefaultScoreLedger) AddToSentBytes(p peer.ID, n int) {
+	l := dsl.findOrCreate(p)
+	l.AddToSentBytes(n)
+}
+
+// Increments the received counter for the given peer.
+func (dsl *DefaultScoreLedger) AddToReceivedBytes(p peer.ID, n int) {
+	l := dsl.findOrCreate(p)
+	l.AddToReceivedBytes(n)
+}
+
+// PeerConnected should be called when a new peer connects, meaning
+// we should open accounting.
+func (dsl *DefaultScoreLedger) PeerConnected(p peer.ID) {
+	dsl.lock.Lock()
+	defer dsl.lock.Unlock()
+	_, ok := dsl.ledgerMap[p]
+	if !ok {
+		dsl.ledgerMap[p] = newScoreLedger(p)
+	}
+}
+
+// PeerDisconnected should be called when a peer disconnects to
+// clean up the accounting.
+func (dsl *DefaultScoreLedger) PeerDisconnected(p peer.ID) {
+	dsl.lock.Lock()
+	defer dsl.lock.Unlock()
+	delete(dsl.ledgerMap, p)
+}
+
+// Creates a new instance of the default score ledger.
+func NewDefaultScoreLedger() *DefaultScoreLedger {
+	return &DefaultScoreLedger{
+		ledgerMap:          make(map[peer.ID]*scoreledger),
+		ticker:             time.NewTicker(time.Millisecond * 100),
+		closing:            make(chan struct{}),
+		peerSampleInterval: shortTerm,
+	}
+}
+
+// Creates a new instance of the default score ledger with testing
+// parameters.
+func NewTestScoreLedger(peerSampleInterval time.Duration, sampleCh chan struct{}) *DefaultScoreLedger {
+	dsl := NewDefaultScoreLedger()
+	dsl.peerSampleInterval = peerSampleInterval
+	dsl.sampleCh = sampleCh
+	return dsl
+}
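For reference, a sketch of the arithmetic scoreWorker performs on each sample (the ewma helper lives elsewhere in the package, so its exact shape here is an assumption, and the numbers are made up for illustration):

	// Assumed shape of the EWMA helper used by scoreWorker.
	func ewma(old, new, alpha float64) float64 {
		return new*alpha + (1-alpha)*old
	}

	// Worked example: a peer that exchanged data during the last short-term
	// sample, with shortScore 4, longScore 2, 3000 bytes received and 1000 sent.
	func exampleScore() int {
		shortScore := ewma(4, shortTermScore, shortTermAlpha) // 10*0.5 + 0.5*4 = 7
		longScore := 2.0                                      // only updated on long-term ticks
		lscore := 3000.0 / (3000.0 + 1000.0)                  // debt term = 0.75
		return int((shortScore + longScore) * (lscore*0.5 + 0.75)) // 9 * 1.125 -> 10
	}

Because lscore stays between 0 and 1, the multiplier (lscore*0.5 + 0.75) stays between 0.75 and 1.25, which is the ±25% adjustment described in the scoreWorker comment.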