ext_test.go 6.64 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4
	"io"
5
	"math/rand"
Jeromy's avatar
Jeromy committed
6
	"testing"
7
	"time"
Jeromy's avatar
Jeromy committed
8

George Antoniadis's avatar
George Antoniadis committed
9
	ggio "github.com/gogo/protobuf/io"
10 11 12 13
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	u "github.com/ipfs/go-ipfs-util"
	pstore "github.com/ipfs/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
14 15 16 17
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	inet "github.com/libp2p/go-libp2p/p2p/net"
	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
18
	context "golang.org/x/net/context"
George Antoniadis's avatar
George Antoniadis committed
19 20

	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Jeromy's avatar
Jeromy committed
21 22
)

23
func TestGetFailures(t *testing.T) {
24 25 26
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27

28
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
	mn, err := mocknet.FullMeshConnected(ctx, 2)
30 31 32
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34

35
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
	d := NewDHT(ctx, hosts[0], tsds)
37
	d.Update(ctx, hosts[1].ID())
38

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39 40
	// Reply with failures to every message
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
41
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42 43
	})

44
	// This one should time out
45
	ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond)
46
	if _, err := d.GetValue(ctx1, "test"); err != nil {
47 48 49 50
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

51
		if err != io.EOF {
52
			t.Fatal("Got different error than we expected", err)
53 54 55
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
56 57
	}

58
	t.Log("Timeout test passed.")
59

60
	// Reply with failures to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
62 63 64 65 66
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

67
		pmes := new(pb.Message)
68 69
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
70 71
		}

72
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
			Type: pmes.Type,
74
		}
75 76 77
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
78 79
	})

80 81 82 83 84 85
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
	ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second)
86
	_, err = d.GetValue(ctx2, "test")
87
	if err != nil {
88 89 90
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
91
		if err != routing.ErrNotFound {
92
			t.Fatalf("Expected ErrNotFound, got: %s", err)
93 94 95 96
		}
	} else {
		t.Fatal("expected error, got none.")
	}
97

98 99
	t.Log("ErrNotFound check passed!")

100
	// Now we test this DHT's handleGetValue failure
101 102 103
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
104 105 106 107 108 109

		sk, err := d.getOwnPrivateKey()
		if err != nil {
			t.Fatal(err)
		}

110
		rec, err := record.MakePutRecord(sk, str, []byte("blah"), true)
111 112 113 114 115 116 117 118
		if err != nil {
			t.Fatal(err)
		}
		req := pb.Message{
			Type:   &typ,
			Key:    &str,
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119

George Antoniadis's avatar
George Antoniadis committed
120
		s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), ProtocolDHT)
121 122 123 124
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125

126 127
		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)
128

129 130 131
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
132

133 134 135 136 137 138 139 140 141 142 143
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
144
}
Jeromy's avatar
Jeromy committed
145 146

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147
	// t.Skip("skipping test to debug another")
148 149 150
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151

152
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
	mn, err := mocknet.FullMeshConnected(ctx, 16)
154 155 156
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	hosts := mn.Hosts()
158
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	d := NewDHT(ctx, hosts[0], tsds)
Jeromy's avatar
Jeromy committed
160

161 162
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
163 164 165
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167 168
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
169
			defer s.Close()
Jeromy's avatar
Jeromy committed
170

171 172
			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
173

174 175 176
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
177 178
			}

179 180 181 182
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

Jeromy's avatar
Jeromy committed
183
				ps := []pstore.PeerInfo{}
184
				for i := 0; i < 7; i++ {
185
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
					pi := host.Peerstore().PeerInfo(p)
187
					ps = append(ps, pi)
188 189
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
191 192 193 194 195 196 197 198 199
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}

			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
200

201
	// long timeout to ensure timing is not at play.
rht's avatar
rht committed
202 203
	ctx, cancel := context.WithTimeout(ctx, time.Second*20)
	defer cancel()
204
	v, err := d.GetValue(ctx, "hello")
205
	log.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
206
	if err != nil {
207 208 209
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
210
		switch err {
211
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
212 213 214 215 216 217 218 219 220 221
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
222 223 224 225

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228
	// t.Skip("skipping test because it makes a lot of output")

229
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230
	mn, err := mocknet.FullMeshConnected(ctx, 6)
231 232 233
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
235

236
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237
	d := NewDHT(ctx, hosts[0], tsds)
238

239
	for i := 1; i < 5; i++ {
240
		d.Update(ctx, hosts[i].ID())
241 242 243
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
244 245 246
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
247 248 249 250
			defer s.Close()

			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
251

252 253 254
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
255 256
			}

257 258
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
259
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
260 261
				resp := &pb.Message{
					Type:        pmes.Type,
Jeromy's avatar
Jeromy committed
262
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []pstore.PeerInfo{pi}),
263 264 265 266 267 268 269
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
270
			}
271

272 273
		})
	}
274

rht's avatar
rht committed
275 276
	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()
277
	if _, err := d.GetValue(ctx, "hello"); err != nil {
278
		switch err {
279
		case routing.ErrNotFound:
280 281 282 283 284 285 286 287 288 289
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}