swarm_test.go 3.35 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
package swarm

import (
	"fmt"
	"net"
	"testing"

	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	u "github.com/jbenet/go-ipfs/util"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
)

func pingListen(t *testing.T, listener *net.TCPListener, peer *peer.Peer) {
	for {
		c, err := listener.Accept()
		if err == nil {
			go pong(t, c, peer)
		}
	}
}

func pong(t *testing.T, c net.Conn, peer *peer.Peer) {
	mrw := msgio.NewReadWriter(c)
	for {
		data := make([]byte, 1024)
		n, err := mrw.ReadMsg(data)
		if err != nil {
			fmt.Printf("error %v\n", err)
			return
		}
		d := string(data[:n])
		if d != "ping" {
			t.Errorf("error: didn't receive ping: '%v'\n", d)
			return
		}
		err = mrw.WriteMsg([]byte("pong"))
		if err != nil {
			fmt.Printf("error %v\n", err)
			return
		}
	}
}

func setupPeer(id string, addr string) (*peer.Peer, error) {
	tcp, err := ma.NewMultiaddr(addr)
	if err != nil {
		return nil, err
	}

	mh, err := mh.FromHexString(id)
	if err != nil {
		return nil, err
	}

	p := &peer.Peer{ID: peer.ID(mh)}
	p.AddAddress(tcp)
	return p, nil
}

func TestSwarm(t *testing.T) {

	local, err := setupPeer("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a30",
		"/ip4/127.0.0.1/tcp/1234")
	if err != nil {
		t.Fatal("error setting up peer", err)
	}

	swarm, err := NewSwarm(context.Background(), local)
	if err != nil {
		t.Error(err)
	}
	var peers []*peer.Peer
	var listeners []net.Listener
	peerNames := map[string]string{
		"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31": "/ip4/127.0.0.1/tcp/2345",
		"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32": "/ip4/127.0.0.1/tcp/3456",
		"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33": "/ip4/127.0.0.1/tcp/4567",
83
		"11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34": "/ip4/127.0.0.1/tcp/5678",
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
	}

	for k, n := range peerNames {
		peer, err := setupPeer(k, n)
		if err != nil {
			t.Fatal("error setting up peer", err)
		}
		a := peer.NetAddress("tcp")
		if a == nil {
			t.Fatal("error setting up peer (addr is nil)", peer)
		}
		n, h, err := a.DialArgs()
		if err != nil {
			t.Fatal("error getting dial args from addr")
		}
		listener, err := net.Listen(n, h)
		if err != nil {
			t.Fatal("error setting up listener", err)
		}
		go pingListen(t, listener.(*net.TCPListener), peer)

		_, err = swarm.Dial(peer)
		if err != nil {
			t.Fatal("error swarm dialing to peer", err)
		}

		// ok done, add it.
		peers = append(peers, peer)
		listeners = append(listeners, listener)
	}

	MsgNum := 1000
	for k := 0; k < MsgNum; k++ {
		for _, p := range peers {
118
			swarm.Outgoing <- msg.New(p, []byte("ping"))
119 120 121 122 123 124 125
		}
	}

	got := map[u.Key]int{}

	for k := 0; k < (MsgNum * len(peers)); k++ {
		msg := <-swarm.Incoming
126
		if string(msg.Data()) != "pong" {
127 128 129
			t.Error("unexpected conn output", msg.Data)
		}

130 131
		n, _ := got[msg.Peer().Key()]
		got[msg.Peer().Key()] = n + 1
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
	}

	if len(peers) != len(got) {
		t.Error("got less messages than sent")
	}

	for p, n := range got {
		if n != MsgNum {
			t.Error("peer did not get all msgs", p, n, "/", MsgNum)
		}
	}

	swarm.Close()
	for _, listener := range listeners {
		listener.(*net.TCPListener).Close()
	}
}