Commit 153774b6 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Lots of fixes. DHT tests pass

parent 05158e5b
......@@ -80,21 +80,17 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, n inet.Network,
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
err := dht.network.DialPeer(ctx, npeer)
if err != nil {
if err := dht.network.DialPeer(ctx, npeer); err != nil {
return err
}
// Ping new peer to register in their routing table
// NOTE: this should be done better...
err = dht.Ping(ctx, npeer)
if err != nil {
if err := dht.Ping(ctx, npeer); err != nil {
return fmt.Errorf("failed to ping newly connected peer: %s\n", err)
}
log.Event(ctx, "connect", dht.self, npeer)
dht.Update(ctx, npeer)
return nil
}
......
package dht
import (
"errors"
"time"
inet "github.com/jbenet/go-ipfs/net"
peer "github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
ggio "code.google.com/p/gogoprotobuf/io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// handleNewStream implements the inet.StreamHandler
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
go dht.handleNewMessage(s)
}
func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
defer s.Close()
ctx := dht.Context()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
mPeer := s.Conn().RemotePeer()
// receive msg
pmes := new(pb.Message)
if err := r.ReadMsg(pmes); err != nil {
log.Error("Error unmarshaling data")
return
}
// update the peer (on valid msgs only)
dht.Update(ctx, mPeer)
log.Event(ctx, "foo", dht.self, mPeer, pmes)
// get handler for this msg type.
handler := dht.handlerForMsgType(pmes.GetType())
if handler == nil {
log.Error("got back nil handler from handlerForMsgType")
return
}
// dispatch handler.
rpmes, err := handler(ctx, mPeer, pmes)
if err != nil {
log.Errorf("handle message error: %s", err)
return
}
// if nil response, return it before serializing
if rpmes == nil {
log.Warning("Got back nil response from request.")
return
}
// send out response msg
if err := w.WriteMsg(rpmes); err != nil {
log.Errorf("send response error: %s", err)
return
}
return
}
// sendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s dht starting stream", dht.self)
s, err := dht.network.NewStream(inet.ProtocolDHT, p)
if err != nil {
return nil, err
}
defer s.Close()
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(s)
start := time.Now()
log.Debugf("%s writing", dht.self)
if err := w.WriteMsg(pmes); err != nil {
return nil, err
}
log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
log.Debugf("%s reading", dht.self)
defer log.Debugf("%s done", dht.self)
rpmes := new(pb.Message)
if err := r.ReadMsg(rpmes); err != nil {
return nil, err
}
if rpmes == nil {
return nil, errors.New("no response to request")
}
p.SetLatency(time.Since(start))
log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
return rpmes, nil
}
......@@ -2,6 +2,7 @@ package dht
import (
"bytes"
"math/rand"
"sort"
"testing"
......@@ -20,6 +21,16 @@ import (
"time"
)
func randMultiaddr(t *testing.T) ma.Multiaddr {
s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000+rand.Intn(40000))
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
}
func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
peerstore := peer.NewPeerstore()
......@@ -29,7 +40,6 @@ func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
}
d := NewDHT(ctx, p, peerstore, n, ds.NewMapDatastore())
d.network.SetHandler(inet.ProtocolDHT, d.handleNewStream)
d.Validators["v"] = func(u.Key, []byte) error {
return nil
......@@ -40,7 +50,8 @@ func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.Peer, []*IpfsDHT) {
var addrs []ma.Multiaddr
for i := 0; i < n; i++ {
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
r := rand.Intn(40000)
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000+r))
if err != nil {
t.Fatal(err)
}
......@@ -85,15 +96,9 @@ func makePeer(addr ma.Multiaddr) peer.Peer {
func TestPing(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx := context.Background()
u.Debug = false
addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222")
if err != nil {
t.Fatal(err)
}
addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678")
if err != nil {
t.Fatal(err)
}
addrA := randMultiaddr(t)
addrB := randMultiaddr(t)
peerA := makePeer(addrA)
peerB := makePeer(addrB)
......@@ -106,21 +111,22 @@ func TestPing(t *testing.T) {
defer dhtA.network.Close()
defer dhtB.network.Close()
err = dhtA.Connect(ctx, peerB)
if err != nil {
if err := dhtA.Connect(ctx, peerB); err != nil {
t.Fatal(err)
}
// if err := dhtB.Connect(ctx, peerA); err != nil {
// t.Fatal(err)
// }
//Test that we can ping the node
ctxT, _ := context.WithTimeout(ctx, 100*time.Millisecond)
err = dhtA.Ping(ctxT, peerB)
if err != nil {
if err := dhtA.Ping(ctxT, peerB); err != nil {
t.Fatal(err)
}
ctxT, _ = context.WithTimeout(ctx, 100*time.Millisecond)
err = dhtB.Ping(ctxT, peerA)
if err != nil {
if err := dhtB.Ping(ctxT, peerA); err != nil {
t.Fatal(err)
}
}
......@@ -129,15 +135,9 @@ func TestValueGetSet(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx := context.Background()
u.Debug = false
addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/11235")
if err != nil {
t.Fatal(err)
}
addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/15679")
if err != nil {
t.Fatal(err)
}
addrA := randMultiaddr(t)
addrB := randMultiaddr(t)
peerA := makePeer(addrA)
peerB := makePeer(addrB)
......@@ -156,7 +156,7 @@ func TestValueGetSet(t *testing.T) {
defer dhtA.network.Close()
defer dhtB.network.Close()
err = dhtA.Connect(ctx, peerB)
err := dhtA.Connect(ctx, peerB)
if err != nil {
t.Fatal(err)
}
......@@ -189,8 +189,6 @@ func TestProvides(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx := context.Background()
u.Debug = false
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
......@@ -251,7 +249,6 @@ func TestProvidesAsync(t *testing.T) {
}
ctx := context.Background()
u.Debug = false
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
......@@ -317,7 +314,7 @@ func TestLayeredGet(t *testing.T) {
}
ctx := context.Background()
u.Debug = false
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
......@@ -371,7 +368,6 @@ func TestFindPeer(t *testing.T) {
}
ctx := context.Background()
u.Debug = false
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
......@@ -412,12 +408,13 @@ func TestFindPeer(t *testing.T) {
}
func TestFindPeersConnectedToPeer(t *testing.T) {
t.Skip("not quite correct (see note)")
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
u.Debug = false
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
......@@ -516,15 +513,9 @@ func TestConnectCollision(t *testing.T) {
log.Notice("Running Time: ", rtime)
ctx := context.Background()
u.Debug = false
addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/11235")
if err != nil {
t.Fatal(err)
}
addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/15679")
if err != nil {
t.Fatal(err)
}
addrA := randMultiaddr(t)
addrB := randMultiaddr(t)
peerA := makePeer(addrA)
peerB := makePeer(addrB)
......
package dht
import (
"math/rand"
"testing"
crand "crypto/rand"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
inet "github.com/jbenet/go-ipfs/net"
mocknet "github.com/jbenet/go-ipfs/net/mock"
peer "github.com/jbenet/go-ipfs/peer"
routing "github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
"sync"
ggio "code.google.com/p/gogoprotobuf/io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"time"
)
// mesHandleFunc is a function that takes in outgoing messages
// and can respond to them, simulating other peers on the network.
// returning nil will chose not to respond and pass the message onto the
// next registered handler
type mesHandleFunc func(msg.NetMessage) msg.NetMessage
// fauxNet is a standin for a swarm.Network in order to more easily recreate
// different testing scenarios
type fauxSender struct {
sync.Mutex
handlers []mesHandleFunc
}
func (f *fauxSender) AddHandler(fn func(msg.NetMessage) msg.NetMessage) {
f.Lock()
defer f.Unlock()
f.handlers = append(f.handlers, fn)
}
func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
f.Lock()
handlers := make([]mesHandleFunc, len(f.handlers))
copy(handlers, f.handlers)
f.Unlock()
for _, h := range handlers {
reply := h(m)
if reply != nil {
return reply, nil
}
}
// no reply? ok force a timeout
select {
case <-ctx.Done():
}
return nil, ctx.Err()
}
func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error {
f.Lock()
handlers := make([]mesHandleFunc, len(f.handlers))
copy(handlers, f.handlers)
f.Unlock()
for _, h := range handlers {
reply := h(m)
if reply != nil {
return nil
}
}
return nil
}
// fauxNet is a standin for a swarm.Network in order to more easily recreate
// different testing scenarios
type fauxNet struct {
local peer.Peer
}
// DialPeer attempts to establish a connection to a given peer
func (f *fauxNet) DialPeer(context.Context, peer.Peer) error {
return nil
}
func (f *fauxNet) LocalPeer() peer.Peer {
return f.local
}
// ClosePeer connection to peer
func (f *fauxNet) ClosePeer(peer.Peer) error {
return nil
}
// IsConnected returns whether a connection to given peer exists.
func (f *fauxNet) IsConnected(peer.Peer) (bool, error) {
return true, nil
}
// Connectedness returns whether a connection to given peer exists.
func (f *fauxNet) Connectedness(peer.Peer) inet.Connectedness {
return inet.Connected
}
// GetProtocols returns the protocols registered in the network.
func (f *fauxNet) GetProtocols() *mux.ProtocolMap { return nil }
// SendMessage sends given Message out
func (f *fauxNet) SendMessage(msg.NetMessage) error {
return nil
}
func (f *fauxNet) GetPeerList() []peer.Peer {
return nil
}
func (f *fauxNet) GetBandwidthTotals() (uint64, uint64) {
return 0, 0
}
// Close terminates all network operation
func (f *fauxNet) Close() error { return nil }
func TestGetFailures(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
peerstore := peer.NewPeerstore()
local := makePeerString(t, "")
peers := []peer.Peer{local, testutil.RandPeer()}
ctx := context.Background()
fn := &fauxNet{local}
fs := &fauxSender{}
nets, err := mocknet.MakeNetworks(ctx, peers)
if err != nil {
t.Fatal(err)
}
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
other := makePeerString(t, "")
d.Update(ctx, other)
d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore())
d.Update(ctx, peers[1])
// This one should time out
// u.POut("Timout Test\n")
ctx1, _ := context.WithTimeout(context.Background(), time.Second)
_, err := d.GetValue(ctx1, u.Key("test"))
if err != nil {
if _, err := d.GetValue(ctx1, u.Key("test")); err != nil {
if err != context.DeadlineExceeded {
t.Fatal("Got different error than we expected", err)
}
......@@ -152,20 +50,29 @@ func TestGetFailures(t *testing.T) {
t.Fatal("Did not get expected error!")
}
msgs := make(chan *pb.Message, 100)
// u.POut("NotFound Test\n")
// Reply with failures to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
resp := &pb.Message{
Type: pmes.Type,
}
m, err := msg.FromObject(mes.Peer(), resp)
return m
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
msgs <- resp
})
// This one should fail with NotFound
......@@ -179,8 +86,8 @@ func TestGetFailures(t *testing.T) {
t.Fatal("expected error, got none.")
}
fs.handlers = nil
// Now we test this DHT's handleGetValue failure
{
typ := pb.Message_GET_VALUE
str := "hello"
rec, err := d.makePutRecord(u.Key(str), []byte("blah"))
......@@ -194,16 +101,21 @@ func TestGetFailures(t *testing.T) {
}
// u.POut("handleGetValue Test\n")
mes, err := msg.FromObject(other, &req)
s, err := nets[1].NewStream(inet.ProtocolDHT, peers[0])
if err != nil {
t.Error(err)
t.Fatal(err)
}
defer s.Close()
mes = d.HandleMessage(ctx, mes)
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(&req); err != nil {
t.Fatal(err)
}
pmes := new(pb.Message)
err = proto.Unmarshal(mes.Data(), pmes)
if err != nil {
if err := pbr.ReadMsg(pmes); err != nil {
t.Fatal(err)
}
if pmes.GetRecord() != nil {
......@@ -212,7 +124,7 @@ func TestGetFailures(t *testing.T) {
if pmes.GetProviderPeers() != nil {
t.Fatal("shouldnt have provider peers")
}
}
}
// TODO: Maybe put these in some sort of "ipfs_testutil" package
......@@ -228,49 +140,57 @@ func TestNotFound(t *testing.T) {
t.SkipNow()
}
local := makePeerString(t, "")
ctx := context.Background()
peerstore := peer.NewPeerstore()
peerstore.Add(local)
ctx := context.Background()
fn := &fauxNet{local}
fs := &fauxSender{}
var peers []peer.Peer
for i := 0; i < 16; i++ {
peers = append(peers, testutil.RandPeer())
}
nets, err := mocknet.MakeNetworks(ctx, peers)
if err != nil {
t.Fatal(err)
}
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore())
var ps []peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ctx, ps[i])
for _, p := range peers {
d.Update(ctx, p)
}
// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
for _, neti := range nets {
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
switch pmes.GetType() {
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}
peers := []peer.Peer{}
ps := []peer.Peer{}
for i := 0; i < 7; i++ {
peers = append(peers, _randPeer())
ps = append(ps, peers[rand.Intn(len(peers))])
}
resp.CloserPeers = pb.PeersToPBPeers(d.dialer, peers)
mes, err := msg.FromObject(mes.Peer(), resp)
if err != nil {
t.Error(err)
resp.CloserPeers = pb.PeersToPBPeers(d.network, peers)
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
return mes
default:
panic("Shouldnt recieve this.")
}
})
}
ctx, _ = context.WithTimeout(ctx, time.Second*5)
v, err := d.GetValue(ctx, u.Key("hello"))
......@@ -294,53 +214,57 @@ func TestNotFound(t *testing.T) {
func TestLessThanKResponses(t *testing.T) {
// t.Skip("skipping test because it makes a lot of output")
local := makePeerString(t, "")
ctx := context.Background()
peerstore := peer.NewPeerstore()
peerstore.Add(local)
ctx := context.Background()
u.Debug = false
fn := &fauxNet{local}
fs := &fauxSender{}
var peers []peer.Peer
for i := 0; i < 6; i++ {
peers = append(peers, testutil.RandPeer())
}
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
nets, err := mocknet.MakeNetworks(ctx, peers)
if err != nil {
t.Fatal(err)
}
var ps []peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ctx, ps[i])
d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore())
for i := 1; i < 5; i++ {
d.Update(ctx, peers[i])
}
other := _randPeer()
// Reply with random peers to every message
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
for _, neti := range nets {
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
err := proto.Unmarshal(mes.Data(), pmes)
if err != nil {
t.Fatal(err)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
switch pmes.GetType() {
case pb.Message_GET_VALUE:
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: pb.PeersToPBPeers(d.dialer, []peer.Peer{other}),
CloserPeers: pb.PeersToPBPeers(d.network, []peer.Peer{peers[1]}),
}
mes, err := msg.FromObject(mes.Peer(), resp)
if err != nil {
t.Error(err)
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
return mes
default:
panic("Shouldnt recieve this.")
}
})
}
ctx, _ = context.WithTimeout(ctx, time.Second*30)
_, err := d.GetValue(ctx, u.Key("hello"))
if err != nil {
if _, err := d.GetValue(ctx, u.Key("hello")); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment