ext_test.go 11.3 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6
	"testing"
7
	"time"
Jeromy's avatar
Jeromy committed
8

9 10
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
11
	"github.com/libp2p/go-libp2p-core/peerstore"
Aarsh Shah's avatar
Aarsh Shah committed
12
	"github.com/libp2p/go-libp2p-core/protocol"
13
	"github.com/libp2p/go-libp2p-core/routing"
Aarsh Shah's avatar
Aarsh Shah committed
14 15 16

	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	record "github.com/libp2p/go-libp2p-record"
17 18
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
Aarsh Shah's avatar
Aarsh Shah committed
19
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
20

George Antoniadis's avatar
George Antoniadis committed
21
	ggio "github.com/gogo/protobuf/io"
22
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
23 24
)

25 26 27
// Test that one hung request to a peer doesn't prevent another request
// using that same peer from obeying its context.
func TestHungRequest(t *testing.T) {
28 29 30
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

31
	mn, err := mocknet.FullMeshLinked(ctx, 2)
32 33 34 35 36
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()

37
	os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
38 39 40 41
	d, err := New(ctx, hosts[0], os...)
	if err != nil {
		t.Fatal(err)
	}
42 43 44 45 46 47 48
	for _, proto := range d.serverProtocols {
		// Hang on every request.
		hosts[1].SetStreamHandler(proto, func(s network.Stream) {
			defer s.Reset() //nolint
			<-ctx.Done()
		})
	}
Aarsh Shah's avatar
Aarsh Shah committed
49

50 51 52 53 54
	err = mn.ConnectAllButSelf()
	if err != nil {
		t.Fatal("failed to connect peers", err)
	}

Steven Allen's avatar
Steven Allen committed
55
	// Wait at a bit for a peer in our routing table.
56 57 58
	for i := 0; i < 100 && d.routingTable.Size() == 0; i++ {
		time.Sleep(10 * time.Millisecond)
	}
59 60 61
	if d.routingTable.Size() == 0 {
		t.Fatal("failed to fill routing table")
	}
62 63 64 65

	ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
	defer cancel1()

66 67 68 69 70
	done := make(chan error, 1)
	go func() {
		_, err := d.GetClosestPeers(ctx1, testCaseCids[0].KeyString())
		done <- err
	}()
71 72 73 74

	time.Sleep(100 * time.Millisecond)
	ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel2()
75 76
	err = d.Provide(ctx2, testCaseCids[0], true)
	if err != context.DeadlineExceeded {
77 78 79
		t.Errorf("expected to fail with deadline exceeded, got: %s", ctx2.Err())
	}
	select {
80 81
	case err = <-done:
		t.Error("GetClosestPeers should not have returned yet", err)
82
	default:
83 84 85 86
		err = <-done
		if err != context.DeadlineExceeded {
			t.Errorf("expected the deadline to be exceeded, got %s", err)
		}
87 88
	}

89 90 91 92
	if d.routingTable.Size() == 0 {
		// make sure we didn't just disconnect
		t.Fatal("expected peers in the routing table")
	}
93 94
}

