Commit faee10ef authored by Brian Tiger Chow's avatar Brian Tiger Chow

test(bitswap) send entire wantlist to peers

fix(bitswap) pass go vet

fixes #97

https://github.com/jbenet/go-ipfs/issues/97
parent d514b91f
...@@ -2,6 +2,7 @@ package bitswap ...@@ -2,6 +2,7 @@ package bitswap
import ( import (
"errors" "errors"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
...@@ -28,6 +29,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS ...@@ -28,6 +29,9 @@ func NetMessageSession(parent context.Context, p *peer.Peer, s bsnet.NetMessageS
strategy: strategy.New(), strategy: strategy.New(),
routing: directory, routing: directory,
sender: networkAdapter, sender: networkAdapter,
wantlist: WantList{
data: make(map[u.Key]struct{}),
},
} }
networkAdapter.SetDelegate(bs) networkAdapter.SetDelegate(bs)
...@@ -53,6 +57,39 @@ type bitswap struct { ...@@ -53,6 +57,39 @@ type bitswap struct {
// interact with partners. // interact with partners.
// TODO(brian): save the strategy's state to the datastore // TODO(brian): save the strategy's state to the datastore
strategy strategy.Strategy strategy strategy.Strategy
wantlist WantList
}
type WantList struct {
lock sync.RWMutex
data map[u.Key]struct{}
}
func (wl *WantList) Add(k u.Key) {
u.DOut("Adding %v to Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()
wl.data[k] = struct{}{}
}
func (wl *WantList) Remove(k u.Key) {
u.DOut("Removing %v from Wantlist\n", k.Pretty())
wl.lock.Lock()
defer wl.lock.Unlock()
delete(wl.data, k)
}
func (wl *WantList) Keys() []u.Key {
wl.lock.RLock()
defer wl.lock.RUnlock()
keys := make([]u.Key, 0)
for k, _ := range wl.data {
keys = append(keys, k)
}
return keys
} }
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
...@@ -60,9 +97,10 @@ type bitswap struct { ...@@ -60,9 +97,10 @@ type bitswap struct {
// //
// TODO ensure only one active request per key // TODO ensure only one active request per key
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) { func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
u.DOut("Get Block %v\n", k.Pretty())
ctx, cancelFunc := context.WithCancel(parent) ctx, cancelFunc := context.WithCancel(parent)
// TODO add to wantlist bs.wantlist.Add(k)
promise := bs.notifications.Subscribe(ctx, k) promise := bs.notifications.Subscribe(ctx, k)
const maxProviders = 20 const maxProviders = 20
...@@ -70,6 +108,9 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) ...@@ -70,6 +108,9 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
go func() { go func() {
message := bsmsg.New() message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
message.AppendWanted(k) message.AppendWanted(k)
for iiiii := range peersToQuery { for iiiii := range peersToQuery {
// u.DOut("bitswap got peersToQuery: %s\n", iiiii) // u.DOut("bitswap got peersToQuery: %s\n", iiiii)
...@@ -94,6 +135,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) ...@@ -94,6 +135,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
select { select {
case block := <-promise: case block := <-promise:
cancelFunc() cancelFunc()
bs.wantlist.Remove(k)
// TODO remove from wantlist // TODO remove from wantlist
return &block, nil return &block, nil
case <-parent.Done(): case <-parent.Done():
...@@ -104,6 +146,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) ...@@ -104,6 +146,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
// HasBlock announces the existance of a block to bitswap, potentially sending // HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it. // it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
u.DOut("Has Block %v\n", blk.Key().Pretty())
bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk) bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key()) return bs.routing.Provide(ctx, blk.Key())
} }
...@@ -111,6 +155,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error { ...@@ -111,6 +155,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
// TODO(brian): handle errors // TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) ( func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) { *peer.Peer, bsmsg.BitSwapMessage, error) {
u.DOut("ReceiveMessage from %v\n", p.Key().Pretty())
if p == nil { if p == nil {
return nil, nil, errors.New("Received nil Peer") return nil, nil, errors.New("Received nil Peer")
...@@ -132,19 +177,21 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs ...@@ -132,19 +177,21 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p *peer.Peer, incoming bs
}(block) }(block)
} }
message := bsmsg.New()
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
for _, key := range incoming.Wantlist() { for _, key := range incoming.Wantlist() {
if bs.strategy.ShouldSendBlockToPeer(key, p) { if bs.strategy.ShouldSendBlockToPeer(key, p) {
block, errBlockNotFound := bs.blockstore.Get(key) if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
if errBlockNotFound != nil { continue
return nil, nil, errBlockNotFound } else {
message.AppendBlock(*block)
} }
message := bsmsg.New()
message.AppendBlock(*block)
defer bs.strategy.MessageSent(p, message)
return p, message, nil
} }
} }
return nil, nil, nil defer bs.strategy.MessageSent(p, message)
return p, message, nil
} }
// send strives to ensure that accounting is always performed when a message is // send strives to ensure that accounting is always performed when a message is
...@@ -155,11 +202,16 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag ...@@ -155,11 +202,16 @@ func (bs *bitswap) send(ctx context.Context, p *peer.Peer, m bsmsg.BitSwapMessag
} }
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) { func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
u.DOut("Sending %v to peers that want it\n", block.Key().Pretty())
for _, p := range bs.strategy.Peers() { for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
u.DOut("%v wants %v\n", p.Key().Pretty(), block.Key().Pretty())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New() message := bsmsg.New()
message.AppendBlock(block) message.AppendBlock(block)
for _, wanted := range bs.wantlist.Keys() {
message.AppendWanted(wanted)
}
go bs.send(ctx, p, message) go bs.send(ctx, p, message)
} }
} }
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/peer" peer "github.com/jbenet/go-ipfs/peer"
util "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil" testutil "github.com/jbenet/go-ipfs/util/testutil"
) )
...@@ -145,7 +146,10 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro ...@@ -145,7 +146,10 @@ func getOrFail(bitswap instance, b *blocks.Block, t *testing.T, wg *sync.WaitGro
wg.Done() wg.Done()
} }
// TODO simplify this test. get to the _essence_!
func TestSendToWantingPeer(t *testing.T) { func TestSendToWantingPeer(t *testing.T) {
util.Debug = true
net := tn.VirtualNetwork() net := tn.VirtualNetwork()
rs := tn.VirtualRoutingServer() rs := tn.VirtualRoutingServer()
sg := NewSessionGenerator(net, rs) sg := NewSessionGenerator(net, rs)
...@@ -155,48 +159,55 @@ func TestSendToWantingPeer(t *testing.T) { ...@@ -155,48 +159,55 @@ func TestSendToWantingPeer(t *testing.T) {
w := sg.Next() w := sg.Next()
o := sg.Next() o := sg.Next()
t.Logf("Session %v\n", me.peer.Key().Pretty())
t.Logf("Session %v\n", w.peer.Key().Pretty())
t.Logf("Session %v\n", o.peer.Key().Pretty())
alpha := bg.Next() alpha := bg.Next()
const timeout = 100 * time.Millisecond const timeout = 1 * time.Millisecond // FIXME don't depend on time
const wait = 100 * time.Millisecond
t.Log("Peer |w| attempts to get a file |alpha|. NB: alpha not available") t.Logf("Peer %v attempts to get %v. NB: not available\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ := context.WithTimeout(context.Background(), timeout) ctx, _ := context.WithTimeout(context.Background(), timeout)
_, err := w.exchange.Block(ctx, alpha.Key()) _, err := w.exchange.Block(ctx, alpha.Key())
if err == nil { if err == nil {
t.Error("Expected alpha to NOT be available") t.Fatalf("Expected %v to NOT be available", alpha.Key().Pretty())
} }
time.Sleep(wait)
t.Log("Peer |w| announces availability of a file |beta|")
beta := bg.Next() beta := bg.Next()
t.Logf("Peer %v announes availability of %v\n", w.peer.Key().Pretty(), beta.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout) ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := w.blockstore.Put(beta); err != nil {
t.Fatal(err)
}
w.exchange.HasBlock(ctx, beta) w.exchange.HasBlock(ctx, beta)
time.Sleep(wait)
t.Log("I request and get |beta| from |w|. In the message, I receive |w|'s wants [alpha]") t.Logf("%v gets %v from %v and discovers it wants %v\n", me.peer.Key().Pretty(), beta.Key().Pretty(), w.peer.Key().Pretty(), alpha.Key().Pretty())
t.Log("I don't have alpha, but I keep it on my wantlist.")
ctx, _ = context.WithTimeout(context.Background(), timeout) ctx, _ = context.WithTimeout(context.Background(), timeout)
me.exchange.Block(ctx, beta.Key()) if _, err := me.exchange.Block(ctx, beta.Key()); err != nil {
time.Sleep(wait) t.Fatal(err)
}
t.Log("Peer |o| announces the availability of |alpha|") t.Logf("%v announces availability of %v\n", o.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout) ctx, _ = context.WithTimeout(context.Background(), timeout)
if err := o.blockstore.Put(alpha); err != nil {
t.Fatal(err)
}
o.exchange.HasBlock(ctx, alpha) o.exchange.HasBlock(ctx, alpha)
time.Sleep(wait)
t.Log("I request |alpha| for myself.") t.Logf("%v requests %v\n", me.peer.Key().Pretty(), alpha.Key().Pretty())
ctx, _ = context.WithTimeout(context.Background(), timeout) ctx, _ = context.WithTimeout(context.Background(), timeout)
me.exchange.Block(ctx, alpha.Key()) if _, err := me.exchange.Block(ctx, alpha.Key()); err != nil {
time.Sleep(wait) t.Fatal(err)
}
t.Log("After receiving |f| from |o|, I send it to the wanting peer |w|") t.Logf("%v should now have %v\n", w.peer.Key().Pretty(), alpha.Key().Pretty())
block, err := w.blockstore.Get(alpha.Key()) block, err := w.blockstore.Get(alpha.Key())
if err != nil { if err != nil {
t.Fatal("Should not have received an error") t.Fatal("Should not have received an error")
} }
if block.Key() != alpha.Key() { if block.Key() != alpha.Key() {
t.Error("Expected to receive alpha from me") t.Fatal("Expected to receive alpha from me")
} }
} }
...@@ -278,6 +289,9 @@ func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance { ...@@ -278,6 +289,9 @@ func session(net tn.Network, rs tn.RoutingServer, id peer.ID) instance {
strategy: strategy.New(), strategy: strategy.New(),
routing: htc, routing: htc,
sender: adapter, sender: adapter,
wantlist: WantList{
data: make(map[util.Key]struct{}),
},
} }
adapter.SetDelegate(bs) adapter.SetDelegate(bs)
return instance{ return instance{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment