Commit 9120d107 authored by Jeromy's avatar Jeromy

a little more correctness on the new bitswap impl

parent a932bfdf
......@@ -98,7 +98,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks.Block {
func (s *BlockService) GetBlocks(parent context.Context, ks []u.Key) <-chan *blocks.Block {
out := make(chan *blocks.Block, 32)
go func() {
var toFetch []u.Key
......@@ -112,11 +112,13 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
out <- block
}
ctx, cancel := context.WithCancel(parent)
nblocks, err := s.Remote.GetBlocks(ctx, toFetch)
if err != nil {
log.Errorf("Error with GetBlocks: %s", err)
return
}
for blk := range nblocks {
out <- blk
}
......
......@@ -128,12 +128,35 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
promise := bs.notifications.Subscribe(ctx, keys...)
select {
case bs.batchRequests <- keys:
return promise, nil
return pipeBlocks(ctx, promise, len(keys)), nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func pipeBlocks(ctx context.Context, in <-chan *blocks.Block, count int) <-chan *blocks.Block {
out := make(chan *blocks.Block, 1)
go func() {
defer close(out)
for i := 0; i < count; i++ {
select {
case blk, ok := <-in:
if !ok {
return
}
select {
case out <- blk:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error {
if peers == nil {
panic("Cant send wantlist to nil peerchan")
......@@ -220,7 +243,7 @@ func (bs *bitswap) loop(parent context.Context) {
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
log.Debugf("Has Block %v", blk.Key())
log.Debugf("Has Block %s", blk.Key())
bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key())
......@@ -262,10 +285,6 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
}
}
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AddWanted(wanted)
}
for _, key := range incoming.Wantlist() {
// TODO: might be better to check if we have the block before checking
// if we should send it to someone
......@@ -273,14 +292,22 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
message.AddBlock(block)
// Create a separate message to send this block in
blkmsg := bsmsg.New()
// TODO: only send this the first time
for _, k := range bs.wantlist.Keys() {
blkmsg.AddWanted(k)
}
blkmsg.AddBlock(block)
bs.strategy.MessageSent(p, blkmsg)
bs.send(ctx, p, blkmsg)
}
}
}
bs.strategy.MessageSent(p, message)
log.Debug("Returning message.")
return p, message
return nil, nil
}
func (bs *bitswap) ReceiveError(err error) {
......
......@@ -106,7 +106,7 @@ func TestLargeSwarm(t *testing.T) {
t.SkipNow()
}
t.Parallel()
numInstances := 500
numInstances := 5
numBlocks := 2
PerformDistributionTest(t, numInstances, numBlocks)
}
......
......@@ -61,6 +61,7 @@ func (l *ledger) ReceivedBytes(n int) {
// TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k u.Key) {
log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList[k] = struct{}{}
}
......
......@@ -10,6 +10,8 @@ import (
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("strategy")
// TODO niceness should be on a per-peer basis. Use-case: Certain peers are
// "trusted" and/or controlled by a single human user. The user may want for
// these peers to exchange data freely
......
......@@ -163,6 +163,17 @@ func (n *Node) Multihash() (mh.Multihash, error) {
return n.cached, nil
}
// Searches this nodes links for one to the given key,
// returns the index of said link
func (n *Node) FindLink(k u.Key) (int, error) {
for i, lnk := range n.Links {
if u.Key(lnk.Hash) == k {
return i, nil
}
}
return -1, u.ErrNotFound
}
// Key returns the Multihash as a key, for maps.
func (n *Node) Key() (u.Key, error) {
h, err := n.Multihash()
......@@ -296,6 +307,10 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
var keys []u.Key
nodes := make([]*Node, len(root.Links))
//temp
recvd := []int{}
//
//
next := 0
//
......@@ -306,28 +321,36 @@ func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
blkchan := ds.Blocks.GetBlocks(ctx, keys)
count := 0
for blk := range blkchan {
for i, lnk := range root.Links {
if u.Key(lnk.Hash) != blk.Key() {
continue
}
count++
i, err := root.FindLink(blk.Key())
if err != nil {
panic("Received block that wasnt in this nodes links!")
}
nd, err := Decoded(blk.Data)
if err != nil {
log.Error("Got back bad block!")
break
}
nodes[i] = nd
if next == i {
sig <- nd
next++
for ; next < len(nodes) && nodes[next] != nil; next++ {
sig <- nodes[next]
}
recvd = append(recvd, i)
nd, err := Decoded(blk.Data)
if err != nil {
log.Error("Got back bad block!")
break
}
nodes[i] = nd
if next == i {
sig <- nd
next++
for ; next < len(nodes) && nodes[next] != nil; next++ {
sig <- nodes[next]
}
}
}
if next < len(nodes) {
log.Errorf("count = %d, links = %d", count, len(nodes))
log.Error(recvd)
panic("didnt receive all requested blocks!")
}
close(sig)
}()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment