diff --git a/go.mod b/go.mod index dda2a109adb6f564a4be36d2f8e9588b8bad728f..e4ba25afb7e8bada8f8f5cdd802c673531e3d0f4 100644 --- a/go.mod +++ b/go.mod @@ -18,10 +18,10 @@ require ( github.com/ipfs/go-log v1.0.4 github.com/ipfs/go-metrics-interface v0.0.1 github.com/ipfs/go-peertaskqueue v0.2.0 - github.com/jbenet/goprocess v0.1.3 + github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-libp2p v0.8.3 - github.com/libp2p/go-libp2p-core v0.5.0 + github.com/libp2p/go-libp2p-core v0.5.2 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-netutil v0.1.0 github.com/libp2p/go-libp2p-testing v0.1.1 diff --git a/go.sum b/go.sum index 9cf5253b6e4087c745ed149792b6f5fb6dd6fff8..4f209988d86f09acaace0ec1b508944a80c741aa 100644 --- a/go.sum +++ b/go.sum @@ -193,6 +193,7 @@ github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d h1:68u9r4wEvL3gYg2jv github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d/go.mod h1:5Ky9EC2xfoUKUor0Hjgi2BJhCSXJfMOFlmyYrVKGQMk= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 49063bd5cf15d5ba43f7774aa2060fdf2d153c35..b620740533faed51b7fced803fd051b547362791 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -557,13 +557,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap l.lk.Lock() defer l.lk.Unlock() - // Record how many bytes were received in the ledger - blks := m.Blocks() - for _, block := range blks { - log.Debugw("Bitswap engine <- block", "local", e.self, "from", p, "cid", block.Cid(), "size", len(block.RawData())) - l.ReceivedBytes(len(block.RawData())) - } - // If the peer sent a full wantlist, replace the ledger's wantlist if m.Full() { l.wantList = wl.New() @@ -664,11 +657,26 @@ func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Ent // ReceiveFrom is called when new blocks are received and added to the block // store, meaning there may be peers who want those blocks, so we should send // the blocks to them. +// +// This function also updates the receive side of the ledger. func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) { if len(blks) == 0 { return } + if from != "" { + l := e.findOrCreate(from) + l.lk.Lock() + + // Record how many bytes were received in the ledger + for _, blk := range blks { + log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData())) + l.ReceivedBytes(len(blk.RawData())) + } + + l.lk.Unlock() + } + // Get the size of each block blockSizes := make(map[cid.Cid]int, len(blks)) for _, blk := range blks { @@ -678,6 +686,7 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) // Check each peer to see if it wants one of the blocks we received work := false e.lock.RLock() + for _, l := range e.ledgerMap { l.lk.RLock() diff --git a/internal/decision/engine_test.go b/internal/decision/engine_test.go index cf000d96ec6cb9a04683474f6d9fd5936903f891..3cb76597377f3798b57925e9804d966ea5aa680d 100644 --- a/internal/decision/engine_test.go +++ b/internal/decision/engine_test.go @@ -123,6 +123,7 @@ func TestConsistentAccounting(t *testing.T) { sender.Engine.MessageSent(receiver.Peer, m) receiver.Engine.MessageReceived(ctx, sender.Peer, m) + receiver.Engine.ReceiveFrom(sender.Peer, m.Blocks(), nil) } // Ensure sender records the change