Commit f8d70f34 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

simultaneous open should work for now

It's a patch, really. it's not the full multiconn fix.
parent 0078264a
......@@ -205,7 +205,7 @@ func (s *SecurePipe) handshake() error {
}
if bytes.Compare(resp2, finished) != 0 {
return errors.New("Negotiation failed.")
return fmt.Errorf("Negotiation failed, got: %s", resp2)
}
log.Debug("%s handshake: Got node id: %s", s.local, s.remote)
......
......@@ -69,7 +69,7 @@ func (s *Swarm) connListen(maddr ma.Multiaddr) error {
func (s *Swarm) handleIncomingConn(nconn conn.Conn) {
// Setup the new connection
err := s.connSetup(nconn)
_, err := s.connSetup(nconn)
if err != nil && err != ErrAlreadyOpen {
s.errChan <- err
nconn.Close()
......@@ -78,9 +78,9 @@ func (s *Swarm) handleIncomingConn(nconn conn.Conn) {
// connSetup adds the passed in connection to its peerMap and starts
// the fanIn routine for that connection
func (s *Swarm) connSetup(c conn.Conn) error {
func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
if c == nil {
return errors.New("Tried to start nil connection.")
return nil, errors.New("Tried to start nil connection.")
}
log.Debug("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
......@@ -93,10 +93,13 @@ func (s *Swarm) connSetup(c conn.Conn) error {
// add to conns
s.connsLock.Lock()
if _, ok := s.conns[c.RemotePeer().Key()]; ok {
if c2, ok := s.conns[c.RemotePeer().Key()]; ok {
log.Debug("Conn already open!")
s.connsLock.Unlock()
return ErrAlreadyOpen
c.Close()
return c2, nil // not error anymore, use existing conn.
// return ErrAlreadyOpen
}
s.conns[c.RemotePeer().Key()] = c
log.Debug("Added conn to map!")
......@@ -104,7 +107,7 @@ func (s *Swarm) connSetup(c conn.Conn) error {
// kick off reader goroutine
go s.fanIn(c)
return nil
return c, nil
}
// Handles the unwrapping + sending of messages to the right connection.
......
package swarm
import (
"fmt"
"sync"
"testing"
peer "github.com/jbenet/go-ipfs/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func TestSimultOpen(t *testing.T) {
t.Skip("skipping for another test")
addrs := []string{
"/ip4/127.0.0.1/tcp/1244",
"/ip4/127.0.0.1/tcp/1245",
}
ctx := context.Background()
swarms, _ := makeSwarms(ctx, t, addrs)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst *peer.Peer) {
// copy for other peer
cp := &peer.Peer{ID: dst.ID}
cp.AddAddress(dst.Addresses[0])
if _, err := s.Dial(cp); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
log.Info("done?!?")
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
wg.Add(2)
go connect(swarms[0], swarms[1].local)
go connect(swarms[1], swarms[0].local)
wg.Wait()
}
for _, s := range swarms {
s.Close()
}
}
func TestSimultOpenMany(t *testing.T) {
t.Skip("laggy")
addrs := []string{}
for i := 2200; i < 2300; i++ {
s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", i)
addrs = append(addrs, s)
}
SubtestSwarm(t, addrs, 10)
}
func TestSimultOpenFewStress(t *testing.T) {
for i := 0; i < 100; i++ {
addrs := []string{
fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 1900+i),
fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2900+i),
}
SubtestSwarm(t, addrs, 10)
}
}
......@@ -137,7 +137,8 @@ func (s *Swarm) Dial(peer *peer.Peer) (conn.Conn, error) {
return nil, err
}
if err := s.connSetup(c); err != nil {
c, err = s.connSetup(c)
if err != nil {
c.Close()
return nil, err
}
......
......@@ -2,7 +2,7 @@ package swarm
import (
"bytes"
"fmt"
"sync"
"testing"
"time"
......@@ -23,6 +23,7 @@ func pong(ctx context.Context, swarm *Swarm) {
case m1 := <-swarm.Incoming:
if bytes.Equal(m1.Data(), []byte("ping")) {
m2 := msg.New(m1.Peer(), []byte("pong"))
log.Debug("%s pong %s", swarm.local, m1.Peer())
swarm.Outgoing <- m2
}
}
......@@ -52,10 +53,10 @@ func setupPeer(t *testing.T, addr string) *peer.Peer {
return p
}
func makeSwarms(ctx context.Context, t *testing.T, peers map[string]string) []*Swarm {
func makeSwarms(ctx context.Context, t *testing.T, addrs []string) ([]*Swarm, []*peer.Peer) {
swarms := []*Swarm{}
for _, addr := range peers {
for _, addr := range addrs {
local := setupPeer(t, addr)
peerstore := peer.NewPeerstore()
swarm, err := NewSwarm(ctx, local, peerstore)
......@@ -65,35 +66,46 @@ func makeSwarms(ctx context.Context, t *testing.T, peers map[string]string) []*S
swarms = append(swarms, swarm)
}
return swarms
peers := make([]*peer.Peer, len(swarms))
for i, s := range swarms {
peers[i] = s.local
}
return swarms, peers
}
func TestSwarm(t *testing.T) {
peers := map[string]string{
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30": "/ip4/127.0.0.1/tcp/1234",
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345",
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32": "/ip4/127.0.0.1/tcp/3456",
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567",
"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34": "/ip4/127.0.0.1/tcp/5678",
}
func SubtestSwarm(t *testing.T, addrs []string, MsgNum int) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms := makeSwarms(ctx, t, peers)
swarms, peers := makeSwarms(ctx, t, addrs)
// connect everyone
for _, s := range swarms {
peers, err := s.peers.All()
if err != nil {
t.Fatal(err)
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst *peer.Peer) {
// copy for other peer
cp := &peer.Peer{ID: dst.ID}
cp.AddAddress(dst.Addresses[0])
log.Info("SWARM TEST: %s dialing %s", s.local, dst)
if _, err := s.Dial(cp); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
log.Info("SWARM TEST: %s connected to %s", s.local, dst)
wg.Done()
}
for _, p := range *peers {
fmt.Println("dialing")
if _, err := s.Dial(p); err != nil {
t.Fatal("error swarm dialing to peer", err)
log.Info("Connecting swarms simultaneously.")
for _, s := range swarms {
for _, p := range peers {
if p != s.local { // don't connect to self.
wg.Add(1)
connect(s, p)
}
}
fmt.Println("dialed")
}
wg.Wait()
}
// ping/pong
......@@ -114,9 +126,9 @@ func TestSwarm(t *testing.T) {
t.Fatal(err)
}
MsgNum := 1000
for k := 0; k < MsgNum; k++ {
for _, p := range *peers {
log.Debug("%s ping %s", s1.local, p)
s1.Outgoing <- msg.New(p, []byte("ping"))
}
}
......@@ -143,10 +155,23 @@ func TestSwarm(t *testing.T) {
}
cancel()
<-time.After(50 * time.Millisecond)
<-time.After(50 * time.Microsecond)
}
for _, s := range swarms {
s.Close()
}
}
func TestSwarm(t *testing.T) {
addrs := []string{
"/ip4/127.0.0.1/tcp/1234",
"/ip4/127.0.0.1/tcp/1235",
"/ip4/127.0.0.1/tcp/1236",
"/ip4/127.0.0.1/tcp/1237",
"/ip4/127.0.0.1/tcp/1238",
}
SubtestSwarm(t, addrs, 1000)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment