diff --git a/bitswap.go b/bitswap.go index f2217b85c23e80570141a5a68ae72bf6036fbc44..aab1429fa247899ad78b5209339851b843c103a3 100644 --- a/bitswap.go +++ b/bitswap.go @@ -37,6 +37,7 @@ import ( ) var log = logging.Logger("bitswap") +var sflog = log.Desugar() var _ exchange.SessionExchange = (*Bitswap)(nil) diff --git a/internal/decision/engine.go b/internal/decision/engine.go index 6fe8875cdfa6acfed10d20ea1e81e7841ae4f11a..b744cb54338f04e17a8766c5ce7cf8c51932a5d5 100644 --- a/internal/decision/engine.go +++ b/internal/decision/engine.go @@ -733,8 +733,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) { // Remove sent block presences from the want list for the peer for _, bp := range m.BlockPresences() { - // TODO: record block presence bytes as well? - // l.SentBytes(?) + // Don't record sent data. We reserve that for data blocks. if bp.Type == pb.Message_Have { l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have) } diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 5debcd3031bef2ddc8c1317af9109033a4603fcb..daf8664bfb92ffbaf964849bb567807e26e4b4ee 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -466,7 +466,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) { // Save some CPU cycles and allocations if log level is higher than debug - if ce := sflog.Check(zap.DebugLevel, "Bitswap -> send wants"); ce == nil { + if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil { return } @@ -474,15 +474,35 @@ func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) { for _, e := range wantlist { if e.Cancel { if e.WantType == pb.Message_Wantlist_Have { - log.Debugw("Bitswap -> cancel-have", "local", self, "to", mq.p, "cid", e.Cid) + log.Debugw("sent message", + "type", "CANCEL_WANT_HAVE", + "cid", e.Cid, + "local", self, + "to", mq.p, + ) } else { - log.Debugw("Bitswap -> cancel-block", "local", self, "to", mq.p, "cid", e.Cid) + log.Debugw("sent message", + "type", "CANCEL_WANT_BLOCK", + "cid", e.Cid, + "local", self, + "to", mq.p, + ) } } else { if e.WantType == pb.Message_Wantlist_Have { - log.Debugw("Bitswap -> want-have", "local", self, "to", mq.p, "cid", e.Cid) + log.Debugw("sent message", + "type", "WANT_HAVE", + "cid", e.Cid, + "local", self, + "to", mq.p, + ) } else { - log.Debugw("Bitswap -> want-block", "local", self, "to", mq.p, "cid", e.Cid) + log.Debugw("sent message", + "type", "WANT_BLOCK", + "cid", e.Cid, + "local", self, + "to", mq.p, + ) } } } diff --git a/workers.go b/workers.go index 04dc2757b089b52f3d28ef249641f0429b38cdaf..208c02bffc2c298a2a6c64bb232ab128bd06980f 100644 --- a/workers.go +++ b/workers.go @@ -5,11 +5,11 @@ import ( "fmt" engine "github.com/ipfs/go-bitswap/internal/decision" - bsmsg "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" cid "github.com/ipfs/go-cid" process "github.com/jbenet/goprocess" procctx "github.com/jbenet/goprocess/context" + "go.uber.org/zap" ) // TaskWorkerCount is the total number of simultaneous threads sending @@ -52,29 +52,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { continue } - // update the BS ledger to reflect sent message - // TODO: Should only track *useful* messages in ledger - outgoing := bsmsg.New(false) - for _, block := range envelope.Message.Blocks() { - log.Debugw("Bitswap.TaskWorker.Work", - "Target", envelope.Peer, - "Block", block.Cid(), - ) - outgoing.AddBlock(block) - } - for _, blockPresence := range envelope.Message.BlockPresences() { - outgoing.AddBlockPresence(blockPresence.Cid, blockPresence.Type) - } // TODO: Only record message as sent if there was no error? - bs.engine.MessageSent(envelope.Peer, outgoing) - + // Ideally, yes. But we'd need some way to trigger a retry and/or drop + // the peer. + bs.engine.MessageSent(envelope.Peer, envelope.Message) bs.sendBlocks(ctx, envelope) - bs.counterLk.Lock() - for _, block := range envelope.Message.Blocks() { - bs.counters.blocksSent++ - bs.counters.dataSent += uint64(len(block.RawData())) - } - bs.counterLk.Unlock() case <-ctx.Done(): return } @@ -84,41 +66,72 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { } } -func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) { - // Blocks need to be sent synchronously to maintain proper backpressure - // throughout the network stack - defer env.Sent() +func (bs *Bitswap) logOutgoingBlocks(env *engine.Envelope) { + if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil { + return + } - msgSize := 0 - msg := bsmsg.New(false) + self := bs.network.Self() for _, blockPresence := range env.Message.BlockPresences() { c := blockPresence.Cid switch blockPresence.Type { case pb.Message_Have: - log.Infof("Sending HAVE %s to %s", c.String()[2:8], env.Peer) + log.Debugw("sent message", + "type", "HAVE", + "cid", c, + "local", self, + "to", env.Peer, + ) case pb.Message_DontHave: - log.Infof("Sending DONT_HAVE %s to %s", c.String()[2:8], env.Peer) + log.Debugw("sent message", + "type", "DONT_HAVE", + "cid", c, + "local", self, + "to", env.Peer, + ) default: panic(fmt.Sprintf("unrecognized BlockPresence type %v", blockPresence.Type)) } - msgSize += bsmsg.BlockPresenceSize(c) - msg.AddBlockPresence(c, blockPresence.Type) } for _, block := range env.Message.Blocks() { - msgSize += len(block.RawData()) - msg.AddBlock(block) - log.Infof("Sending block %s to %s", block, env.Peer) + log.Debugw("sent message", + "type", "BLOCK", + "cid", block.Cid(), + "local", self, + "to", env.Peer, + ) } +} - bs.sentHistogram.Observe(float64(msgSize)) - err := bs.network.SendMessage(ctx, env.Peer, msg) +func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) { + // Blocks need to be sent synchronously to maintain proper backpressure + // throughout the network stack + defer env.Sent() + + err := bs.network.SendMessage(ctx, env.Peer, env.Message) if err != nil { - // log.Infof("sendblock error: %s", err) - log.Errorf("SendMessage error: %s. size: %d. block-presence length: %d", err, msg.Size(), len(env.Message.BlockPresences())) + log.Debugw("failed to send blocks message", + "peer", env.Peer, + "error", err, + ) + return + } + + bs.logOutgoingBlocks(env) + + dataSent := 0 + blocks := env.Message.Blocks() + for _, b := range blocks { + dataSent += len(b.RawData()) } - log.Infof("Sent message to %s", env.Peer) + bs.counterLk.Lock() + bs.counters.blocksSent += uint64(len(blocks)) + bs.counters.dataSent += uint64(dataSent) + bs.counterLk.Unlock() + bs.sentHistogram.Observe(float64(env.Message.Size())) + log.Debugw("sent message", "peer", env.Peer) } func (bs *Bitswap) provideWorker(px process.Process) {