95
func TestGetFailures(t *testing.T) {
96 97 98
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99

100
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101

102 103 104
	host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
	host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

Adin Schmahmann's avatar
Adin Schmahmann committed
105
	d, err := New(ctx, host1, testPrefix, DisableAutoRefresh(), Mode(ModeServer))
106 107 108
	if err != nil {
		t.Fatal(err)
	}
109

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
	// Reply with failures to every message
111 112 113 114 115 116
	for _, proto := range d.serverProtocols {
		host2.SetStreamHandler(proto, func(s network.Stream) {
			time.Sleep(400 * time.Millisecond)
			s.Close()
		})
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117

118 119 120 121 122 123 124
	host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
	_, err = host1.Network().DialPeer(ctx, host2.ID())
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(1 * time.Second)

125
	// This one should time out
Jeromy's avatar
Jeromy committed
126 127
	ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
128
	if _, err := d.GetValue(ctx1, "test"); err != nil {
129 130 131 132
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
133
		if err != context.DeadlineExceeded {
134
			t.Fatal("Got different error than we expected", err)
135 136 137
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
138 139
	}

140
	t.Log("Timeout test passed.")
141

142 143 144 145
	for _, proto := range d.serverProtocols {
		// Reply with failures to every message
		host2.SetStreamHandler(proto, func(s network.Stream) {
			defer s.Close()
146

147 148
			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
149

150 151 152 153 154
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				// user gave up
				return
			}
155

156 157 158 159 160 161
			resp := &pb.Message{
				Type: pmes.Type,
			}
			_ = pbw.WriteMsg(resp)
		})
	}
162

163 164 165 166 167
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
Jeromy's avatar
Jeromy committed
168 169
	ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
170
	_, err = d.GetValue(ctx2, "test")
171
	if err != nil {
172 173 174
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
175
		if err != routing.ErrNotFound {
176
			t.Fatalf("Expected ErrNotFound, got: %s", err)
177 178 179 180
		}
	} else {
		t.Fatal("expected error, got none.")
	}
181

182 183
	t.Log("ErrNotFound check passed!")

184
	// Now we test this DHT's handleGetValue failure
185 186 187
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
188

189
		rec := record.MakePutRecord(str, []byte("blah"))
190
		req := pb.Message{
191 192
			Type:   typ,
			Key:    []byte(str),
193 194
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195

196
		s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols...)
197 198 199 200
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
201

202
		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
203
		pbw := ggio.NewDelimitedWriter(s)
204

205 206 207
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
208

209 210 211 212 213 214 215 216 217 218 219
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
220 221 222 223 224

	if d.routingTable.Size() == 0 {
		// make sure we didn't just disconnect
		t.Fatal("expected peers in the routing table")
	}
Jeromy's avatar
Jeromy committed
225
}
Jeromy's avatar
Jeromy committed
226 227

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228
	// t.Skip("skipping test to debug another")
229 230 231
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232

233
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
	mn, err := mocknet.FullMeshConnected(ctx, 16)
235 236 237
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238
	hosts := mn.Hosts()
239

240
	os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
241
	d, err := New(ctx, hosts[0], os...)
242 243 244
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
245 246

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
247 248
	for _, host := range hosts {
		host := host // shadow loop var
249 250 251
		for _, proto := range d.serverProtocols {
			host.SetStreamHandler(proto, func(s network.Stream) {
				defer s.Close()
Jeromy's avatar
Jeromy committed
252

253 254
				pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
				pbw := ggio.NewDelimitedWriter(s)
255

256 257
				pmes := new(pb.Message)
				if err := pbr.ReadMsg(pmes); err != nil {
Steven Allen's avatar
Steven Allen committed
258 259
					// this isn't an error, it just means the stream has died.
					return
260 261
				}

262 263 264 265 266 267 268 269 270 271 272 273 274
				switch pmes.GetType() {
				case pb.Message_GET_VALUE:
					resp := &pb.Message{Type: pmes.Type}

					ps := []peer.AddrInfo{}
					for i := 0; i < 7; i++ {
						p := hosts[rand.Intn(len(hosts))].ID()
						pi := host.Peerstore().PeerInfo(p)
						ps = append(ps, pi)
					}

					resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
					if err := pbw.WriteMsg(resp); err != nil {
Steven Allen's avatar
Steven Allen committed
275
						return
276 277 278
					}
				default:
					panic("Shouldnt recieve this.")
279
				}
280 281
			})
		}
282 283 284 285 286 287 288 289 290 291
		for _, peer := range hosts {
			if host == peer {
				continue
			}
			_ = peer.Peerstore().AddProtocols(host.ID(), protocol.ConvertToStrings(d.serverProtocols)...)
		}
	}

	for _, p := range hosts {
		d.peerFound(ctx, p.ID(), true)
292
	}
Jeromy's avatar
Jeromy committed
293

294
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
295 296
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
297
	v, err := d.GetValue(ctx, "hello")
Matt Joiner's avatar
Matt Joiner committed
298
	logger.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
299
	if err != nil {
300 301 302
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
303
		switch err {
304
		case routing.ErrNotFound:
305 306 307 308
			if d.routingTable.Size() == 0 {
				// make sure we didn't just disconnect
				t.Fatal("expected peers in the routing table")
			}
Jeromy's avatar
Jeromy committed
309 310 311 312 313 314 315 316 317 318
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
319 320 321 322

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
323
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
324 325
	// t.Skip("skipping test because it makes a lot of output")

326
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
327
	mn, err := mocknet.FullMeshConnected(ctx, 6)
328 329 330
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
331
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
332

333
	os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
334
	d, err := New(ctx, hosts[0], os...)
335 336 337
	if err != nil {
		t.Fatal(err)
	}
338

339
	for i := 1; i < 5; i++ {
Aarsh Shah's avatar
Aarsh Shah committed
340
		d.peerFound(ctx, hosts[i].ID(), true)
341 342 343
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
344 345
	for _, host := range hosts {
		host := host // shadow loop var
346 347 348
		for _, proto := range d.serverProtocols {
			host.SetStreamHandler(proto, func(s network.Stream) {
				defer s.Close()
349

350 351
				pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
				pbw := ggio.NewDelimitedWriter(s)
352

353 354 355
				pmes := new(pb.Message)
				if err := pbr.ReadMsg(pmes); err != nil {
					panic(err)
356 357
				}

358 359 360 361 362 363 364 365 366 367 368 369 370
				switch pmes.GetType() {
				case pb.Message_GET_VALUE:
					pi := host.Peerstore().PeerInfo(hosts[1].ID())
					resp := &pb.Message{
						Type:        pmes.Type,
						CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
					}

					if err := pbw.WriteMsg(resp); err != nil {
						panic(err)
					}
				default:
					panic("Shouldnt recieve this.")
371
				}
372

373 374
			})
		}
375
	}
376

rht's avatar
rht committed
377 378
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
379
	if _, err := d.GetValue(ctx, "hello"); err != nil {
380
		switch err {
381
		case routing.ErrNotFound:
382 383 384 385 386 387 388 389 390 391
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
392 393 394 395 396 397 398 399 400 401 402 403 404

// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()
	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()
405
	os := []Option{testPrefix, DisableAutoRefresh(), Mode(ModeServer)}
406
	d, err := New(ctx, hosts[0], os...)
407 408 409
	if err != nil {
		t.Fatal(err)
	}
410

Aarsh Shah's avatar
Aarsh Shah committed
411
	d.peerFound(ctx, hosts[1].ID(), true)
412

413 414 415 416 417
	for _, proto := range d.serverProtocols {
		// It would be nice to be able to just get a value and succeed but then
		// we'd need to deal with selectors and validators...
		hosts[1].SetStreamHandler(proto, func(s network.Stream) {
			defer s.Close()
418

419 420
			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
421

422 423 424
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
425 426
			}

427 428 429 430 431 432 433 434 435 436 437 438 439
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
				resp := &pb.Message{
					Type:        pmes.Type,
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
440
			}
441 442
		})
	}
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461

	// long timeout to ensure timing is not at play.
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
	for i := 0; i < 10; i++ {
		if _, err := d.GetValue(ctx, "hello"); err != nil {
			switch err {
			case routing.ErrNotFound:
				//Success!
				continue
			case u.ErrTimeout:
				t.Fatal("Should not have gotten timeout!")
			default:
				t.Fatalf("Got unexpected error: %s", err)
			}
		}
		t.Fatal("Expected to recieve an error.")
	}
}