swarm_notif_test.go 4.87 KB
Newer Older
Steven Allen's avatar
Steven Allen committed
1
package swarm_test
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2 3

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5 6 7
	"testing"
	"time"

8 9 10
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

Jeromy's avatar
Jeromy committed
11
	ma "github.com/multiformats/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12

Steven Allen's avatar
Steven Allen committed
13 14
	. "github.com/libp2p/go-libp2p-swarm"
)
15

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16
func TestNotifications(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
17 18
	const swarmSize = 5

19 20
	notifiees := make([]*netNotifiee, swarmSize)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
	ctx := context.Background()
Steven Allen's avatar
Steven Allen committed
22
	swarms := makeSwarms(ctx, t, swarmSize)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
	defer func() {
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
		for i, s := range swarms {
			select {
			case <-notifiees[i].listenClose:
				t.Error("should not have been closed")
			default:
			}
			err := s.Close()
			if err != nil {
				t.Error(err)
			}
			select {
			case <-notifiees[i].listenClose:
			default:
				t.Error("expected a listen close notification")
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40 41 42 43 44 45
		}
	}()

	timeout := 5 * time.Second

	// signup notifs
	for i, swarm := range swarms {
Steven Allen's avatar
Steven Allen committed
46
		n := newNetNotifiee(swarmSize)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48 49 50 51 52 53 54 55 56 57 58
		swarm.Notify(n)
		notifiees[i] = n
	}

	connectSwarms(t, ctx, swarms)

	<-time.After(time.Millisecond)
	// should've gotten 5 by now.

	// test everyone got the correct connection opened calls
	for i, s := range swarms {
		n := notifiees[i]
59
		notifs := make(map[peer.ID][]network.Conn)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62 63 64 65
		for j, s2 := range swarms {
			if i == j {
				continue
			}

			// this feels a little sketchy, but its probably okay
Steven Allen's avatar
Steven Allen committed
66
			for len(s.ConnsToPeer(s2.LocalPeer())) != len(notifs[s2.LocalPeer()]) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68 69 70 71 72 73 74 75 76 77
				select {
				case c := <-n.connected:
					nfp := notifs[c.RemotePeer()]
					notifs[c.RemotePeer()] = append(nfp, c)
				case <-time.After(timeout):
					t.Fatal("timeout")
				}
			}
		}

		for p, cons := range notifs {
Steven Allen's avatar
Steven Allen committed
78
			expect := s.ConnsToPeer(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
			if len(expect) != len(cons) {
				t.Fatal("got different number of connections")
			}

			for _, c := range cons {
				var found bool
				for _, c2 := range expect {
					if c == c2 {
						found = true
						break
					}
				}

				if !found {
					t.Fatal("connection not found!")
				}
			}
		}
	}

99
	complement := func(c network.Conn) (*Swarm, *netNotifiee, *Conn) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
		for i, s := range swarms {
Steven Allen's avatar
Steven Allen committed
101
			for _, c2 := range s.Conns() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102 103
				if c.LocalMultiaddr().Equal(c2.RemoteMultiaddr()) &&
					c2.LocalMultiaddr().Equal(c.RemoteMultiaddr()) {
Steven Allen's avatar
Steven Allen committed
104
					return s, notifiees[i], c2.(*Conn)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106 107 108 109 110 111
				}
			}
		}
		t.Fatal("complementary conn not found", c)
		return nil, nil, nil
	}

112 113
	testOCStream := func(n *netNotifiee, s network.Stream) {
		var s2 network.Stream
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114 115 116 117 118 119
		select {
		case s2 = <-n.openedStream:
			t.Log("got notif for opened stream")
		case <-time.After(timeout):
			t.Fatal("timeout")
		}
Steven Allen's avatar
Steven Allen committed
120
		if s != s2 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122 123 124 125 126 127 128 129
			t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
		}

		select {
		case s2 = <-n.closedStream:
			t.Log("got notif for closed stream")
		case <-time.After(timeout):
			t.Fatal("timeout")
		}
Steven Allen's avatar
Steven Allen committed
130
		if s != s2 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133 134
			t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
		}
	}

135
	streams := make(chan network.Stream)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136
	for _, s := range swarms {
137
		s.SetStreamHandler(func(s network.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138
			streams <- s
Steven Allen's avatar
Steven Allen committed
139
			s.Reset()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140 141 142 143 144
		})
	}

	// open a streams in each conn
	for i, s := range swarms {
Steven Allen's avatar
Steven Allen committed
145
		for _, c := range s.Conns() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146 147
			_, n2, _ := complement(c)

148
			st1, err := c.NewStream(context.Background())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150 151 152
			if err != nil {
				t.Error(err)
			} else {
				st1.Write([]byte("hello"))
Steven Allen's avatar
Steven Allen committed
153
				st1.Reset()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154 155 156 157 158 159 160 161 162 163
				testOCStream(notifiees[i], st1)
				st2 := <-streams
				testOCStream(n2, st2)
			}
		}
	}

	// close conns
	for i, s := range swarms {
		n := notifiees[i]
Steven Allen's avatar
Steven Allen committed
164
		for _, c := range s.Conns() {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165 166 167 168
			_, n2, c2 := complement(c)
			c.Close()
			c2.Close()

169
			var c3, c4 network.Conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
			select {
			case c3 = <-n.disconnected:
			case <-time.After(timeout):
				t.Fatal("timeout")
			}
			if c != c3 {
				t.Fatal("got incorrect conn", c, c3)
			}

			select {
			case c4 = <-n2.disconnected:
			case <-time.After(timeout):
				t.Fatal("timeout")
			}
			if c2 != c4 {
				t.Fatal("got incorrect conn", c, c2)
			}
		}
	}
}

type netNotifiee struct {
	listen       chan ma.Multiaddr
	listenClose  chan ma.Multiaddr
194 195 196 197
	connected    chan network.Conn
	disconnected chan network.Conn
	openedStream chan network.Stream
	closedStream chan network.Stream
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198 199
}

Steven Allen's avatar
Steven Allen committed
200
func newNetNotifiee(buffer int) *netNotifiee {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201
	return &netNotifiee{
Steven Allen's avatar
Steven Allen committed
202 203
		listen:       make(chan ma.Multiaddr, buffer),
		listenClose:  make(chan ma.Multiaddr, buffer),
204 205 206 207
		connected:    make(chan network.Conn, buffer),
		disconnected: make(chan network.Conn, buffer),
		openedStream: make(chan network.Stream, buffer),
		closedStream: make(chan network.Stream, buffer),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208 209 210
	}
}

211
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212 213
	nn.listen <- a
}
214
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215 216
	nn.listenClose <- a
}
217
func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219
	nn.connected <- v
}
220
func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222
	nn.disconnected <- v
}
223
func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224 225
	nn.openedStream <- v
}
226
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228
	nn.closedStream <- v
}