Commit cb3a8bfc authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #216 from jbenet/fix/bsmsg-duplicates

fix(bitswap/message) duplicate entries
parents e8ec8ce9 42dfc502
......@@ -78,9 +78,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
go func() {
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
message.AddWanted(wanted)
}
message.AppendWanted(k)
for peerToQuery := range peersToQuery {
log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
go func(p peer.Peer) {
......@@ -168,7 +167,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
message.AddWanted(wanted)
}
for _, key := range incoming.Wantlist() {
// TODO: might be better to check if we have the block before checking
......@@ -177,7 +176,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
message.AppendBlock(*block)
message.AddBlock(*block)
}
}
}
......@@ -207,9 +206,9 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block)
log.Debugf("%v wants %v", p, block.Key())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
message.AddBlock(block)
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
message.AddWanted(wanted)
}
go bs.send(ctx, p, message)
}
......
......@@ -14,10 +14,25 @@ import (
// TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb
type BitSwapMessage interface {
// Wantlist returns a slice of unique keys that represent data wanted by
// the sender.
Wantlist() []u.Key
// Blocks returns a slice of unique blocks
Blocks() []blocks.Block
AppendWanted(k u.Key)
AppendBlock(b blocks.Block)
// AddWanted adds the key to the Wantlist.
//
// Insertion order determines priority. That is, earlier insertions are
// deemed higher priority than keys inserted later.
//
// t = 0, msg.AddWanted(A)
// t = 1, msg.AddWanted(B)
//
// implies Priority(A) > Priority(B)
AddWanted(u.Key)
AddBlock(blocks.Block)
Exportable
}
......@@ -26,44 +41,55 @@ type Exportable interface {
ToNet(p peer.Peer) (nm.NetMessage, error)
}
// message wraps a proto message for convenience
type message struct {
wantlist []u.Key
blocks []blocks.Block
type impl struct {
existsInWantlist map[u.Key]struct{} // map to detect duplicates
wantlist []u.Key // slice to preserve ordering
blocks map[u.Key]blocks.Block // map to detect duplicates
}
func New() *message {
return new(message)
func New() BitSwapMessage {
return &impl{
blocks: make(map[u.Key]blocks.Block),
existsInWantlist: make(map[u.Key]struct{}),
wantlist: make([]u.Key, 0),
}
}
func newMessageFromProto(pbm pb.Message) BitSwapMessage {
m := New()
for _, s := range pbm.GetWantlist() {
m.AppendWanted(u.Key(s))
m.AddWanted(u.Key(s))
}
for _, d := range pbm.GetBlocks() {
b := blocks.NewBlock(d)
m.AppendBlock(*b)
m.AddBlock(*b)
}
return m
}
// TODO(brian): convert these into keys
func (m *message) Wantlist() []u.Key {
func (m *impl) Wantlist() []u.Key {
return m.wantlist
}
// TODO(brian): convert these into blocks
func (m *message) Blocks() []blocks.Block {
return m.blocks
func (m *impl) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0)
for _, block := range m.blocks {
bs = append(bs, block)
}
return bs
}
func (m *message) AppendWanted(k u.Key) {
func (m *impl) AddWanted(k u.Key) {
_, exists := m.existsInWantlist[k]
if exists {
return
}
m.existsInWantlist[k] = struct{}{}
m.wantlist = append(m.wantlist, k)
}
func (m *message) AppendBlock(b blocks.Block) {
m.blocks = append(m.blocks, b)
func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Key()] = b
}
func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) {
......@@ -75,7 +101,7 @@ func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) {
return m, nil
}
func (m *message) ToProto() *pb.Message {
func (m *impl) ToProto() *pb.Message {
pb := new(pb.Message)
for _, k := range m.Wantlist() {
pb.Wantlist = append(pb.Wantlist, string(k))
......@@ -86,6 +112,6 @@ func (m *message) ToProto() *pb.Message {
return pb
}
func (m *message) ToNet(p peer.Peer) (nm.NetMessage, error) {
func (m *impl) ToNet(p peer.Peer) (nm.NetMessage, error) {
return nm.FromObject(p, m.ToProto())
}
......@@ -13,7 +13,7 @@ import (
func TestAppendWanted(t *testing.T) {
const str = "foo"
m := New()
m.AppendWanted(u.Key(str))
m.AddWanted(u.Key(str))
if !contains(m.ToProto().GetWantlist(), str) {
t.Fail()
......@@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) {
m := New()
for _, str := range strs {
block := blocks.NewBlock([]byte(str))
m.AppendBlock(*block)
m.AddBlock(*block)
}
// assert strings are in proto message
......@@ -58,7 +58,7 @@ func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"}
m := New()
for _, s := range keystrs {
m.AppendWanted(u.Key(s))
m.AddWanted(u.Key(s))
}
exported := m.Wantlist()
......@@ -81,7 +81,7 @@ func TestCopyProtoByValue(t *testing.T) {
const str = "foo"
m := New()
protoBeforeAppend := m.ToProto()
m.AppendWanted(u.Key(str))
m.AddWanted(u.Key(str))
if contains(protoBeforeAppend.GetWantlist(), str) {
t.Fail()
}
......@@ -101,11 +101,11 @@ func TestToNetMethodSetsPeer(t *testing.T) {
func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New()
original.AppendWanted(u.Key("M"))
original.AppendWanted(u.Key("B"))
original.AppendWanted(u.Key("D"))
original.AppendWanted(u.Key("T"))
original.AppendWanted(u.Key("F"))
original.AddWanted(u.Key("M"))
original.AddWanted(u.Key("B"))
original.AddWanted(u.Key("D"))
original.AddWanted(u.Key("T"))
original.AddWanted(u.Key("F"))
p := peer.WithIDString("X")
netmsg, err := original.ToNet(p)
......@@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
func TestToAndFromNetMessage(t *testing.T) {
original := New()
original.AppendBlock(*blocks.NewBlock([]byte("W")))
original.AppendBlock(*blocks.NewBlock([]byte("E")))
original.AppendBlock(*blocks.NewBlock([]byte("F")))
original.AppendBlock(*blocks.NewBlock([]byte("M")))
original.AddBlock(*blocks.NewBlock([]byte("W")))
original.AddBlock(*blocks.NewBlock([]byte("E")))
original.AddBlock(*blocks.NewBlock([]byte("F")))
original.AddBlock(*blocks.NewBlock([]byte("M")))
p := peer.WithIDString("X")
netmsg, err := original.ToNet(p)
......@@ -169,3 +169,20 @@ func contains(s []string, x string) bool {
}
return false
}
func TestDuplicates(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
msg := New()
msg.AddWanted(b.Key())
msg.AddWanted(b.Key())
if len(msg.Wantlist()) != 1 {
t.Fatal("Duplicate in BitSwapMessage")
}
msg.AddBlock(*b)
msg.AddBlock(*b)
if len(msg.Blocks()) != 1 {
t.Fatal("Duplicate in BitSwapMessage")
}
}
......@@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) {
m := message.New()
content := []string{"this", "is", "message", "i"}
m.AppendBlock(*blocks.NewBlock([]byte(strings.Join(content, " "))))
m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " "))))
sender.MessageSent(receiver.Peer, m)
receiver.MessageReceived(sender.Peer, m)
......@@ -60,7 +60,7 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) {
block := blocks.NewBlock([]byte("data wanted by beggar"))
messageFromBeggarToChooser := message.New()
messageFromBeggarToChooser.AppendWanted(block.Key())
messageFromBeggarToChooser.AddWanted(block.Key())
chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser)
// for this test, doesn't matter if you record that beggar sent
......
......@@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
// TODO test contents of incoming message
m := bsmsg.New()
m.AppendBlock(*blocks.NewBlock([]byte(expectedStr)))
m.AddBlock(*blocks.NewBlock([]byte(expectedStr)))
return from, m
}))
......@@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) {
t.Log("Build a message and send a synchronous request to recipient")
message := bsmsg.New()
message.AppendBlock(*blocks.NewBlock([]byte("data")))
message.AddBlock(*blocks.NewBlock([]byte("data")))
response, err := initiator.SendRequest(
context.Background(), peer.WithID(idOfRecipient), message)
if err != nil {
......@@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
peer.Peer, bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(*blocks.NewBlock([]byte(expectedStr)))
msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr)))
return fromWaiter, msgToWaiter
}))
......@@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
}))
messageSentAsync := bsmsg.New()
messageSentAsync.AppendBlock(*blocks.NewBlock([]byte("data")))
messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage(
context.Background(), peer.WithID(idOfResponder), messageSentAsync)
if errSending != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment