ext_test.go 9.83 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6
	"testing"
7
	"time"
Jeromy's avatar
Jeromy committed
8

9 10
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
11
	"github.com/libp2p/go-libp2p-core/peerstore"
12
	"github.com/libp2p/go-libp2p-core/routing"
13 14
	swarmt "github.com/libp2p/go-libp2p-swarm/testing"
	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
15

George Antoniadis's avatar
George Antoniadis committed
16
	ggio "github.com/gogo/protobuf/io"
17
	u "github.com/ipfs/go-ipfs-util"
18
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
19 20
	record "github.com/libp2p/go-libp2p-record"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Jeromy's avatar
Jeromy committed
21 22
)

23 24 25
// Test that one hung request to a peer doesn't prevent another request
// using that same peer from obeying its context.
func TestHungRequest(t *testing.T) {
26 27 28 29 30 31 32
	ctx := context.Background()
	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()

33
	os := []Option{DisableAutoRefresh()}
34 35 36 37 38 39 40 41 42 43 44 45 46 47
	d, err := New(ctx, hosts[0], os...)
	if err != nil {
		t.Fatal(err)
	}
	// Hang on every request.
	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
		defer s.Reset()
		<-ctx.Done()
	})
	d.Update(ctx, hosts[1].ID())

	ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
	defer cancel1()

48 49 50 51 52
	done := make(chan error, 1)
	go func() {
		_, err := d.GetClosestPeers(ctx1, testCaseCids[0].KeyString())
		done <- err
	}()
53 54 55 56

	time.Sleep(100 * time.Millisecond)
	ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel2()
57 58
	err = d.Provide(ctx2, testCaseCids[0], true)
	if err != context.DeadlineExceeded {
59 60 61
		t.Errorf("expected to fail with deadline exceeded, got: %s", ctx2.Err())
	}
	select {
62 63
	case <-done:
		t.Errorf("GetClosestPeers should not have returned yet")
64
	default:
65 66 67 68
		err = <-done
		if err != context.DeadlineExceeded {
			t.Errorf("expected the deadline to be exceeded, got %s", err)
		}
69 70 71 72
	}

}

73
func TestGetFailures(t *testing.T) {
74 75 76
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77

78
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79

80 81 82
	host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
	host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

83
	d, err := New(ctx, host1, DisableAutoRefresh(), Mode(ModeServer))
84 85 86
	if err != nil {
		t.Fatal(err)
	}
87

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
	// Reply with failures to every message
89
	host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
Łukasz Magiera's avatar
Łukasz Magiera committed
90
		time.Sleep(400 * time.Millisecond)
91
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92 93
	})

94 95 96 97 98 99 100
	host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
	_, err = host1.Network().DialPeer(ctx, host2.ID())
	if err != nil {
		t.Fatal(err)
	}
	time.Sleep(1 * time.Second)

101
	// This one should time out
Jeromy's avatar
Jeromy committed
102 103
	ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
104
	if _, err := d.GetValue(ctx1, "test"); err != nil {
105 106 107 108
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
109
		if err != context.DeadlineExceeded {
110
			t.Fatal("Got different error than we expected", err)
111 112 113
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
114 115
	}

116
	t.Log("Timeout test passed.")
117

118
	// Reply with failures to every message
119
	host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
120 121
		defer s.Close()

122
		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
123 124
		pbw := ggio.NewDelimitedWriter(s)

125
		pmes := new(pb.Message)
126 127
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
128 129
		}

130
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131
			Type: pmes.Type,
132
		}
133 134 135
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
136 137
	})

138 139 140 141 142
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
Jeromy's avatar
Jeromy committed
143 144
	ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
145
	_, err = d.GetValue(ctx2, "test")
146
	if err != nil {
147 148 149
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
150
		if err != routing.ErrNotFound {
151
			t.Fatalf("Expected ErrNotFound, got: %s", err)
152 153 154 155
		}
	} else {
		t.Fatal("expected error, got none.")
	}
156

157 158
	t.Log("ErrNotFound check passed!")

159
	// Now we test this DHT's handleGetValue failure
160 161 162
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
163

164
		rec := record.MakePutRecord(str, []byte("blah"))
