Unverified Commit d39c7604 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #326 from ipfs/feat/faster-send-blocks

avoid copying messages and improve logging
parents 288ceffb 8c7bf926
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
) )
var log = logging.Logger("bitswap") var log = logging.Logger("bitswap")
var sflog = log.Desugar()
var _ exchange.SessionExchange = (*Bitswap)(nil) var _ exchange.SessionExchange = (*Bitswap)(nil)
......
...@@ -733,8 +733,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) { ...@@ -733,8 +733,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
// Remove sent block presences from the want list for the peer // Remove sent block presences from the want list for the peer
for _, bp := range m.BlockPresences() { for _, bp := range m.BlockPresences() {
// TODO: record block presence bytes as well? // Don't record sent data. We reserve that for data blocks.
// l.SentBytes(?)
if bp.Type == pb.Message_Have { if bp.Type == pb.Message_Have {
l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have) l.wantList.RemoveType(bp.Cid, pb.Message_Wantlist_Have)
} }
......
...@@ -466,7 +466,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { ...@@ -466,7 +466,7 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) { func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
// Save some CPU cycles and allocations if log level is higher than debug // Save some CPU cycles and allocations if log level is higher than debug
if ce := sflog.Check(zap.DebugLevel, "Bitswap -> send wants"); ce == nil { if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
return return
} }
...@@ -474,15 +474,35 @@ func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) { ...@@ -474,15 +474,35 @@ func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
for _, e := range wantlist { for _, e := range wantlist {
if e.Cancel { if e.Cancel {
if e.WantType == pb.Message_Wantlist_Have { if e.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap -> cancel-have", "local", self, "to", mq.p, "cid", e.Cid) log.Debugw("sent message",
"type", "CANCEL_WANT_HAVE",
"cid", e.Cid,
"local", self,
"to", mq.p,
)
} else { } else {
log.Debugw("Bitswap -> cancel-block", "local", self, "to", mq.p, "cid", e.Cid) log.Debugw("sent message",
"type", "CANCEL_WANT_BLOCK",
"cid", e.Cid,
"local", self,
"to", mq.p,
)
} }
} else { } else {
if e.WantType == pb.Message_Wantlist_Have { if e.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap -> want-have", "local", self, "to", mq.p, "cid", e.Cid) log.Debugw("sent message",
"type", "WANT_HAVE",
"cid", e.Cid,
"local", self,
"to", mq.p,
)
} else { } else {
log.Debugw("Bitswap -> want-block", "local", self, "to", mq.p, "cid", e.Cid) log.Debugw("sent message",
"type", "WANT_BLOCK",
"cid", e.Cid,
"local", self,
"to", mq.p,
)
} }
} }
} }
......
...@@ -5,11 +5,11 @@ import ( ...@@ -5,11 +5,11 @@ import (
"fmt" "fmt"
engine "github.com/ipfs/go-bitswap/internal/decision" engine "github.com/ipfs/go-bitswap/internal/decision"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb" pb "github.com/ipfs/go-bitswap/message/pb"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
process "github.com/jbenet/goprocess" process "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context" procctx "github.com/jbenet/goprocess/context"
"go.uber.org/zap"
) )
// TaskWorkerCount is the total number of simultaneous threads sending // TaskWorkerCount is the total number of simultaneous threads sending
...@@ -52,29 +52,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { ...@@ -52,29 +52,11 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
continue continue
} }
// update the BS ledger to reflect sent message
// TODO: Should only track *useful* messages in ledger
outgoing := bsmsg.New(false)
for _, block := range envelope.Message.Blocks() {
log.Debugw("Bitswap.TaskWorker.Work",
"Target", envelope.Peer,
"Block", block.Cid(),
)
outgoing.AddBlock(block)
}
for _, blockPresence := range envelope.Message.BlockPresences() {
outgoing.AddBlockPresence(blockPresence.Cid, blockPresence.Type)
}
// TODO: Only record message as sent if there was no error? // TODO: Only record message as sent if there was no error?
bs.engine.MessageSent(envelope.Peer, outgoing) // Ideally, yes. But we'd need some way to trigger a retry and/or drop
// the peer.
bs.engine.MessageSent(envelope.Peer, envelope.Message)
bs.sendBlocks(ctx, envelope) bs.sendBlocks(ctx, envelope)
bs.counterLk.Lock()
for _, block := range envelope.Message.Blocks() {
bs.counters.blocksSent++
bs.counters.dataSent += uint64(len(block.RawData()))
}
bs.counterLk.Unlock()
case <-ctx.Done(): case <-ctx.Done():
return return
} }
...@@ -84,41 +66,72 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { ...@@ -84,41 +66,72 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
} }
} }
func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) { func (bs *Bitswap) logOutgoingBlocks(env *engine.Envelope) {
// Blocks need to be sent synchronously to maintain proper backpressure if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
// throughout the network stack return
defer env.Sent() }
msgSize := 0 self := bs.network.Self()
msg := bsmsg.New(false)
for _, blockPresence := range env.Message.BlockPresences() { for _, blockPresence := range env.Message.BlockPresences() {
c := blockPresence.Cid c := blockPresence.Cid
switch blockPresence.Type { switch blockPresence.Type {
case pb.Message_Have: case pb.Message_Have:
log.Infof("Sending HAVE %s to %s", c.String()[2:8], env.Peer) log.Debugw("sent message",
"type", "HAVE",
"cid", c,
"local", self,
"to", env.Peer,
)
case pb.Message_DontHave: case pb.Message_DontHave:
log.Infof("Sending DONT_HAVE %s to %s", c.String()[2:8], env.Peer) log.Debugw("sent message",
"type", "DONT_HAVE",
"cid", c,
"local", self,
"to", env.Peer,
)
default: default:
panic(fmt.Sprintf("unrecognized BlockPresence type %v", blockPresence.Type)) panic(fmt.Sprintf("unrecognized BlockPresence type %v", blockPresence.Type))
} }
msgSize += bsmsg.BlockPresenceSize(c)
msg.AddBlockPresence(c, blockPresence.Type)
} }
for _, block := range env.Message.Blocks() { for _, block := range env.Message.Blocks() {
msgSize += len(block.RawData()) log.Debugw("sent message",
msg.AddBlock(block) "type", "BLOCK",
log.Infof("Sending block %s to %s", block, env.Peer) "cid", block.Cid(),
"local", self,
"to", env.Peer,
)
} }
}
bs.sentHistogram.Observe(float64(msgSize)) func (bs *Bitswap) sendBlocks(ctx context.Context, env *engine.Envelope) {
err := bs.network.SendMessage(ctx, env.Peer, msg) // Blocks need to be sent synchronously to maintain proper backpressure
// throughout the network stack
defer env.Sent()
err := bs.network.SendMessage(ctx, env.Peer, env.Message)
if err != nil { if err != nil {
// log.Infof("sendblock error: %s", err) log.Debugw("failed to send blocks message",
log.Errorf("SendMessage error: %s. size: %d. block-presence length: %d", err, msg.Size(), len(env.Message.BlockPresences())) "peer", env.Peer,
"error", err,
)
return
}
bs.logOutgoingBlocks(env)
dataSent := 0
blocks := env.Message.Blocks()
for _, b := range blocks {
dataSent += len(b.RawData())
} }
log.Infof("Sent message to %s", env.Peer) bs.counterLk.Lock()
bs.counters.blocksSent += uint64(len(blocks))
bs.counters.dataSent += uint64(dataSent)
bs.counterLk.Unlock()
bs.sentHistogram.Observe(float64(env.Message.Size()))
log.Debugw("sent message", "peer", env.Peer)
} }
func (bs *Bitswap) provideWorker(px process.Process) { func (bs *Bitswap) provideWorker(px process.Process) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment