ext_test.go 8.34 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6
	"testing"
7
	"time"
Jeromy's avatar
Jeromy committed
8

9 10 11
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/routing"
12
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
13

George Antoniadis's avatar
George Antoniadis committed
14
	ggio "github.com/gogo/protobuf/io"
15
	u "github.com/ipfs/go-ipfs-util"
16
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
17 18
	record "github.com/libp2p/go-libp2p-record"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Jeromy's avatar
Jeromy committed
19 20
)

21
func TestGetFailures(t *testing.T) {
22 23 24
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

26
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
	mn, err := mocknet.FullMeshConnected(ctx, 2)
28 29 30
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

33
	os := []opts.Option{opts.DisableAutoRefresh()}
34
	d, err := New(ctx, hosts[0], os...)
35 36 37
	if err != nil {
		t.Fatal(err)
	}
38
	d.Update(ctx, hosts[1].ID())
39

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40
	// Reply with failures to every message
41
	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
Łukasz Magiera's avatar
Łukasz Magiera committed
42
		time.Sleep(400 * time.Millisecond)
43
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44 45
	})

46
	// This one should time out
Jeromy's avatar
Jeromy committed
47 48
	ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
49
	if _, err := d.GetValue(ctx1, "test"); err != nil {
50 51 52 53
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
54
		if err != context.DeadlineExceeded {
55
			t.Fatal("Got different error than we expected", err)
56 57 58
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
59 60
	}

61
	t.Log("Timeout test passed.")
62

63
	// Reply with failures to every message
64
	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
65 66
		defer s.Close()

67
		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
68 69
		pbw := ggio.NewDelimitedWriter(s)

70
		pmes := new(pb.Message)
71 72
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
73 74
		}

75
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
			Type: pmes.Type,
77
		}
78 79 80
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
81 82
	})

83 84 85 86 87
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
Jeromy's avatar
Jeromy committed
88 89
	ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
90
	_, err = d.GetValue(ctx2, "test")
91
	if err != nil {
92 93 94
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
95
		if err != routing.ErrNotFound {
96
			t.Fatalf("Expected ErrNotFound, got: %s", err)
97 98 99 100
		}
	} else {
		t.Fatal("expected error, got none.")
	}
101

102 103
	t.Log("ErrNotFound check passed!")

104
	// Now we test this DHT's handleGetValue failure
105 106 107
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
108

109
		rec := record.MakePutRecord(str, []byte("blah"))
110
		req := pb.Message{
111 112
			Type:   typ,
			Key:    []byte(str),
113 114
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115

116
		s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
117 118 119 120
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121

122
		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
123
		pbw := ggio.NewDelimitedWriter(s)
124

125 126 127
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
128

129 130 131 132 133 134 135 136 137 138 139
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
140
}
Jeromy's avatar
Jeromy committed
141 142

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143
	// t.Skip("skipping test to debug another")
144 145 146
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147

148
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149
	mn, err := mocknet.FullMeshConnected(ctx, 16)
150 151 152
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
	hosts := mn.Hosts()
154

155
	os := []opts.Option{opts.DisableAutoRefresh()}
156
	d, err := New(ctx, hosts[0], os...)
157 158 159
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
160

161 162
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
163 164 165
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167
	for _, host := range hosts {
		host := host // shadow loop var
168
		host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
169
			defer s.Close()
Jeromy's avatar
Jeromy committed
170

171
			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
172
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
173

174 175 176
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
177 178
			}

179 180 181 182
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

183
				ps := []peer.AddrInfo{}
184
				for i := 0; i < 7; i++ {
185
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
					pi := host.Peerstore().PeerInfo(p)
187
					ps = append(ps, pi)
188 189
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
191 192 193 194 195 196 197 198
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
199

200
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
201 202
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
203
	v, err := d.GetValue(ctx, "hello")
Matt Joiner's avatar
Matt Joiner committed
204
	logger.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
205
	if err != nil {
206 207 208
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
209
		switch err {
210
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
211 212 213 214 215 216 217 218 219 220
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
221 222 223 224

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226 227
	// t.Skip("skipping test because it makes a lot of output")

228
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229
	mn, err := mocknet.FullMeshConnected(ctx, 6)
230 231 232
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234

235
	os := []opts.Option{opts.DisableAutoRefresh()}
236
	d, err := New(ctx, hosts[0], os...)
237 238 239
	if err != nil {
		t.Fatal(err)
	}
240

241
	for i := 1; i < 5; i++ {
242
		d.Update(ctx, hosts[i].ID())
243 244 245
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246 247
	for _, host := range hosts {
		host := host // shadow loop var
248
		host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
249 250
			defer s.Close()

251
			pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
252
			pbw := ggio.NewDelimitedWriter(s)
253

254 255 256
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
257 258
			}

259 260
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
261
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
262 263
				resp := &pb.Message{
					Type:        pmes.Type,
264
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
265 266 267 268 269 270 271
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272
			}
273

274 275
		})
	}
276

rht's avatar
rht committed
277 278
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
279
	if _, err := d.GetValue(ctx, "hello"); err != nil {
280
		switch err {
281
		case routing.ErrNotFound:
282 283 284 285 286 287 288 289 290 291
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
292 293 294 295 296 297 298 299 300 301 302 303 304

// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()
	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()
305
	os := []opts.Option{opts.DisableAutoRefresh()}
306
	d, err := New(ctx, hosts[0], os...)
307 308 309
	if err != nil {
		t.Fatal(err)
	}
310 311 312 313 314

	d.Update(ctx, hosts[1].ID())

	// It would be nice to be able to just get a value and succeed but then
	// we'd need to deal with selectors and validators...
315
	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
316 317
		defer s.Close()

318
		pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
319 320 321 322 323 324 325 326 327 328 329 330
		pbw := ggio.NewDelimitedWriter(s)

		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
		}

		switch pmes.GetType() {
		case pb.Message_GET_VALUE:
			pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
			resp := &pb.Message{
				Type:        pmes.Type,
331
				CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
			}

			if err := pbw.WriteMsg(resp); err != nil {
				panic(err)
			}
		default:
			panic("Shouldnt recieve this.")
		}
	})

	// long timeout to ensure timing is not at play.
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
	for i := 0; i < 10; i++ {
		if _, err := d.GetValue(ctx, "hello"); err != nil {
			switch err {
			case routing.ErrNotFound:
				//Success!
				continue
			case u.ErrTimeout:
				t.Fatal("Should not have gotten timeout!")
			default:
				t.Fatalf("Got unexpected error: %s", err)
			}
		}
		t.Fatal("Expected to recieve an error.")
	}
}