Commit d2cb0fee authored by Dirk McCormick's avatar Dirk McCormick

refactor: reuse message queue message for perf

parent 2b839164
......@@ -179,6 +179,9 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
}
return mq
......@@ -402,19 +405,23 @@ func (mq *MessageQueue) sendMessage() {
mq.dhTimeoutMgr.Start()
// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
if message == nil || message.Empty() {
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())
// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)
if message.Empty() {
return
}
mq.logOutgoingMessage(message)
wantlist := message.Wantlist()
mq.logOutgoingMessage(wantlist)
// Try to send this message repeatedly
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
wantlist := message.Wantlist()
onSent(wantlist)
mq.onMessageSent(wantlist)
mq.simulateDontHaveWithTimeout(wantlist)
......@@ -457,15 +464,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
mq.dhTimeoutMgr.AddPending(wants)
}
func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) {
func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
// Save some CPU cycles and allocations if log level is higher than debug
if ce := sflog.Check(zap.DebugLevel, "Bitswap -> send wants"); ce == nil {
return
}
self := mq.network.Self()
entries := msg.Wantlist()
for _, e := range entries {
for _, e := range wantlist {
if e.Cancel {
if e.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap -> cancel-have", "local", self, "to", mq.p, "cid", e.Cid)
......@@ -493,16 +499,7 @@ func (mq *MessageQueue) pendingWorkCount() int {
return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func([]bsmsg.Entry)) {
// For performance reasons we just clear out the fields of the message
// instead of creating a new one every time.
if mq.msg == nil {
// Create a new message
mq.msg = bsmsg.New(false)
} else {
// If there's already a message, reset it
mq.msg.Reset(false)
}
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
msg := mq.msg
mq.wllock.Lock()
......@@ -554,19 +551,19 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
mq.cancels.Remove(c)
}
// Called when the message has been successfully sent.
return msg
}
// Called when the message has been successfully sent.
func (mq *MessageQueue) onMessageSent(wantlist []bsmsg.Entry) {
// Remove the sent keys from the broadcast and regular wantlists.
onSent := func(wantlist []bsmsg.Entry) {
mq.wllock.Lock()
defer mq.wllock.Unlock()
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, e := range wantlist {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}
for _, e := range wantlist {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}
return msg, onSent
}
func (mq *MessageQueue) initializeSender() error {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment