ext_test.go 6.96 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"io"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6
	"testing"
7
	"time"
Jeromy's avatar
Jeromy committed
8

9 10
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
11
	key "github.com/ipfs/go-ipfs/blocks/key"
12 13 14
	routing "github.com/ipfs/go-ipfs/routing"
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
	record "github.com/ipfs/go-ipfs/routing/record"
Jeromy's avatar
Jeromy committed
15

Jeromy's avatar
Jeromy committed
16 17 18
	inet "gx/ipfs/QmQkQP7WmeT9FRJDsEzAaGYDparttDiB6mCpVBrq2MuWQS/go-libp2p/p2p/net"
	mocknet "gx/ipfs/QmQkQP7WmeT9FRJDsEzAaGYDparttDiB6mCpVBrq2MuWQS/go-libp2p/p2p/net/mock"
	pstore "gx/ipfs/QmXHUpFsnpCmanRnacqYkFoLoFfEq5yS2nUgGkAjJ1Nj9j/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
19
	ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io"
20
	u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
Jeromy's avatar
Jeromy committed
21
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Jeromy's avatar
Jeromy committed
22 23
)

24
func TestGetFailures(t *testing.T) {
25 26 27
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28

29
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30
	mn, err := mocknet.FullMeshConnected(ctx, 2)
31 32 33
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35

36
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
	d := NewDHT(ctx, hosts[0], tsds)
38
	d.Update(ctx, hosts[1].ID())
39

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40 41
	// Reply with failures to every message
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
42
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44
	})

45
	// This one should time out
46
	ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond)
47
	if _, err := d.GetValue(ctx1, key.Key("test")); err != nil {
48 49 50 51
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

52
		if err != io.EOF {
53
			t.Fatal("Got different error than we expected", err)
54 55 56
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
57 58
	}

59
	t.Log("Timeout test passed.")
60

61
	// Reply with failures to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
63 64 65 66 67
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

68
		pmes := new(pb.Message)
69 70
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
71 72
		}

73
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74
			Type: pmes.Type,
75
		}
76 77 78
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
79 80
	})

81 82 83 84 85 86
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
	ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second)
87
	_, err = d.GetValue(ctx2, key.Key("test"))
88
	if err != nil {
89 90 91
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
92
		if err != routing.ErrNotFound {
93
			t.Fatalf("Expected ErrNotFound, got: %s", err)
94 95 96 97
		}
	} else {
		t.Fatal("expected error, got none.")
	}
98

99 100
	t.Log("ErrNotFound check passed!")

101
	// Now we test this DHT's handleGetValue failure
102 103 104
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
105 106 107 108 109 110

		sk, err := d.getOwnPrivateKey()
		if err != nil {
			t.Fatal(err)
		}

111
		rec, err := record.MakePutRecord(sk, key.Key(str), []byte("blah"), true)
112 113 114 115 116 117 118 119
		if err != nil {
			t.Fatal(err)
		}
		req := pb.Message{
			Type:   &typ,
			Key:    &str,
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120

121
		s, err := hosts[1].NewStream(context.Background(), ProtocolDHT, hosts[0].ID())
122 123 124 125
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126

127 128
		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)
129

130 131 132
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
133

134 135 136 137 138 139 140 141 142 143 144
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
145
}
Jeromy's avatar
Jeromy committed
146 147

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
	// t.Skip("skipping test to debug another")
149 150 151
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152

153
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154
	mn, err := mocknet.FullMeshConnected(ctx, 16)
155 156 157
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
	hosts := mn.Hosts()
159
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
	d := NewDHT(ctx, hosts[0], tsds)
Jeromy's avatar
Jeromy committed
161

162 163
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
164 165 166
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167 168 169
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
170
			defer s.Close()
Jeromy's avatar
Jeromy committed
171

172 173
			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
174

175 176 177
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
178 179
			}

180 181 182 183
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

Jeromy's avatar
Jeromy committed
184
				ps := []pstore.PeerInfo{}
185
				for i := 0; i < 7; i++ {
186
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
					pi := host.Peerstore().PeerInfo(p)
188
					ps = append(ps, pi)
189 190
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
192 193 194 195 196 197 198 199 200
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}

			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
201

202
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
203 204
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
205
	v, err := d.GetValue(ctx, key.Key("hello"))
206
	log.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
207
	if err != nil {
208 209 210
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
211
		switch err {
212
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
213 214 215 216 217 218 219 220 221 222
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
223 224 225 226

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228 229
	// t.Skip("skipping test because it makes a lot of output")

230
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231
	mn, err := mocknet.FullMeshConnected(ctx, 6)
232 233 234
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236

237
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238
	d := NewDHT(ctx, hosts[0], tsds)
239

240
	for i := 1; i < 5; i++ {
241
		d.Update(ctx, hosts[i].ID())
242 243 244
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245 246 247
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
248 249 250 251
			defer s.Close()

			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
252

253 254 255
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
256 257
			}

258 259
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
260
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
261 262
				resp := &pb.Message{
					Type:        pmes.Type,
Jeromy's avatar
Jeromy committed
263
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
264 265 266 267 268 269 270
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
271
			}
272

273 274
		})
	}
275

rht's avatar
rht committed
276 277
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
278
	if _, err := d.GetValue(ctx, key.Key("hello")); err != nil {
279
		switch err {
280
		case routing.ErrNotFound:
281 282 283 284 285 286 287 288 289 290
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}