Commit 92e2d247 authored by Jeromy's avatar Jeromy

a little more correctness on the new bitswap impl

parent 36798def
...@@ -128,12 +128,35 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks. ...@@ -128,12 +128,35 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
promise := bs.notifications.Subscribe(ctx, keys...) promise := bs.notifications.Subscribe(ctx, keys...)
select { select {
case bs.batchRequests <- keys: case bs.batchRequests <- keys:
return promise, nil return pipeBlocks(ctx, promise, len(keys)), nil
case <-ctx.Done(): case <-ctx.Done():
return nil, ctx.Err() return nil, ctx.Err()
} }
} }
func pipeBlocks(ctx context.Context, in <-chan *blocks.Block, count int) <-chan *blocks.Block {
out := make(chan *blocks.Block, 1)
go func() {
defer close(out)
for i := 0; i < count; i++ {
select {
case blk, ok := <-in:
if !ok {
return
}
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error { func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
if peers == nil { if peers == nil {
panic("Cant send wantlist to nil peerchan") panic("Cant send wantlist to nil peerchan")
...@@ -220,7 +243,7 @@ func (bs *bitswap) loop(parent context.Context) { ...@@ -220,7 +243,7 @@ func (bs *bitswap) loop(parent context.Context) {
// HasBlock announces the existance of a block to this bitswap service. The // HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers. // service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Debugf("Has Block %v", blk.Key()) log.Debugf("Has Block %s", blk.Key())
bs.wantlist.Remove(blk.Key()) bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk) bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key()) return bs.routing.Provide(ctx, blk.Key())
...@@ -262,10 +285,6 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm ...@@ -262,10 +285,6 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
} }
} }
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AddWanted(wanted)
}
for _, key := range incoming.Wantlist() { for _, key := range incoming.Wantlist() {
// TODO: might be better to check if we have the block before checking // TODO: might be better to check if we have the block before checking
// if we should send it to someone // if we should send it to someone
...@@ -273,14 +292,22 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm ...@@ -273,14 +292,22 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue continue
} else { } else {
message.AddBlock(block) // Create a separate message to send this block in
blkmsg := bsmsg.New()
// TODO: only send this the first time
for _, k := range bs.wantlist.Keys() {
blkmsg.AddWanted(k)
}
blkmsg.AddBlock(block)
bs.strategy.MessageSent(p, blkmsg)
bs.send(ctx, p, blkmsg)
} }
} }
} }
bs.strategy.MessageSent(p, message) return nil, nil
log.Debug("Returning message.")
return p, message
} }
func (bs *bitswap) ReceiveError(err error) { func (bs *bitswap) ReceiveError(err error) {
......
...@@ -106,7 +106,7 @@ func TestLargeSwarm(t *testing.T) { ...@@ -106,7 +106,7 @@ func TestLargeSwarm(t *testing.T) {
t.SkipNow() t.SkipNow()
} }
t.Parallel() t.Parallel()
numInstances := 500 numInstances := 5
numBlocks := 2 numBlocks := 2
PerformDistributionTest(t, numInstances, numBlocks) PerformDistributionTest(t, numInstances, numBlocks)
} }
......
...@@ -61,6 +61,7 @@ func (l *ledger) ReceivedBytes(n int) { ...@@ -61,6 +61,7 @@ func (l *ledger) ReceivedBytes(n int) {
// TODO: this needs to be different. We need timeouts. // TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key) { func (l *ledger) Wants(k u.Key) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList[k] = struct{}{} l.wantList[k] = struct{}{}
} }
......
...@@ -10,6 +10,8 @@ import ( ...@@ -10,6 +10,8 @@ import (
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
var log = u.Logger("strategy")
// TODO niceness should be on a per-peer basis. Use-case: Certain peers are // TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for // "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely // these peers to exchange data freely
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment