Unverified Commit 89d39a65 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #307 from ipfs/perf/message-queue

Perf/message queue
parents 03e6d1f0 b4763e26
......@@ -75,6 +75,9 @@ type MessageQueue struct {
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
// For performance reasons we just clear out the fields of the message
// instead of creating a new one every time.
msg bsmsg.BitSwapMessage
}
// recallWantlist keeps a list of pending wants, and a list of all wants that
......@@ -176,6 +179,9 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
}
return mq
......@@ -399,20 +405,25 @@ func (mq *MessageQueue) sendMessage() {
mq.dhTimeoutMgr.Start()
// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
if message == nil || message.Empty() {
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())
// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)
if message.Empty() {
return
}
mq.logOutgoingMessage(message)
wantlist := message.Wantlist()
mq.logOutgoingMessage(wantlist)
// Try to send this message repeatedly
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
onSent()
mq.onMessageSent(wantlist)
mq.simulateDontHaveWithTimeout(message)
mq.simulateDontHaveWithTimeout(wantlist)
// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
......@@ -430,12 +441,12 @@ func (mq *MessageQueue) sendMessage() {
// This is necessary when making requests to peers running an older version of
// Bitswap that doesn't support the DONT_HAVE response, and is also useful to
// mitigate getting blocked by a peer that takes a long time to respond.
func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
mq.wllock.Lock()
func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
// Get the CID of each want-block that expects a DONT_HAVE response
wantlist := msg.Wantlist()
wants := make([]cid.Cid, 0, len(wantlist))
mq.wllock.Lock()
for _, entry := range wantlist {
if entry.WantType == pb.Message_Wantlist_Block && entry.SendDontHave {
// Unlikely, but just in case check that the block hasn't been
......@@ -453,15 +464,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
mq.dhTimeoutMgr.AddPending(wants)
}
func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) {
func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
// Save some CPU cycles and allocations if log level is higher than debug
if ce := sflog.Check(zap.DebugLevel, "Bitswap -> send wants"); ce == nil {
return
}
self := mq.network.Self()
entries := msg.Wantlist()
for _, e := range entries {
for _, e := range wantlist {
if e.Cancel {
if e.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap -> cancel-have", "local", self, "to", mq.p, "cid", e.Cid)
......@@ -478,10 +488,12 @@ func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) {
}
}
// Whether there is work to be processed
func (mq *MessageQueue) hasPendingWork() bool {
return mq.pendingWorkCount() > 0
}
// The amount of work that is waiting to be processed
func (mq *MessageQueue) pendingWorkCount() int {
mq.wllock.Lock()
defer mq.wllock.Unlock()
......@@ -489,10 +501,8 @@ func (mq *MessageQueue) pendingWorkCount() int {
return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
// Create a new message
msg := bsmsg.New(false)
// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
mq.wllock.Lock()
defer mq.wllock.Unlock()
......@@ -515,7 +525,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
}
e := bcstEntries[i]
msgSize += msg.AddEntry(e.Cid, e.Priority, wantType, false)
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false)
}
// Add each regular want-have / want-block to the message
......@@ -526,7 +536,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
if !supportsHave && e.WantType == pb.Message_Wantlist_Have {
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
} else {
msgSize += msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
}
}
......@@ -535,26 +545,26 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
for i := 0; i < len(cancels) && msgSize < mq.maxMessageSize; i++ {
c := cancels[i]
msgSize += msg.Cancel(c)
msgSize += mq.msg.Cancel(c)
// Clear the cancel - we make a best effort to let peers know about
// cancels but won't save them to resend if there's a failure.
mq.cancels.Remove(c)
}
// Called when the message has been successfully sent.
return mq.msg
}
// Called when the message has been successfully sent.
func (mq *MessageQueue) onMessageSent(wantlist []bsmsg.Entry) {
// Remove the sent keys from the broadcast and regular wantlists.
onSent := func() {
mq.wllock.Lock()
defer mq.wllock.Unlock()
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, e := range msg.Wantlist() {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}
for _, e := range wantlist {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}
return msg, onSent
}
func (mq *MessageQueue) initializeSender() error {
......
......@@ -4,16 +4,18 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"sync"
"testing"
"time"
"github.com/ipfs/go-bitswap/internal/testutil"
"github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
cid "github.com/ipfs/go-cid"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
......@@ -84,13 +86,13 @@ type fakeMessageSender struct {
sendError error
fullClosed chan<- struct{}
reset chan<- struct{}
messagesSent chan<- bsmsg.BitSwapMessage
messagesSent chan<- []bsmsg.Entry
sendErrors chan<- error
supportsHave bool
}
func newFakeMessageSender(sendError error, fullClosed chan<- struct{}, reset chan<- struct{},
messagesSent chan<- bsmsg.BitSwapMessage, sendErrors chan<- error, supportsHave bool) *fakeMessageSender {
messagesSent chan<- []bsmsg.Entry, sendErrors chan<- error, supportsHave bool) *fakeMessageSender {
return &fakeMessageSender{
sendError: sendError,
......@@ -110,7 +112,7 @@ func (fms *fakeMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMess
fms.sendErrors <- fms.sendError
return fms.sendError
}
fms.messagesSent <- msg
fms.messagesSent <- msg.Wantlist()
return nil
}
func (fms *fakeMessageSender) clearSendError() {
......@@ -127,9 +129,9 @@ func mockTimeoutCb(peer.ID, []cid.Cid) {}
func collectMessages(ctx context.Context,
t *testing.T,
messagesSent <-chan bsmsg.BitSwapMessage,
timeout time.Duration) []bsmsg.BitSwapMessage {
var messagesReceived []bsmsg.BitSwapMessage
messagesSent <-chan []bsmsg.Entry,
timeout time.Duration) [][]bsmsg.Entry {
var messagesReceived [][]bsmsg.Entry
timeoutctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
......@@ -142,17 +144,17 @@ func collectMessages(ctx context.Context,
}
}
func totalEntriesLength(messages []bsmsg.BitSwapMessage) int {
func totalEntriesLength(messages [][]bsmsg.Entry) int {
totalLength := 0
for _, messages := range messages {
totalLength += len(messages.Wantlist())
for _, m := range messages {
totalLength += len(m)
}
return totalLength
}
func TestStartupAndShutdown(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -170,10 +172,10 @@ func TestStartupAndShutdown(t *testing.T) {
}
firstMessage := messages[0]
if len(firstMessage.Wantlist()) != len(bcstwh) {
if len(firstMessage) != len(bcstwh) {
t.Fatal("did not add all wants to want list")
}
for _, entry := range firstMessage.Wantlist() {
for _, entry := range firstMessage {
if entry.Cancel {
t.Fatal("initial add sent cancel entry when it should not have")
}
......@@ -194,7 +196,7 @@ func TestStartupAndShutdown(t *testing.T) {
func TestSendingMessagesDeduped(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -217,7 +219,7 @@ func TestSendingMessagesDeduped(t *testing.T) {
func TestSendingMessagesPartialDupe(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -240,7 +242,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
func TestSendingMessagesPriority(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -264,7 +266,7 @@ func TestSendingMessagesPriority(t *testing.T) {
t.Fatal("wrong number of wants")
}
byCid := make(map[cid.Cid]message.Entry)
for _, entry := range messages[0].Wantlist() {
for _, entry := range messages[0] {
byCid[entry.Cid] = entry
}
......@@ -309,7 +311,7 @@ func TestSendingMessagesPriority(t *testing.T) {
func TestCancelOverridesPendingWants(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -329,7 +331,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
t.Fatal("Wrong message count")
}
wb, wh, cl := filterWantTypes(messages[0].Wantlist())
wb, wh, cl := filterWantTypes(messages[0])
if len(wb) != 1 || !wb[0].Equals(wantBlocks[1]) {
t.Fatal("Expected 1 want-block")
}
......@@ -343,7 +345,7 @@ func TestCancelOverridesPendingWants(t *testing.T) {
func TestWantOverridesPendingCancels(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -362,7 +364,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
t.Fatal("Wrong message count")
}
wb, wh, cl := filterWantTypes(messages[0].Wantlist())
wb, wh, cl := filterWantTypes(messages[0])
if len(wb) != 1 || !wb[0].Equals(cancels[0]) {
t.Fatal("Expected 1 want-block")
}
......@@ -376,7 +378,7 @@ func TestWantOverridesPendingCancels(t *testing.T) {
func TestWantlistRebroadcast(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -398,7 +400,7 @@ func TestWantlistRebroadcast(t *testing.T) {
// All broadcast want-haves should have been sent
firstMessage := messages[0]
if len(firstMessage.Wantlist()) != len(bcstwh) {
if len(firstMessage) != len(bcstwh) {
t.Fatal("wrong number of wants")
}
......@@ -411,7 +413,7 @@ func TestWantlistRebroadcast(t *testing.T) {
// All the want-haves should have been rebroadcast
firstMessage = messages[0]
if len(firstMessage.Wantlist()) != len(bcstwh) {
if len(firstMessage) != len(bcstwh) {
t.Fatal("did not rebroadcast all wants")
}
......@@ -427,7 +429,7 @@ func TestWantlistRebroadcast(t *testing.T) {
// All new wants should have been sent
firstMessage = messages[0]
if len(firstMessage.Wantlist()) != len(wantHaves)+len(wantBlocks) {
if len(firstMessage) != len(wantHaves)+len(wantBlocks) {
t.Fatal("wrong number of wants")
}
......@@ -438,7 +440,7 @@ func TestWantlistRebroadcast(t *testing.T) {
// Both original and new wants should have been rebroadcast
totalWants := len(bcstwh) + len(wantHaves) + len(wantBlocks)
if len(firstMessage.Wantlist()) != totalWants {
if len(firstMessage) != totalWants {
t.Fatal("did not rebroadcast all wants")
}
......@@ -453,10 +455,10 @@ func TestWantlistRebroadcast(t *testing.T) {
// Cancels for each want should have been sent
firstMessage = messages[0]
if len(firstMessage.Wantlist()) != len(cancels) {
if len(firstMessage) != len(cancels) {
t.Fatal("wrong number of cancels")
}
for _, entry := range firstMessage.Wantlist() {
for _, entry := range firstMessage {
if !entry.Cancel {
t.Fatal("expected cancels")
}
......@@ -466,14 +468,14 @@ func TestWantlistRebroadcast(t *testing.T) {
messageQueue.SetRebroadcastInterval(10 * time.Millisecond)
messages = collectMessages(ctx, t, messagesSent, 15*time.Millisecond)
firstMessage = messages[0]
if len(firstMessage.Wantlist()) != totalWants-len(cancels) {
if len(firstMessage) != totalWants-len(cancels) {
t.Fatal("did not rebroadcast all wants")
}
}
func TestSendingLargeMessages(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -504,7 +506,7 @@ func TestSendingLargeMessages(t *testing.T) {
func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -528,7 +530,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent", len(messages))
}
wl := messages[0].Wantlist()
wl := messages[0]
if len(wl) != len(bcwh) {
t.Fatal("wrong number of entries in wantlist", len(wl))
}
......@@ -547,7 +549,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent", len(messages))
}
wl = messages[0].Wantlist()
wl = messages[0]
if len(wl) != len(wbs) {
t.Fatal("should only send want-blocks (no want-haves)", len(wl))
}
......@@ -560,7 +562,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) {
func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -593,7 +595,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
func TestResendAfterError(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
......@@ -632,7 +634,7 @@ func TestResendAfterError(t *testing.T) {
func TestResendAfterMaxRetries(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan bsmsg.BitSwapMessage)
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, maxRetries*2)
fullClosedChan := make(chan struct{}, 1)
......@@ -705,3 +707,60 @@ func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) {
}
return wbs, whs, cls
}
// Simplistic benchmark to allow us to simulate conditions on the gateways
func BenchmarkMessageQueue(b *testing.B) {
ctx := context.Background()
createQueue := func() *MessageQueue {
messagesSent := make(chan []bsmsg.Entry)
sendErrors := make(chan error)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(nil, fullClosedChan, resetChan, messagesSent, sendErrors, true)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue.Startup()
go func() {
for {
<-messagesSent
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
}()
return messageQueue
}
// Create a handful of message queues to start with
var qs []*MessageQueue
for i := 0; i < 5; i++ {
qs = append(qs, createQueue())
}
for n := 0; n < b.N; n++ {
// Create a new message queue every 10 ticks
if n%10 == 0 {
qs = append(qs, createQueue())
}
// Pick a random message queue, favoring those created later
qn := len(qs)
i := int(math.Floor(float64(qn) * float64(1-rand.Float32()*rand.Float32())))
if i >= qn { // because of floating point math
i = qn - 1
}
// Alternately add either a few wants or a lot of broadcast wants
if rand.Intn(2) == 0 {
wants := testutil.GenerateCids(10)
qs[i].AddWants(wants[:2], wants[2:])
} else {
wants := testutil.GenerateCids(60)
qs[i].AddBroadcastWantHaves(wants)
}
}
}
......@@ -65,6 +65,12 @@ type BitSwapMessage interface {
Exportable
Loggable() map[string]interface{}
// Reset the values in the message back to defaults, so it can be reused
Reset(bool)
// Clone the message fields
Clone() BitSwapMessage
}
// Exportable is an interface for structures than can be
......@@ -85,6 +91,33 @@ type BlockPresence struct {
Type pb.Message_BlockPresenceType
}
// Entry is a wantlist entry in a Bitswap message, with flags indicating
// - whether message is a cancel
// - whether requester wants a DONT_HAVE message
// - whether requester wants a HAVE message (instead of the block)
type Entry struct {
wantlist.Entry
Cancel bool
SendDontHave bool
}
// Get the size of the entry on the wire
func (e *Entry) Size() int {
epb := e.ToPB()
return epb.Size()
}
// Get the entry in protobuf form
func (e *Entry) ToPB() pb.Message_Wantlist_Entry {
return pb.Message_Wantlist_Entry{
Block: pb.Cid{Cid: e.Cid},
Priority: int32(e.Priority),
Cancel: e.Cancel,
WantType: e.WantType,
SendDontHave: e.SendDontHave,
}
}
type impl struct {
full bool
wantlist map[cid.Cid]*Entry
......@@ -100,21 +133,42 @@ func New(full bool) BitSwapMessage {
func newMsg(full bool) *impl {
return &impl{
full: full,
wantlist: make(map[cid.Cid]*Entry),
blocks: make(map[cid.Cid]blocks.Block),
blockPresences: make(map[cid.Cid]pb.Message_BlockPresenceType),
wantlist: make(map[cid.Cid]*Entry),
full: full,
}
}
// Entry is a wantlist entry in a Bitswap message, with flags indicating
// - whether message is a cancel
// - whether requester wants a DONT_HAVE message
// - whether requester wants a HAVE message (instead of the block)
type Entry struct {
wantlist.Entry
Cancel bool
SendDontHave bool
// Clone the message fields
func (m *impl) Clone() BitSwapMessage {
msg := newMsg(m.full)
for k := range m.wantlist {
msg.wantlist[k] = m.wantlist[k]
}
for k := range m.blocks {
msg.blocks[k] = m.blocks[k]
}
for k := range m.blockPresences {
msg.blockPresences[k] = m.blockPresences[k]
}
msg.pendingBytes = m.pendingBytes
return msg
}
// Reset the values in the message back to defaults, so it can be reused
func (m *impl) Reset(full bool) {
m.full = full
for k := range m.wantlist {
delete(m.wantlist, k)
}
for k := range m.blocks {
delete(m.blocks, k)
}
for k := range m.blockPresences {
delete(m.blockPresences, k)
}
m.pendingBytes = 0
}
var errCidMissing = errors.New("missing cid")
......@@ -267,8 +321,7 @@ func (m *impl) addEntry(c cid.Cid, priority int32, cancel bool, wantType pb.Mess
}
m.wantlist[c] = e
aspb := entryToPB(e)
return aspb.Size()
return e.Size()
}
func (m *impl) AddBlock(b blocks.Block) {
......@@ -300,8 +353,7 @@ func (m *impl) Size() int {
size += BlockPresenceSize(c)
}
for _, e := range m.wantlist {
epb := entryToPB(e)
size += epb.Size()
size += e.Size()
}
return size
......@@ -337,21 +389,11 @@ func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
return newMessageFromProto(pb)
}
func entryToPB(e *Entry) pb.Message_Wantlist_Entry {
return pb.Message_Wantlist_Entry{
Block: pb.Cid{Cid: e.Cid},
Priority: int32(e.Priority),
Cancel: e.Cancel,
WantType: e.WantType,
SendDontHave: e.SendDontHave,
}
}
func (m *impl) ToProtoV0() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, entryToPB(e))
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, e.ToPB())
}
pbm.Wantlist.Full = m.full
......@@ -367,7 +409,7 @@ func (m *impl) ToProtoV1() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, entryToPB(e))
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, e.ToPB())
}
pbm.Wantlist.Full = m.full
......
......@@ -5,6 +5,8 @@ import (
"testing"
pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-bitswap/wantlist"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -289,3 +291,21 @@ func TestAddWantlistEntry(t *testing.T) {
t.Fatal("want should not override cancel")
}
}
func TestEntrySize(t *testing.T) {
blockGenerator := blocksutil.NewBlockGenerator()
c := blockGenerator.Next().Cid()
e := Entry{
Entry: wantlist.Entry{
Cid: c,
Priority: 10,
WantType: pb.Message_Wantlist_Have,
},
SendDontHave: true,
Cancel: false,
}
epb := e.ToPB()
if e.Size() != epb.Size() {
t.Fatal("entry size calculation incorrect", e.Size(), epb.Size())
}
}
......@@ -128,6 +128,8 @@ func (n *network) SendMessage(
to peer.ID,
mes bsmsg.BitSwapMessage) error {
mes = mes.Clone()
n.mu.Lock()
defer n.mu.Unlock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment