ext_test.go 8.05 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6
	"testing"
7
	"time"
Jeromy's avatar
Jeromy committed
8

George Antoniadis's avatar
George Antoniadis committed
9
	ggio "github.com/gogo/protobuf/io"
10
	u "github.com/ipfs/go-ipfs-util"
11
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
12 13
	inet "github.com/libp2p/go-libp2p-net"
	pstore "github.com/libp2p/go-libp2p-peerstore"
14 15 16
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Jeromy's avatar
Jeromy committed
17 18
)

19
func TestGetFailures(t *testing.T) {
20 21 22
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23

24
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25
	mn, err := mocknet.FullMeshConnected(ctx, 2)
26 27 28
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30

31 32 33 34
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
35
	d.Update(ctx, hosts[1].ID())
36

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
	// Reply with failures to every message
38
	hosts[1].SetStreamHandler(d.protocols[0], func(s inet.Stream) {
Łukasz Magiera's avatar
Łukasz Magiera committed
39
		time.Sleep(400 * time.Millisecond)
40
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41 42
	})

43
	// This one should time out
Jeromy's avatar
Jeromy committed
44 45
	ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
46
	if _, err := d.GetValue(ctx1, "test"); err != nil {
47 48 49 50
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

Łukasz Magiera's avatar
Łukasz Magiera committed
51
		if err != context.DeadlineExceeded {
52
			t.Fatal("Got different error than we expected", err)
53 54 55
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
56 57
	}

58
	t.Log("Timeout test passed.")
59

60
	// Reply with failures to every message
61
	hosts[1].SetStreamHandler(d.protocols[0], func(s inet.Stream) {
62 63 64 65 66
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

67
		pmes := new(pb.Message)
68 69
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
70 71
		}

72
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
			Type: pmes.Type,
74
		}
75 76 77
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
78 79
	})

80 81 82 83 84
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
Jeromy's avatar
Jeromy committed
85 86
	ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
87
	_, err = d.GetValue(ctx2, "test")
88
	if err != nil {
89 90 91
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
92
		if err != routing.ErrNotFound {
93
			t.Fatalf("Expected ErrNotFound, got: %s", err)
94 95 96 97
		}
	} else {
		t.Fatal("expected error, got none.")
	}
98

99 100
	t.Log("ErrNotFound check passed!")

101
	// Now we test this DHT's handleGetValue failure
102 103 104
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
105

106
		rec := record.MakePutRecord(str, []byte("blah"))
107
		req := pb.Message{
108 109
			Type:   typ,
			Key:    []byte(str),
110 111
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112

113
		s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
114 115 116 117
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118

119 120
		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)
121

122 123 124
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
125

126 127 128 129 130 131 132 133 134 135 136
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
137
}
Jeromy's avatar
Jeromy committed
138 139

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140
	// t.Skip("skipping test to debug another")
141 142 143
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144

145
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
	mn, err := mocknet.FullMeshConnected(ctx, 16)
147 148 149
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
	hosts := mn.Hosts()
151 152 153 154
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
155

156 157
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
158 159 160
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161 162
	for _, host := range hosts {
		host := host // shadow loop var
163
		host.SetStreamHandler(d.protocols[0], func(s inet.Stream) {
164
			defer s.Close()
Jeromy's avatar
Jeromy committed
165

166 167
			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
168

169 170 171
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
172 173
			}

174 175 176 177
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

Jeromy's avatar
Jeromy committed
178
				ps := []pstore.PeerInfo{}
179
				for i := 0; i < 7; i++ {
180
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
					pi := host.Peerstore().PeerInfo(p)
182
					ps = append(ps, pi)
183 184
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
186 187 188 189 190 191 192 193
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
194

195
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
196 197
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
198
	v, err := d.GetValue(ctx, "hello")
Matt Joiner's avatar
Matt Joiner committed
199
	logger.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
200
	if err != nil {
201 202 203
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
204
		switch err {
205
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
206 207 208 209 210 211 212 213 214 215
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
216 217 218 219

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221 222
	// t.Skip("skipping test because it makes a lot of output")

223
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224
	mn, err := mocknet.FullMeshConnected(ctx, 6)
225 226 227
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229

230 231 232 233
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
234

235
	for i := 1; i < 5; i++ {
236
		d.Update(ctx, hosts[i].ID())
237 238 239
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240 241
	for _, host := range hosts {
		host := host // shadow loop var
242
		host.SetStreamHandler(d.protocols[0], func(s inet.Stream) {
243 244 245 246
			defer s.Close()

			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
247

248 249 250
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
251 252
			}

253 254
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
255
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
256 257
				resp := &pb.Message{
					Type:        pmes.Type,
Jeromy's avatar
Jeromy committed
258
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
259 260 261 262 263 264 265
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266
			}
267

268 269
		})
	}
270

rht's avatar
rht committed
271 272
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
273
	if _, err := d.GetValue(ctx, "hello"); err != nil {
274
		switch err {
275
		case routing.ErrNotFound:
276 277 278 279 280 281 282 283 284 285
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
286 287 288 289 290 291 292 293 294 295 296 297 298

// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()
	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()
299 300 301 302
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
303 304 305 306 307

	d.Update(ctx, hosts[1].ID())

	// It would be nice to be able to just get a value and succeed but then
	// we'd need to deal with selectors and validators...
308
	hosts[1].SetStreamHandler(d.protocols[0], func(s inet.Stream) {
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
		}

		switch pmes.GetType() {
		case pb.Message_GET_VALUE:
			pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
			resp := &pb.Message{
				Type:        pmes.Type,
				CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
			}

			if err := pbw.WriteMsg(resp); err != nil {
				panic(err)
			}
		default:
			panic("Shouldnt recieve this.")
		}
	})

	// long timeout to ensure timing is not at play.
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
	for i := 0; i < 10; i++ {
		if _, err := d.GetValue(ctx, "hello"); err != nil {
			switch err {
			case routing.ErrNotFound:
				//Success!
				continue
			case u.ErrTimeout:
				t.Fatal("Should not have gotten timeout!")
			default:
				t.Fatalf("Got unexpected error: %s", err)
			}
		}
		t.Fatal("Expected to recieve an error.")
	}
}