165
		req := pb.Message{
166 167
			Type:   typ,
			Key:    []byte(str),
168 169
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170

171
		s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols[0])
172 173 174 175
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176

177
		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
178
		pbw := ggio.NewDelimitedWriter(s)
179

180 181 182
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
183

184 185 186 187 188 189 190 191 192 193 194
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
195
}
Jeromy's avatar
Jeromy committed
196 197

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198
	// t.Skip("skipping test to debug another")
199 200 201
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202

203
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204
	mn, err := mocknet.FullMeshConnected(ctx, 16)
205 206 207
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
	hosts := mn.Hosts()
209

210
	os := []Option{DisableAutoRefresh()}
211
	d, err := New(ctx, hosts[0], os...)
212 213 214
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
215

216 217
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
218 219 220
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222
	for _, host := range hosts {
		host := host // shadow loop var
223
		host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
224
			defer s.Close()
Jeromy's avatar
Jeromy committed
225

226
			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
227
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
228

229 230 231
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
232 233
			}

234 235 236 237
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

238
				ps := []peer.AddrInfo{}
239
				for i := 0; i < 7; i++ {
240
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241
					pi := host.Peerstore().PeerInfo(p)
242
					ps = append(ps, pi)
243 244
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
246 247 248 249 250 251 252 253
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
254

255
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
256 257
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
258
	v, err := d.GetValue(ctx, "hello")
Matt Joiner's avatar
Matt Joiner committed
259
	logger.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
260
	if err != nil {
261 262 263
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
264
		switch err {
265
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
266 267 268 269 270 271 272 273 274 275
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
276 277 278 279

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
280
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
281 282
	// t.Skip("skipping test because it makes a lot of output")

283
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
284
	mn, err := mocknet.FullMeshConnected(ctx, 6)
285 286 287
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
288
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
289

290
	os := []Option{DisableAutoRefresh()}
291
	d, err := New(ctx, hosts[0], os...)
292 293 294
	if err != nil {
		t.Fatal(err)
	}
295

296
	for i := 1; i < 5; i++ {
297
		d.Update(ctx, hosts[i].ID())
298 299 300
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
301 302
	for _, host := range hosts {
		host := host // shadow loop var
303
		host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
304 305
			defer s.Close()

306
			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
307
			pbw := ggio.NewDelimitedWriter(s)
308

309 310 311
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
312 313
			}

314 315
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
316
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
317 318
				resp := &pb.Message{
					Type:        pmes.Type,
319
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
320 321 322 323 324 325 326
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
327
			}
328

329 330
		})
	}
331

rht's avatar
rht committed
332 333
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
334
	if _, err := d.GetValue(ctx, "hello"); err != nil {
335
		switch err {
336
		case routing.ErrNotFound:
337 338 339 340 341 342 343 344 345 346
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
347 348 349 350 351 352 353 354 355 356 357 358 359

// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()
	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()
360
	os := []Option{DisableAutoRefresh()}
361
	d, err := New(ctx, hosts[0], os...)
362 363 364
	if err != nil {
		t.Fatal(err)
	}
365 366 367 368 369

	d.Update(ctx, hosts[1].ID())

	// It would be nice to be able to just get a value and succeed but then
	// we'd need to deal with selectors and validators...
370
	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
371 372
		defer s.Close()

373
		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
374 375 376 377 378 379 380 381 382 383 384 385
		pbw := ggio.NewDelimitedWriter(s)

		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
		}

		switch pmes.GetType() {
		case pb.Message_GET_VALUE:
			pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
			resp := &pb.Message{
				Type:        pmes.Type,
386
				CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
			}

			if err := pbw.WriteMsg(resp); err != nil {
				panic(err)
			}
		default:
			panic("Shouldnt recieve this.")
		}
	})

	// long timeout to ensure timing is not at play.
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
	for i := 0; i < 10; i++ {
		if _, err := d.GetValue(ctx, "hello"); err != nil {
			switch err {
			case routing.ErrNotFound:
				//Success!
				continue
			case u.ErrTimeout:
				t.Fatal("Should not have gotten timeout!")
			default:
				t.Fatalf("Got unexpected error: %s", err)
			}
		}
		t.Fatal("Expected to recieve an error.")
	}
}