Commit e6bf8af3 authored by Dirk McCormick's avatar Dirk McCormick Committed by Steven Allen

fix: mark wants sent when they are added to a message to be sent

parent 8894bb6a
......@@ -422,7 +422,7 @@ func (mq *MessageQueue) sendMessage() {
mq.dhTimeoutMgr.Start()
// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())
// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)
......@@ -442,9 +442,6 @@ func (mq *MessageQueue) sendMessage() {
return
}
// We were able to send successfully.
onSent()
// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)
......@@ -541,7 +538,7 @@ func (mq *MessageQueue) pendingWorkCount() int {
}
// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
mq.wllock.Lock()
defer mq.wllock.Unlock()
......@@ -568,7 +565,6 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
}
// Add each regular want-have / want-block to the message
peerSent := peerEntries[:0]
for _, e := range peerEntries {
if msgSize >= mq.maxMessageSize {
break
......@@ -580,12 +576,13 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
} else {
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
peerSent = append(peerSent, e)
// Move the key from pending to sent
mq.peerWants.MarkSent(e)
}
}
// Add each broadcast want-have to the message
bcstSent := bcstEntries[:0]
for _, e := range bcstEntries {
if msgSize >= mq.maxMessageSize {
break
......@@ -601,24 +598,12 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
}
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false)
bcstSent = append(bcstSent, e)
}
// Called when the message has been successfully sent.
onMessageSent := func() {
mq.wllock.Lock()
defer mq.wllock.Unlock()
// Move the keys from pending to sent
for _, e := range bcstSent {
mq.bcstWants.MarkSent(e)
}
for _, e := range peerSent {
mq.peerWants.MarkSent(e)
}
// Move the key from pending to sent
mq.bcstWants.MarkSent(e)
}
return mq.msg, onMessageSent
return mq.msg
}
func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment