Commit c80c8aa9 authored by Brian Tiger Chow's avatar Brian Tiger Chow

test(bitswap:testnet)

misc:
* test network client getting more than max
* test for find providers
* rename factory method
* local network
* misc test improvements
* test bitswap get block timeout
* test provider exists but cannot connect to peer
* test sending a message async over local network
parent 7975ffe7
package bitswap
import (
"testing"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
bstore "github.com/jbenet/go-ipfs/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange"
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
peer "github.com/jbenet/go-ipfs/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestGetBlockTimeout(t *testing.T) {
net := LocalNetwork()
rs := newRoutingServer()
ipfs := session(net, rs, []byte("peer id"))
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
block := testutil.NewBlockOrFail(t, "block")
_, err := ipfs.exchange.Block(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
}
}
func TestProviderForKeyButNetworkCannotFind(t *testing.T) {
net := LocalNetwork()
rs := newRoutingServer()
ipfs := session(net, rs, []byte("peer id"))
// ctx := context.Background()
ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond)
block := testutil.NewBlockOrFail(t, "block")
rs.Announce(&peer.Peer{}, block.Key()) // but not on network
_, err := ipfs.exchange.Block(ctx, block.Key())
if err != context.DeadlineExceeded {
t.Fatal("Expected DeadlineExceeded error")
}
}
type ipfs struct {
peer *peer.Peer
exchange exchange.Interface
blockstore bstore.Blockstore
}
func session(net Network, rs RoutingServer, id peer.ID) ipfs {
p := &peer.Peer{}
adapter := net.Adapter(p)
htc := rs.Client(p)
blockstore := bstore.NewBlockstore(ds.NewMapDatastore())
bs := &bitswap{
blockstore: blockstore,
notifications: notifications.New(),
strategy: strategy.New(),
routing: htc,
sender: adapter,
}
adapter.SetDelegate(bs)
return ipfs{
peer: p,
exchange: bs,
blockstore: blockstore,
}
}
func TestSendToWantingPeer(t *testing.T) {
t.Log("Peer |w| tells me it wants file, but I don't have it")
t.Log("Then another peer |o| sends it to me")
t.Log("After receiving the file from |o|, I send it to the wanting peer |w|")
}
package bitswap
import (
"errors"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
type RoutingServer interface {
// TODO
Announce(*peer.Peer, u.Key) error
// TODO
Providers(u.Key) []*peer.Peer
// TODO
// Returns a Routing instance configured to query this hash table
Client(*peer.Peer) Routing
}
func newRoutingServer() RoutingServer {
return &hashTable{
m: make(map[u.Key]map[*peer.Peer]bool),
}
}
type hashTable struct {
lock sync.RWMutex
m map[u.Key]map[*peer.Peer]bool
}
var TODO = errors.New("TODO")
func (rs *hashTable) Announce(p *peer.Peer, k u.Key) error {
rs.lock.Lock()
defer rs.lock.Unlock()
_, ok := rs.m[k]
if !ok {
rs.m[k] = make(map[*peer.Peer]bool)
}
rs.m[k][p] = true
return nil
}
func (rs *hashTable) Providers(k u.Key) []*peer.Peer {
rs.lock.RLock()
defer rs.lock.RUnlock()
ret := make([]*peer.Peer, 0)
peerset, ok := rs.m[k]
if !ok {
return ret
}
for peer, _ := range peerset {
ret = append(ret, peer)
}
return ret
}
// TODO
func (rs *hashTable) Client(p *peer.Peer) Routing {
return &routingClient{
peer: p,
hashTable: rs,
}
}
type routingClient struct {
peer *peer.Peer
hashTable RoutingServer
}
func (a *routingClient) FindProvidersAsync(ctx context.Context, k u.Key, max int) <-chan *peer.Peer {
out := make(chan *peer.Peer)
go func() {
defer close(out)
for i, p := range a.hashTable.Providers(k) {
if max <= i {
return
}
select {
case out <- p:
case <-ctx.Done():
return
}
}
}()
return out
}
func (a *routingClient) Provide(key u.Key) error {
return a.hashTable.Announce(a.peer, key)
}
package bitswap
import (
"bytes"
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
import (
"github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
)
func TestKeyNotFound(t *testing.T) {
rs := func() RoutingServer {
// TODO fields
return &hashTable{}
}()
empty := rs.Providers(u.Key("not there"))
if len(empty) != 0 {
t.Fatal("should be empty")
}
}
func TestSetAndGet(t *testing.T) {
pid := peer.ID([]byte("the peer id"))
p := &peer.Peer{
ID: pid,
}
k := u.Key("42")
rs := newRoutingServer()
err := rs.Announce(p, k)
if err != nil {
t.Fatal(err)
}
providers := rs.Providers(k)
if len(providers) != 1 {
t.Fatal("should be one")
}
for _, elem := range providers {
if bytes.Equal(elem.ID, pid) {
return
}
}
t.Fatal("ID should have matched")
}
func TestClientFindProviders(t *testing.T) {
peer := &peer.Peer{
ID: []byte("42"),
}
rs := newRoutingServer()
client := rs.Client(peer)
k := u.Key("hello")
err := client.Provide(k)
if err != nil {
t.Fatal(err)
}
max := 100
providersFromHashTable := rs.Providers(k)
isInHT := false
for _, p := range providersFromHashTable {
if bytes.Equal(p.ID, peer.ID) {
isInHT = true
}
}
if !isInHT {
t.Fatal("Despite client providing key, peer wasn't in hash table as a provider")
}
providersFromClient := client.FindProvidersAsync(context.Background(), u.Key("hello"), max)
isInClient := false
for p := range providersFromClient {
if bytes.Equal(p.ID, peer.ID) {
isInClient = true
}
}
if !isInClient {
t.Fatal("Despite client providing key, client didn't receive peer when finding providers")
}
}
func TestClientOverMax(t *testing.T) {
rs := newRoutingServer()
k := u.Key("hello")
numProvidersForHelloKey := 100
for i := 0; i < numProvidersForHelloKey; i++ {
peer := &peer.Peer{
ID: []byte(string(i)),
}
err := rs.Announce(peer, k)
if err != nil {
t.Fatal(err)
}
}
providersFromHashTable := rs.Providers(k)
if len(providersFromHashTable) != numProvidersForHelloKey {
t.Log(1 == len(providersFromHashTable))
t.Fatal("not all providers were returned")
}
max := 10
client := rs.Client(&peer.Peer{ID: []byte("TODO")})
providersFromClient := client.FindProvidersAsync(context.Background(), k, max)
i := 0
for _ = range providersFromClient {
i++
}
if i != max {
t.Fatal("Too many providers returned")
}
}
// TODO does dht ensure won't receive self as a provider? probably not.
func TestCanceledContext(t *testing.T) {
rs := newRoutingServer()
k := u.Key("hello")
t.Log("async'ly announce infinite stream of providers for key")
i := 0
go func() { // infinite stream
for {
peer := &peer.Peer{
ID: []byte(string(i)),
}
err := rs.Announce(peer, k)
if err != nil {
t.Fatal(err)
}
i++
}
}()
client := rs.Client(&peer.Peer{ID: []byte("peer id doesn't matter")})
t.Log("warning: max is finite so this test is non-deterministic")
t.Log("context cancellation could simply take lower priority")
t.Log("and result in receiving the max number of results")
max := 1000
t.Log("cancel the context before consuming")
ctx, cancelFunc := context.WithCancel(context.Background())
cancelFunc()
providers := client.FindProvidersAsync(ctx, k, max)
numProvidersReturned := 0
for _ = range providers {
numProvidersReturned++
}
t.Log(numProvidersReturned)
if numProvidersReturned == max {
t.Fatal("Context cancel had no effect")
}
}
package bitswap
import (
"bytes"
"errors"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
peer "github.com/jbenet/go-ipfs/peer"
"github.com/jbenet/go-ipfs/util"
)
type Network interface {
Adapter(*peer.Peer) bsnet.Adapter
SendMessage(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
message bsmsg.BitSwapMessage) error
SendRequest(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error)
}
// network impl
func LocalNetwork() Network {
return &network{
clients: make(map[util.Key]bsnet.Receiver),
}
}
type network struct {
clients map[util.Key]bsnet.Receiver
}
func (n *network) Adapter(p *peer.Peer) bsnet.Adapter {
client := &networkClient{
local: p,
network: n,
}
n.clients[p.Key()] = client
return client
}
// TODO should this be completely asynchronous?
// TODO what does the network layer do with errors received from services?
func (n *network) SendMessage(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
message bsmsg.BitSwapMessage) error {
receiver, ok := n.clients[to.Key()]
if !ok {
return errors.New("Cannot locate peer on network")
}
// nb: terminate the context since the context wouldn't actually be passed
// over the network in a real scenario
go n.deliver(receiver, from, message)
return nil
}
func (n *network) deliver(
r bsnet.Receiver, from *peer.Peer, message bsmsg.BitSwapMessage) error {
if message == nil || from == nil {
return errors.New("Invalid input")
}
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {
// TODO should this error be returned across network boundary?
// TODO this raises an interesting question about network contract. How
// can the network be expected to behave under different failure
// conditions? What if peer is unreachable? Will we know if messages
// aren't delivered?
return err
}
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return errors.New("Malformed client request")
}
if nextPeer == nil && nextMsg == nil {
return nil
}
nextReceiver, ok := n.clients[nextPeer.Key()]
if !ok {
return errors.New("Cannot locate peer on network")
}
go n.deliver(nextReceiver, nextPeer, nextMsg)
return nil
}
var NoResponse = errors.New("No response received from the receiver")
// TODO
func (n *network) SendRequest(
ctx context.Context,
from *peer.Peer,
to *peer.Peer,
message bsmsg.BitSwapMessage) (
incoming bsmsg.BitSwapMessage, err error) {
r, ok := n.clients[to.Key()]
if !ok {
return nil, errors.New("Cannot locate peer on network")
}
nextPeer, nextMsg, err := r.ReceiveMessage(context.TODO(), from, message)
if err != nil {
return nil, err
// TODO return nil, NoResponse
}
// TODO dedupe code
if (nextPeer == nil && nextMsg != nil) || (nextMsg == nil && nextPeer != nil) {
return nil, errors.New("Malformed client request")
}
// TODO dedupe code
if nextPeer == nil && nextMsg == nil {
return nil, nil
}
// TODO test when receiver doesn't immediately respond to the initiator of the request
if !bytes.Equal(nextPeer.ID, from.ID) {
go func() {
nextReceiver, ok := n.clients[nextPeer.Key()]
if !ok {
// TODO log the error?
}
n.deliver(nextReceiver, nextPeer, nextMsg)
}()
return nil, NoResponse
}
return nextMsg, nil
}
type networkClient struct {
local *peer.Peer
bsnet.Receiver
network Network
}
func (nc *networkClient) SendMessage(
ctx context.Context,
to *peer.Peer,
message bsmsg.BitSwapMessage) error {
return nc.network.SendMessage(ctx, nc.local, to, message)
}
func (nc *networkClient) SendRequest(
ctx context.Context,
to *peer.Peer,
message bsmsg.BitSwapMessage) (incoming bsmsg.BitSwapMessage, err error) {
return nc.network.SendRequest(ctx, nc.local, to, message)
}
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
nc.Receiver = r
}
package bitswap
import (
"sync"
"testing"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
peer "github.com/jbenet/go-ipfs/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestSendRequestToCooperativePeer(t *testing.T) {
net := LocalNetwork()
idOfRecipient := []byte("recipient")
t.Log("Get two network adapters")
initiator := net.Adapter(&peer.Peer{ID: []byte("initiator")})
recipient := net.Adapter(&peer.Peer{ID: idOfRecipient})
expectedStr := "response from recipient"
recipient.SetDelegate(lambda(func(
ctx context.Context,
from *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
t.Log("Recipient received a message from the network")
// TODO test contents of incoming message
m := bsmsg.New()
m.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))
return from, m, nil
}))
t.Log("Build a message and send a synchronous request to recipient")
message := bsmsg.New()
message.AppendBlock(testutil.NewBlockOrFail(t, "data"))
response, err := initiator.SendRequest(
context.Background(), &peer.Peer{ID: idOfRecipient}, message)
if err != nil {
t.Fatal(err)
}
t.Log("Check the contents of the response from recipient")
for _, blockFromRecipient := range response.Blocks() {
if string(blockFromRecipient.Data) == expectedStr {
return
}
}
t.Fatal("Should have returned after finding expected block data")
}
func TestSendMessageAsyncButWaitForResponse(t *testing.T) {
net := LocalNetwork()
idOfResponder := []byte("responder")
waiter := net.Adapter(&peer.Peer{ID: []byte("waiter")})
responder := net.Adapter(&peer.Peer{ID: idOfResponder})
var wg sync.WaitGroup
wg.Add(1)
expectedStr := "received async"
responder.SetDelegate(lambda(func(
ctx context.Context,
fromWaiter *peer.Peer,
msgFromWaiter bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
msgToWaiter := bsmsg.New()
msgToWaiter.AppendBlock(testutil.NewBlockOrFail(t, expectedStr))
return fromWaiter, msgToWaiter, nil
}))
waiter.SetDelegate(lambda(func(
ctx context.Context,
fromResponder *peer.Peer,
msgFromResponder bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
// TODO assert that this came from the correct peer and that the message contents are as expected
ok := false
for _, b := range msgFromResponder.Blocks() {
if string(b.Data) == expectedStr {
wg.Done()
ok = true
}
}
if !ok {
t.Fatal("Message not received from the responder")
}
return nil, nil, nil
}))
messageSentAsync := bsmsg.New()
messageSentAsync.AppendBlock(testutil.NewBlockOrFail(t, "data"))
errSending := waiter.SendMessage(
context.Background(), &peer.Peer{ID: idOfResponder}, messageSentAsync)
if errSending != nil {
t.Fatal(errSending)
}
wg.Wait() // until waiter delegate function is executed
}
type receiverFunc func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (*peer.Peer, bsmsg.BitSwapMessage, error)
// lambda returns a Receiver instance given a receiver function
func lambda(f receiverFunc) bsnet.Receiver {
return &lambdaImpl{
f: f,
}
}
type lambdaImpl struct {
f func(ctx context.Context, p *peer.Peer,
incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error)
}
func (lam *lambdaImpl) ReceiveMessage(ctx context.Context,
p *peer.Peer, incoming bsmsg.BitSwapMessage) (
*peer.Peer, bsmsg.BitSwapMessage, error) {
return lam.f(ctx, p, incoming)
}
......@@ -51,6 +51,13 @@ func (s *strategist) Seed(int64) {
}
func (s *strategist) MessageReceived(p *peer.Peer, m bsmsg.BitSwapMessage) error {
// TODO find a more elegant way to handle this check
if p == nil {
return errors.New("Strategy received nil peer")
}
if m == nil {
return errors.New("Strategy received nil message")
}
l := s.ledger(p)
for _, key := range m.Wantlist() {
l.Wants(key)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment