Commit f96246e1 authored by Brian Tiger Chow's avatar Brian Tiger Chow

test(bitswap) send entire wantlist to peers

fix(bitswap) pass go vet

fixes #97

https://github.com/jbenet/go-ipfs/issues/97
parent 05265fe6
......@@ -2,6 +2,7 @@ package bitswap
import (
"errors"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
......@@ -28,6 +29,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS
strategy: strategy.New(),
routing: directory,
sender: networkAdapter,
wantlist: WantList{
data: make(map[u.Key]struct{}),
},
}
networkAdapter.SetDelegate(bs)
......@@ -53,6 +57,39 @@ type bitswap struct {
// interact with partners.
// TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy
wantlist WantList
}
type WantList struct {
lock sync.RWMutex
data map[u.Key]struct{}
}
func (wl *WantList) Add(k u.Key) {
u.DOut("Adding %v to Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()
wl.data[k] = struct{}{}
}
func (wl *WantList) Remove(k u.Key) {
u.DOut("Removing %v from Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()
delete(wl.data, k)
}
func (wl *WantList) Keys() []u.Key {
wl.lock.RLock()
defer wl.lock.RUnlock()
keys := make([]u.Key, 0)
for k, _ := range wl.data {
keys = append(keys, k)
}
return keys
}
// GetBlock attempts to retrieve a particular block from peers within the
......@@ -60,9 +97,10 @@ type bitswap struct {
//
// TODO ensure only one active request per key
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
u.DOut("Get Block %v\n", k.Pretty())
ctx, cancelFunc := context.WithCancel(parent)
// TODO add to wantlist
bs.wantlist.Add(k)
promise := bs.notifications.Subscribe(ctx, k)
const maxProviders = 20
......@@ -70,6 +108,9 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
go func() {
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
message.AppendWanted(k)
for iiiii := range peersToQuery {
// u.DOut("bitswap got peersToQuery: %s\n", iiiii)
......@@ -94,6 +135,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
select {
case block := <-promise:
cancelFunc()
bs.wantlist.Remove(k)
// TODO remove from wantlist
return &block, nil
case <-parent.Done():
......@@ -104,6 +146,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
// HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
u.DOut("Has Block %v\n", blk.Key().Pretty())
bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key())
}
......@@ -111,6 +155,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())
if p == nil {
return nil, nil, errors.New("Received nil Peer")
......@@ -132,19 +177,21 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
}(block)
}
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
for _, key := range incoming.Wantlist() {
if bs.strategy.ShouldSendBlockToPeer(key, p) {
block, errBlockNotFound := bs.blockstore.Get(key)
if errBlockNotFound != nil {
return nil, nil, errBlockNotFound
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
} else {
message.AppendBlock(*block)
}
message := bsmsg.New()
message.AppendBlock(*block)
defer bs.strategy.MessageSent(p, message)
return p, message, nil
}
}
return nil, nil, nil
defer bs.strategy.MessageSent(p, message)
return p, message, nil
}
// send strives to ensure that accounting is always performed when a message is
......@@ -155,11 +202,16 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag
}
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
u.DOut("Sending %v to peers that want it\n", block.Key().Pretty())
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
u.DOut("%v wants %v\n", p.Key().Pretty(), block.Key().Pretty())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
go bs.send(ctx, p, message)
}
}
......
......@@ -16,6 +16,7 @@ import (
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
......@@ -145,7 +146,10 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro
wg.Done()
}
// TODO simplify this test. get to the _essence_!
func TestSendToWantingPeer(t *testing.T) {
util.Debug = true
net := tn.VirtualNetwork()
rs := tn.VirtualRoutingServer()
sg := NewSessionGenerator(net, rs)
......@@ -155,48 +159,55 @@ func TestSendToWantingPeer(t *testing.T) {
w := sg.Next()
o := sg.Next()
t.Logf("Session %v\n", me.peer.Key().Pretty())
t.Logf("Session %v\n", w.peer.Key().Pretty())
t.Logf("Session %v\n", o.peer.Key().Pretty())
alpha := bg.Next()
const timeout = 100 * time.Millisecond
const wait = 100 * time.Millisecond
const timeout = 1 * time.Millisecond // FIXME don't depend on time
t.Log("Peer |w| attempts to get a file |alpha|. NB: alpha not available")
t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ := context.WithTimeout(context.Background(), timeout)
_, err := w.exchange.Block(ctx, alpha.Key())
if err == nil {
t.Error("Expected alpha to NOT be available")
t.Fatalf("Expected %v to NOT be available", alpha.Key().Pretty())
}
time.Sleep(wait)
t.Log("Peer |w| announces availability of a file |beta|")
beta := bg.Next()
t.Logf("Peer %v announes availability of %v\n", w.peer.Key().Pretty(), beta.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := w.blockstore.Put(beta); err != nil {
t.Fatal(err)
}
w.exchange.HasBlock(ctx, beta)
time.Sleep(wait)
t.Log("I request and get |beta| from |w|. In the message, I receive |w|'s wants [alpha]")
t.Log("I don't have alpha, but I keep it on my wantlist.")
t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer.Key().Pretty(), beta.Key().Pretty(), w.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
me.exchange.Block(ctx, beta.Key())
time.Sleep(wait)
if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
t.Fatal(err)
}
t.Log("Peer |o| announces the availability of |alpha|")
t.Logf("%v announces availability of %v\n", o.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := o.blockstore.Put(alpha); err != nil {
t.Fatal(err)
}
o.exchange.HasBlock(ctx, alpha)
time.Sleep(wait)
t.Log("I request |alpha| for myself.")
t.Logf("%v requests %v\n", me.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout)
me.exchange.Block(ctx, alpha.Key())
time.Sleep(wait)
if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
t.Fatal(err)
}
t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|")
t.Logf("%v should now have %v\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
block, err := w.blockstore.Get(alpha.Key())
if err != nil {
t.Fatal("Should not have received an error")
}
if block.Key() != alpha.Key() {
t.Error("Expected to receive alpha from me")
t.Fatal("Expected to receive alpha from me")
}
}
......@@ -278,6 +289,9 @@ func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
strategy: strategy.New(),
routing: htc,
sender: adapter,
wantlist: WantList{
data: make(map[util.Key]struct{}),
},
}
adapter.SetDelegate(bs)
return instance{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment