Commit 2b699fbc authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

explicitly set bitswap message fullness

parent ef35c2a2
...@@ -288,7 +288,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg ...@@ -288,7 +288,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
bs.dupBlocksRecvd++ bs.dupBlocksRecvd++
} }
bs.counterLk.Unlock() bs.counterLk.Unlock()
log.Debugf("got block %s from %s", block, p) log.Debugf("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd)
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
if err := bs.HasBlock(hasBlockCtx, block); err != nil { if err := bs.HasBlock(hasBlockCtx, block); err != nil {
......
...@@ -163,7 +163,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { ...@@ -163,7 +163,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
} }
for _ = range outch { for _ = range outch {
} }
log.Error("DONE")
}(inst) }(inst)
} }
wg.Wait() wg.Wait()
......
...@@ -41,7 +41,7 @@ func TestConsistentAccounting(t *testing.T) { ...@@ -41,7 +41,7 @@ func TestConsistentAccounting(t *testing.T) {
// Send messages from Ernie to Bert // Send messages from Ernie to Bert
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
m := message.New() m := message.New(false)
content := []string{"this", "is", "message", "i"} content := []string{"this", "is", "message", "i"}
m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))
...@@ -73,7 +73,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { ...@@ -73,7 +73,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
sanfrancisco := newEngine(ctx, "sf") sanfrancisco := newEngine(ctx, "sf")
seattle := newEngine(ctx, "sea") seattle := newEngine(ctx, "sea")
m := message.New() m := message.New(true)
sanfrancisco.Engine.MessageSent(seattle.Peer, m) sanfrancisco.Engine.MessageSent(seattle.Peer, m)
seattle.Engine.MessageReceived(sanfrancisco.Peer, m) seattle.Engine.MessageReceived(sanfrancisco.Peer, m)
...@@ -164,7 +164,7 @@ func TestPartnerWantsThenCancels(t *testing.T) { ...@@ -164,7 +164,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
} }
func partnerWants(e *Engine, keys []string, partner peer.ID) { func partnerWants(e *Engine, keys []string, partner peer.ID) {
add := message.New() add := message.New(false)
for i, letter := range keys { for i, letter := range keys {
block := blocks.NewBlock([]byte(letter)) block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Key(), math.MaxInt32-i) add.AddEntry(block.Key(), math.MaxInt32-i)
...@@ -173,7 +173,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) { ...@@ -173,7 +173,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
} }
func partnerCancels(e *Engine, keys []string, partner peer.ID) { func partnerCancels(e *Engine, keys []string, partner peer.ID) {
cancels := message.New() cancels := message.New(false)
for _, k := range keys { for _, k := range keys {
block := blocks.NewBlock([]byte(k)) block := blocks.NewBlock([]byte(k))
cancels.Cancel(block.Key()) cancels.Cancel(block.Key())
......
...@@ -31,12 +31,7 @@ type BitSwapMessage interface { ...@@ -31,12 +31,7 @@ type BitSwapMessage interface {
Empty() bool Empty() bool
// Sets whether or not the contained wantlist represents the entire wantlist // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set
// true = full wantlist
// false = wantlist 'patch'
// default: true
SetFull(isFull bool)
Full() bool Full() bool
AddBlock(*blocks.Block) AddBlock(*blocks.Block)
...@@ -56,15 +51,15 @@ type impl struct { ...@@ -56,15 +51,15 @@ type impl struct {
blocks map[u.Key]*blocks.Block blocks map[u.Key]*blocks.Block
} }
func New() BitSwapMessage { func New(full bool) BitSwapMessage {
return newMsg() return newMsg(full)
} }
func newMsg() *impl { func newMsg(full bool) *impl {
return &impl{ return &impl{
blocks: make(map[u.Key]*blocks.Block), blocks: make(map[u.Key]*blocks.Block),
wantlist: make(map[u.Key]Entry), wantlist: make(map[u.Key]Entry),
full: true, full: full,
} }
} }
...@@ -74,8 +69,7 @@ type Entry struct { ...@@ -74,8 +69,7 @@ type Entry struct {
} }
func newMessageFromProto(pbm pb.Message) BitSwapMessage { func newMessageFromProto(pbm pb.Message) BitSwapMessage {
m := newMsg() m := newMsg(pbm.GetWantlist().GetFull())
m.SetFull(pbm.GetWantlist().GetFull())
for _, e := range pbm.GetWantlist().GetEntries() { for _, e := range pbm.GetWantlist().GetEntries() {
m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel())
} }
...@@ -86,10 +80,6 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { ...@@ -86,10 +80,6 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage {
return m return m
} }
func (m *impl) SetFull(full bool) {
m.full = full
}
func (m *impl) Full() bool { func (m *impl) Full() bool {
return m.full return m.full
} }
......
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
func TestAppendWanted(t *testing.T) { func TestAppendWanted(t *testing.T) {
const str = "foo" const str = "foo"
m := New() m := New(true)
m.AddEntry(u.Key(str), 1) m.AddEntry(u.Key(str), 1)
if !wantlistContains(m.ToProto().GetWantlist(), str) { if !wantlistContains(m.ToProto().GetWantlist(), str) {
...@@ -44,7 +44,7 @@ func TestAppendBlock(t *testing.T) { ...@@ -44,7 +44,7 @@ func TestAppendBlock(t *testing.T) {
strs = append(strs, "Celeritas") strs = append(strs, "Celeritas")
strs = append(strs, "Incendia") strs = append(strs, "Incendia")
m := New() m := New(true)
for _, str := range strs { for _, str := range strs {
block := blocks.NewBlock([]byte(str)) block := blocks.NewBlock([]byte(str))
m.AddBlock(block) m.AddBlock(block)
...@@ -61,7 +61,7 @@ func TestAppendBlock(t *testing.T) { ...@@ -61,7 +61,7 @@ func TestAppendBlock(t *testing.T) {
func TestWantlist(t *testing.T) { func TestWantlist(t *testing.T) {
keystrs := []string{"foo", "bar", "baz", "bat"} keystrs := []string{"foo", "bar", "baz", "bat"}
m := New() m := New(true)
for _, s := range keystrs { for _, s := range keystrs {
m.AddEntry(u.Key(s), 1) m.AddEntry(u.Key(s), 1)
} }
...@@ -84,7 +84,7 @@ func TestWantlist(t *testing.T) { ...@@ -84,7 +84,7 @@ func TestWantlist(t *testing.T) {
func TestCopyProtoByValue(t *testing.T) { func TestCopyProtoByValue(t *testing.T) {
const str = "foo" const str = "foo"
m := New() m := New(true)
protoBeforeAppend := m.ToProto() protoBeforeAppend := m.ToProto()
m.AddEntry(u.Key(str), 1) m.AddEntry(u.Key(str), 1)
if wantlistContains(protoBeforeAppend.GetWantlist(), str) { if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
...@@ -93,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) { ...@@ -93,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) {
} }
func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToNetFromNetPreservesWantList(t *testing.T) {
original := New() original := New(true)
original.AddEntry(u.Key("M"), 1) original.AddEntry(u.Key("M"), 1)
original.AddEntry(u.Key("B"), 1) original.AddEntry(u.Key("B"), 1)
original.AddEntry(u.Key("D"), 1) original.AddEntry(u.Key("D"), 1)
...@@ -124,7 +124,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { ...@@ -124,7 +124,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
func TestToAndFromNetMessage(t *testing.T) { func TestToAndFromNetMessage(t *testing.T) {
original := New() original := New(true)
original.AddBlock(blocks.NewBlock([]byte("W"))) original.AddBlock(blocks.NewBlock([]byte("W")))
original.AddBlock(blocks.NewBlock([]byte("E"))) original.AddBlock(blocks.NewBlock([]byte("E")))
original.AddBlock(blocks.NewBlock([]byte("F"))) original.AddBlock(blocks.NewBlock([]byte("F")))
...@@ -172,7 +172,7 @@ func contains(strs []string, x string) bool { ...@@ -172,7 +172,7 @@ func contains(strs []string, x string) bool {
func TestDuplicates(t *testing.T) { func TestDuplicates(t *testing.T) {
b := blocks.NewBlock([]byte("foo")) b := blocks.NewBlock([]byte("foo"))
msg := New() msg := New(true)
msg.AddEntry(b.Key(), 1) msg.AddEntry(b.Key(), 1)
msg.AddEntry(b.Key(), 1) msg.AddEntry(b.Key(), 1)
......
...@@ -31,7 +31,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -31,7 +31,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
fromWaiter peer.ID, fromWaiter peer.ID,
msgFromWaiter bsmsg.BitSwapMessage) { msgFromWaiter bsmsg.BitSwapMessage) {
msgToWaiter := bsmsg.New() msgToWaiter := bsmsg.New(true)
msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr)))
waiter.SendMessage(ctx, fromWaiter, msgToWaiter) waiter.SendMessage(ctx, fromWaiter, msgToWaiter)
})) }))
...@@ -55,7 +55,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { ...@@ -55,7 +55,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
} }
})) }))
messageSentAsync := bsmsg.New() messageSentAsync := bsmsg.New(true)
messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) messageSentAsync.AddBlock(blocks.NewBlock([]byte("data")))
errSending := waiter.SendMessage( errSending := waiter.SendMessage(
context.Background(), responderPeer.ID(), messageSentAsync) context.Background(), responderPeer.ID(), messageSentAsync)
......
...@@ -2,6 +2,7 @@ package bitswap ...@@ -2,6 +2,7 @@ package bitswap
import ( import (
"sync" "sync"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
...@@ -94,9 +95,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { ...@@ -94,9 +95,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) {
// throughout the network stack // throughout the network stack
defer env.Sent() defer env.Sent()
msg := bsmsg.New() msg := bsmsg.New(false)
msg.AddBlock(env.Block) msg.AddBlock(env.Block)
msg.SetFull(false)
err := pm.network.SendMessage(ctx, env.Peer, msg) err := pm.network.SendMessage(ctx, env.Peer, msg)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
...@@ -113,11 +113,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { ...@@ -113,11 +113,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {
mq := newMsgQueue(p) mq := newMsgQueue(p)
// new peer, we will want to give them our full wantlist // new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New() fullwantlist := bsmsg.New(true)
for _, e := range pm.wl.Entries() { for _, e := range pm.wl.Entries() {
fullwantlist.AddEntry(e.Key, e.Priority) fullwantlist.AddEntry(e.Key, e.Priority)
} }
fullwantlist.SetFull(true)
mq.out = fullwantlist mq.out = fullwantlist
mq.work <- struct{}{} mq.work <- struct{}{}
...@@ -180,6 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { ...@@ -180,6 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) {
// TODO: use goprocess here once i trust it // TODO: use goprocess here once i trust it
func (pm *WantManager) Run() { func (pm *WantManager) Run() {
tock := time.NewTicker(rebroadcastDelay.Get())
for { for {
select { select {
case entries := <-pm.incoming: case entries := <-pm.incoming:
...@@ -198,6 +198,19 @@ func (pm *WantManager) Run() { ...@@ -198,6 +198,19 @@ func (pm *WantManager) Run() {
p.addMessage(entries) p.addMessage(entries)
} }
case <-tock.C:
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() {
es = append(es, &bsmsg.Entry{Entry: e})
}
for _, p := range pm.peers {
p.outlk.Lock()
p.out = bsmsg.New(true)
p.outlk.Unlock()
p.addMessage(es)
}
case p := <-pm.connect: case p := <-pm.connect:
pm.startPeerHandler(p) pm.startPeerHandler(p)
case p := <-pm.disconnect: case p := <-pm.disconnect:
...@@ -230,7 +243,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { ...@@ -230,7 +243,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
// if we have no message held, or the one we are given is full // if we have no message held, or the one we are given is full
// overwrite the one we are holding // overwrite the one we are holding
if mq.out == nil { if mq.out == nil {
mq.out = bsmsg.New() mq.out = bsmsg.New(false)
} }
// TODO: add a msg.Combine(...) method // TODO: add a msg.Combine(...) method
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment