Commit e9862947 authored by Dirk McCormick's avatar Dirk McCormick

perf: improve extractOutgoingMessage() performance

parent cac64200
......@@ -75,6 +75,9 @@ type MessageQueue struct {
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
// For performance reasons we just clear out the fields of the message
// instead of creating a new one every time.
msg bsmsg.BitSwapMessage
}
// recallWantlist keeps a list of pending wants, and a list of all wants that
......@@ -410,9 +413,10 @@ func (mq *MessageQueue) sendMessage() {
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
onSent()
wantlist := message.Wantlist()
onSent(wantlist)
mq.simulateDontHaveWithTimeout(message)
mq.simulateDontHaveWithTimeout(wantlist)
// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
......@@ -430,12 +434,12 @@ func (mq *MessageQueue) sendMessage() {
// This is necessary when making requests to peers running an older version of
// Bitswap that doesn't support the DONT_HAVE response, and is also useful to
// mitigate getting blocked by a peer that takes a long time to respond.
func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
mq.wllock.Lock()
func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
// Get the CID of each want-block that expects a DONT_HAVE response
wantlist := msg.Wantlist()
wants := make([]cid.Cid, 0, len(wantlist))
mq.wllock.Lock()
for _, entry := range wantlist {
if entry.WantType == pb.Message_Wantlist_Block && entry.SendDontHave {
// Unlikely, but just in case check that the block hasn't been
......@@ -489,9 +493,17 @@ func (mq *MessageQueue) pendingWorkCount() int {
return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
// Create a new message
msg := bsmsg.New(false)
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func([]bsmsg.Entry)) {
// For performance reasons we just clear out the fields of the message
// instead of creating a new one every time.
if mq.msg == nil {
// Create a new message
mq.msg = bsmsg.New(false)
} else {
// If there's already a message, reset it
mq.msg.Reset(false)
}
msg := mq.msg
mq.wllock.Lock()
defer mq.wllock.Unlock()
......@@ -544,11 +556,11 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
// Called when the message has been successfully sent.
// Remove the sent keys from the broadcast and regular wantlists.
onSent := func() {
onSent := func(wantlist []bsmsg.Entry) {
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, e := range msg.Wantlist() {
for _, e := range wantlist {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}
......
......@@ -65,6 +65,9 @@ type BitSwapMessage interface {
Exportable
Loggable() map[string]interface{}
// Reset the values in the message back to defaults, so it can be reused
Reset(bool)
}
// Exportable is an interface for structures than can be
......@@ -85,6 +88,33 @@ type BlockPresence struct {
Type pb.Message_BlockPresenceType
}
// Entry is a wantlist entry in a Bitswap message, with flags indicating
// - whether message is a cancel
// - whether requester wants a DONT_HAVE message
// - whether requester wants a HAVE message (instead of the block)
type Entry struct {
wantlist.Entry
Cancel bool
SendDontHave bool
}
// Get the size of the entry on the wire
func (e *Entry) Size() int {
epb := e.ToPB()
return epb.Size()
}
// Get the entry in protobuf form
func (e *Entry) ToPB() pb.Message_Wantlist_Entry {
return pb.Message_Wantlist_Entry{
Block: pb.Cid{Cid: e.Cid},
Priority: int32(e.Priority),
Cancel: e.Cancel,
WantType: e.WantType,
SendDontHave: e.SendDontHave,
}
}
type impl struct {
full bool
wantlist map[cid.Cid]*Entry
......@@ -107,14 +137,19 @@ func newMsg(full bool) *impl {
}
}
// Entry is a wantlist entry in a Bitswap message, with flags indicating
// - whether message is a cancel
// - whether requester wants a DONT_HAVE message
// - whether requester wants a HAVE message (instead of the block)
type Entry struct {
wantlist.Entry
Cancel bool
SendDontHave bool
// Reset the values in the message back to defaults, so it can be reused
func (m *impl) Reset(full bool) {
m.full = full
for k := range m.wantlist {
delete(m.wantlist, k)
}
for k := range m.blocks {
delete(m.blocks, k)
}
for k := range m.blockPresences {
delete(m.blockPresences, k)
}
m.pendingBytes = 0
}
var errCidMissing = errors.New("missing cid")
......@@ -267,8 +302,7 @@ func (m *impl) addEntry(c cid.Cid, priority int32, cancel bool, wantType pb.Mess
}
m.wantlist[c] = e
aspb := entryToPB(e)
return aspb.Size()
return e.Size()
}
func (m *impl) AddBlock(b blocks.Block) {
......@@ -300,8 +334,7 @@ func (m *impl) Size() int {
size += BlockPresenceSize(c)
}
for _, e := range m.wantlist {
epb := entryToPB(e)
size += epb.Size()
size += e.Size()
}
return size
......@@ -337,21 +370,11 @@ func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
return newMessageFromProto(pb)
}
func entryToPB(e *Entry) pb.Message_Wantlist_Entry {
return pb.Message_Wantlist_Entry{
Block: pb.Cid{Cid: e.Cid},
Priority: int32(e.Priority),
Cancel: e.Cancel,
WantType: e.WantType,
SendDontHave: e.SendDontHave,
}
}
func (m *impl) ToProtoV0() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, entryToPB(e))
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, e.ToPB())
}
pbm.Wantlist.Full = m.full
......@@ -367,7 +390,7 @@ func (m *impl) ToProtoV1() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, entryToPB(e))
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, e.ToPB())
}
pbm.Wantlist.Full = m.full
......
......@@ -2,9 +2,12 @@ package message
import (
"bytes"
"fmt"
"testing"
pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-bitswap/wantlist"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -289,3 +292,23 @@ func TestAddWantlistEntry(t *testing.T) {
t.Fatal("want should not override cancel")
}
}
func TestEntrySize(t *testing.T) {
blockGenerator := blocksutil.NewBlockGenerator()
c := blockGenerator.Next().Cid()
e := Entry{
Entry: wantlist.Entry{
Cid: c,
Priority: 10,
WantType: pb.Message_Wantlist_Have,
},
SendDontHave: true,
Cancel: false,
}
fmt.Println(len(c.Bytes()))
fmt.Println(len(c.KeyString()))
epb := e.ToPB()
if e.Size() != epb.Size() {
t.Fatal("entry size calculation incorrect", e.Size(), epb.Size())
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment