ext_test.go 7.99 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6
	"testing"
7
	"time"
Jeromy's avatar
Jeromy committed
8

George Antoniadis's avatar
George Antoniadis committed
9
	ggio "github.com/gogo/protobuf/io"
10
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
11
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
12 13
	inet "github.com/libp2p/go-libp2p-net"
	pstore "github.com/libp2p/go-libp2p-peerstore"
14 15 16
	"github.com/libp2p/go-libp2p-record"
	"github.com/libp2p/go-libp2p-routing"
	"github.com/libp2p/go-libp2p/p2p/net/mock"
Jeromy's avatar
Jeromy committed
17 18
)

19
func TestGetFailures(t *testing.T) {
20 21 22
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23

24
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25
	mn, err := mocknet.FullMeshConnected(ctx, 2)
26 27 28
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30

31 32 33 34
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
35
	d.Update(ctx, hosts[1].ID())
36

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
	// Reply with failures to every message
38
	hosts[1].SetStreamHandler(d.protocols[0], func(s inet.Stream) {
39
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41
	})

42
	// This one should time out
Jeromy's avatar
Jeromy committed
43 44
	ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
45
	if _, err := d.GetValue(ctx1, "test"); err != nil {
46 47 48 49
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

50
		if err != routing.ErrNotFound {
51
			t.Fatal("Got different error than we expected", err)
52 53 54
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
55 56
	}

57
	t.Log("Timeout test passed.")
58

59
	// Reply with failures to every message
60
	hosts[1].SetStreamHandler(d.protocols[0], func(s inet.Stream) {
61 62 63 64 65
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

66
		pmes := new(pb.Message)
67 68
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
69 70
		}

71
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
			Type: pmes.Type,
73
		}
74 75 76
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
77 78
	})

79 80 81 82 83
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
Jeromy's avatar
Jeromy committed
84 85
	ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
86
	_, err = d.GetValue(ctx2, "test")
87
	if err != nil {
88 89 90
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
91
		if err != routing.ErrNotFound {
92
			t.Fatalf("Expected ErrNotFound, got: %s", err)
93 94 95 96
		}
	} else {
		t.Fatal("expected error, got none.")
	}
97

98 99
	t.Log("ErrNotFound check passed!")

100
	// Now we test this DHT's handleGetValue failure
101 102 103
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
104

105
		rec := record.MakePutRecord(str, []byte("blah"))
106
		req := pb.Message{
107 108
			Type:   typ,
			Key:    []byte(str),
109 110
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111

112
		s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
113 114 115 116
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117

118 119
		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)
120

121 122 123
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
124

125 126 127 128 129 130 131 132 133 134 135
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
136
}
Jeromy's avatar
Jeromy committed
137 138

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139
	// t.Skip("skipping test to debug another")
140 141 142
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143

144
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145
	mn, err := mocknet.FullMeshConnected(ctx, 16)
146 147 148
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149
	hosts := mn.Hosts()
150 151 152 153
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
Jeromy's avatar
Jeromy committed
154

155 156
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
157 158 159
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160 161
	for _, host := range hosts {
		host := host // shadow loop var
162
		host.SetStreamHandler(d.protocols[0], func(s inet.Stream) {
163
			defer s.Close()
Jeromy's avatar
Jeromy committed
164

165 166
			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
167

168 169 170
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
171 172
			}

173 174 175 176
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

Jeromy's avatar
Jeromy committed
177
				ps := []pstore.PeerInfo{}
178
				for i := 0; i < 7; i++ {
179
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180
					pi := host.Peerstore().PeerInfo(p)
181
					ps = append(ps, pi)
182 183
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
184
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
185 186 187 188 189 190 191 192
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
193

194
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
195 196
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
197
	v, err := d.GetValue(ctx, "hello")
198
	log.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
199
	if err != nil {
200 201 202
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
203
		switch err {
204
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
205 206 207 208 209 210 211 212 213 214
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
215 216 217 218

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221
	// t.Skip("skipping test because it makes a lot of output")

222
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223
	mn, err := mocknet.FullMeshConnected(ctx, 6)
224 225 226
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228

229 230 231 232
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
233

234
	for i := 1; i < 5; i++ {
235
		d.Update(ctx, hosts[i].ID())
236 237 238
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
239 240
	for _, host := range hosts {
		host := host // shadow loop var
241
		host.SetStreamHandler(d.protocols[0], func(s inet.Stream) {
242 243 244 245
			defer s.Close()

			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
246

247 248 249
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
250 251
			}

252 253
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
254
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
255 256
				resp := &pb.Message{
					Type:        pmes.Type,
Jeromy's avatar
Jeromy committed
257
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
258 259 260 261 262 263 264
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
265
			}
266

267 268
		})
	}
269

rht's avatar
rht committed
270 271
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
272
	if _, err := d.GetValue(ctx, "hello"); err != nil {
273
		switch err {
274
		case routing.ErrNotFound:
275 276 277 278 279 280 281 282 283 284
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
285 286 287 288 289 290 291 292 293 294 295 296 297

// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()
	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()
298 299 300 301
	d, err := New(ctx, hosts[0])
	if err != nil {
		t.Fatal(err)
	}
302 303 304 305 306

	d.Update(ctx, hosts[1].ID())

	// It would be nice to be able to just get a value and succeed but then
	// we'd need to deal with selectors and validators...
307
	hosts[1].SetStreamHandler(d.protocols[0], func(s inet.Stream) {
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
		}

		switch pmes.GetType() {
		case pb.Message_GET_VALUE:
			pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
			resp := &pb.Message{
				Type:        pmes.Type,
				CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
			}

			if err := pbw.WriteMsg(resp); err != nil {
				panic(err)
			}
		default:
			panic("Shouldnt recieve this.")
		}
	})

	// long timeout to ensure timing is not at play.
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
	for i := 0; i < 10; i++ {
		if _, err := d.GetValue(ctx, "hello"); err != nil {
			switch err {
			case routing.ErrNotFound:
				//Success!
				continue
			case u.ErrTimeout:
				t.Fatal("Should not have gotten timeout!")
			default:
				t.Fatalf("Got unexpected error: %s", err)
			}
		}
		t.Fatal("Expected to recieve an error.")
	}
}