ext_test.go 8.25 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	"io"
6
	"math/rand"
Jeromy's avatar
Jeromy committed
7
	"testing"
8
	"time"
Jeromy's avatar
Jeromy committed
9

George Antoniadis's avatar
George Antoniadis committed
10
	ggio "github.com/gogo/protobuf/io"
11 12 13
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
14
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
15 16
	inet "github.com/libp2p/go-libp2p-net"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
17 18 19
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
Jeromy's avatar
Jeromy committed
20 21
)

22
func TestGetFailures(t *testing.T) {
23 24 25
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26

27
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
	mn, err := mocknet.FullMeshConnected(ctx, 2)
29 30 31
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33

34
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
	d := NewDHT(ctx, hosts[0], tsds)
36
	d.Update(ctx, hosts[1].ID())
37

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38 39
	// Reply with failures to every message
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
40
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41 42
	})

43
	// This one should time out
Jeromy's avatar
Jeromy committed
44 45
	ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
46
	if _, err := d.GetValue(ctx1, "test"); err != nil {
47 48 49 50
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

51
		if err != io.EOF {
52
			t.Fatal("Got different error than we expected", err)
53 54 55
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
56 57
	}

58
	t.Log("Timeout test passed.")
59

60
	// Reply with failures to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
62 63 64 65 66
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

67
		pmes := new(pb.Message)
68 69
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
70 71
		}

72
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
			Type: pmes.Type,
74
		}
75 76 77
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
78 79
	})

80 81 82 83 84
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
Jeromy's avatar
Jeromy committed
85 86
	ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
87
	_, err = d.GetValue(ctx2, "test")
88
	if err != nil {
89 90 91
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
92
		if err != routing.ErrNotFound {
93
			t.Fatalf("Expected ErrNotFound, got: %s", err)
94 95 96 97
		}
	} else {
		t.Fatal("expected error, got none.")
	}
98

99 100
	t.Log("ErrNotFound check passed!")

101
	// Now we test this DHT's handleGetValue failure
102 103 104
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
105 106 107 108 109 110

		sk, err := d.getOwnPrivateKey()
		if err != nil {
			t.Fatal(err)
		}

111
		rec, err := record.MakePutRecord(sk, str, []byte("blah"), true)
112 113 114 115 116 117 118 119
		if err != nil {
			t.Fatal(err)
		}
		req := pb.Message{
			Type:   &typ,
			Key:    &str,
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120

George Antoniadis's avatar
George Antoniadis committed
121
		s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), ProtocolDHT)
122 123 124 125
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126

127 128
		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)
129

130 131 132
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
133

134 135 136 137 138 139 140 141 142 143 144
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
145
}
Jeromy's avatar
Jeromy committed
146 147

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
	// t.Skip("skipping test to debug another")
149 150 151
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152

153
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154
	mn, err := mocknet.FullMeshConnected(ctx, 16)
155 156 157
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
	hosts := mn.Hosts()
159
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
	d := NewDHT(ctx, hosts[0], tsds)
Jeromy's avatar
Jeromy committed
161

162 163
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
164 165 166
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167 168 169
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
170
			defer s.Close()
Jeromy's avatar
Jeromy committed
171

172 173
			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
174

175 176 177
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
178 179
			}

180 181 182 183
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

Jeromy's avatar
Jeromy committed
184
				ps := []pstore.PeerInfo{}
185
				for i := 0; i < 7; i++ {
186
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
					pi := host.Peerstore().PeerInfo(p)
188
					ps = append(ps, pi)
189 190
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
192 193 194 195 196 197 198 199
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
200

201
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
202 203
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
204
	v, err := d.GetValue(ctx, "hello")
205
	log.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
206
	if err != nil {
207 208 209
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
210
		switch err {
211
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
212 213 214 215 216 217 218 219 220 221
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
222 223 224 225

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228
	// t.Skip("skipping test because it makes a lot of output")

229
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230
	mn, err := mocknet.FullMeshConnected(ctx, 6)
231 232 233
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235

236
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237
	d := NewDHT(ctx, hosts[0], tsds)
238

239
	for i := 1; i < 5; i++ {
240
		d.Update(ctx, hosts[i].ID())
241 242 243
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244 245 246
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
247 248 249 250
			defer s.Close()

			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
251

252 253 254
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
255 256
			}

257 258
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
259
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
260 261
				resp := &pb.Message{
					Type:        pmes.Type,
Jeromy's avatar
Jeromy committed
262
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
263 264 265 266 267 268 269
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
270
			}
271

272 273
		})
	}
274

rht's avatar
rht committed
275 276
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
277
	if _, err := d.GetValue(ctx, "hello"); err != nil {
278
		switch err {
279
		case routing.ErrNotFound:
280 281 282 283 284 285 286 287 288 289
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354

// TestMultipleQueries runs several queries in a row against a node that
// closes its stream after every query.
//
// The remote answers each GET_VALUE with the querying host (hosts[0]) as the
// only closer peer and no record, so each of the ten GetValue calls is
// expected to fail with routing.ErrNotFound.
func TestMultipleQueries(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()
	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
	d := NewDHT(ctx, hosts[0], tsds)

	d.Update(ctx, hosts[1].ID())

	// It would be nice to be able to just get a value and succeed but then
	// we'd need to deal with selectors and validators...
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
		}

		switch pmes.GetType() {
		case pb.Message_GET_VALUE:
			pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
			resp := &pb.Message{
				Type:        pmes.Type,
				CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
			}

			if err := pbw.WriteMsg(resp); err != nil {
				panic(err)
			}
		default:
			panic("Shouldnt recieve this.")
		}
	})

	// Long timeout to ensure timing is not at play. The single deadline is
	// shared by all ten queries below.
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
	for i := 0; i < 10; i++ {
		_, err := d.GetValue(ctx, "hello")
		if err == nil {
			t.Fatal("Expected to recieve an error.")
		}

		// Judge a non-empty MultiErr by its first element, matching how
		// the other tests in this file inspect GetValue errors.
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
		switch err {
		case routing.ErrNotFound:
			// Success! Move on to the next query.
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
}