ext_test.go 6.92 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3
package dht

import (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
4 5
	"io"
	"io/ioutil"
6
	"math/rand"
Jeromy's avatar
Jeromy committed
7
	"testing"
8
	"time"
Jeromy's avatar
Jeromy committed
9

10 11 12 13 14
	ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	dssync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"

15
	key "github.com/ipfs/go-ipfs/blocks/key"
16 17 18 19 20 21 22
	inet "github.com/ipfs/go-ipfs/p2p/net"
	mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
	routing "github.com/ipfs/go-ipfs/routing"
	pb "github.com/ipfs/go-ipfs/routing/dht/pb"
	record "github.com/ipfs/go-ipfs/routing/record"
	u "github.com/ipfs/go-ipfs/util"
Jeromy's avatar
Jeromy committed
23 24
)

25
func TestGetFailures(t *testing.T) {
26 27 28
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29

30
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
	mn, err := mocknet.FullMeshConnected(ctx, 2)
32 33 34
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36

37
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
	d := NewDHT(ctx, hosts[0], tsds)
39
	d.Update(ctx, hosts[1].ID())
40

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41 42 43 44 45 46
	// Reply with failures to every message
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
		defer s.Close()
		io.Copy(ioutil.Discard, s)
	})

47
	// This one should time out
48
	ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond)
49
	if _, err := d.GetValue(ctx1, key.Key("test")); err != nil {
50 51 52 53
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}

54
		if err != context.DeadlineExceeded && err != context.Canceled {
55
			t.Fatal("Got different error than we expected", err)
56 57 58
		}
	} else {
		t.Fatal("Did not get expected error!")
Jeromy's avatar
Jeromy committed
59 60
	}

61
	t.Log("Timeout test passed.")
62

63
	// Reply with failures to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
	hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
65 66 67 68 69
		defer s.Close()

		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)

70
		pmes := new(pb.Message)
71 72
		if err := pbr.ReadMsg(pmes); err != nil {
			panic(err)
73 74
		}

75
		resp := &pb.Message{
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
			Type: pmes.Type,
77
		}
78 79 80
		if err := pbw.WriteMsg(resp); err != nil {
			panic(err)
		}
81 82
	})

83 84 85 86 87 88
	// This one should fail with NotFound.
	// long context timeout to ensure we dont end too early.
	// the dht should be exhausting its query and returning not found.
	// (was 3 seconds before which should be _plenty_ of time, but maybe
	// travis machines really have a hard time...)
	ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second)
89
	_, err = d.GetValue(ctx2, key.Key("test"))
90
	if err != nil {
91 92 93
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
94
		if err != routing.ErrNotFound {
95
			t.Fatalf("Expected ErrNotFound, got: %s", err)
96 97 98 99
		}
	} else {
		t.Fatal("expected error, got none.")
	}
100

101 102
	t.Log("ErrNotFound check passed!")

103
	// Now we test this DHT's handleGetValue failure
104 105 106
	{
		typ := pb.Message_GET_VALUE
		str := "hello"
Jeromy's avatar
Jeromy committed
107 108 109 110 111 112

		sk, err := d.getOwnPrivateKey()
		if err != nil {
			t.Fatal(err)
		}

113
		rec, err := record.MakePutRecord(sk, key.Key(str), []byte("blah"), true)
114 115 116 117 118 119 120 121
		if err != nil {
			t.Fatal(err)
		}
		req := pb.Message{
			Type:   &typ,
			Key:    &str,
			Record: rec,
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123
		s, err := hosts[1].NewStream(ProtocolDHT, hosts[0].ID())
124 125 126 127
		if err != nil {
			t.Fatal(err)
		}
		defer s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128

129 130
		pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
		pbw := ggio.NewDelimitedWriter(s)
131

132 133 134
		if err := pbw.WriteMsg(&req); err != nil {
			t.Fatal(err)
		}
135

136 137 138 139 140 141 142 143 144 145 146
		pmes := new(pb.Message)
		if err := pbr.ReadMsg(pmes); err != nil {
			t.Fatal(err)
		}
		if pmes.GetRecord() != nil {
			t.Fatal("shouldnt have value")
		}
		if pmes.GetProviderPeers() != nil {
			t.Fatal("shouldnt have provider peers")
		}
	}
Jeromy's avatar
Jeromy committed
147
}
Jeromy's avatar
Jeromy committed
148 149

func TestNotFound(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
	// t.Skip("skipping test to debug another")
151 152 153
	if testing.Short() {
		t.SkipNow()
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154

155
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156
	mn, err := mocknet.FullMeshConnected(ctx, 16)
157 158 159
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
	hosts := mn.Hosts()
161
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
	d := NewDHT(ctx, hosts[0], tsds)
Jeromy's avatar
Jeromy committed
163

164 165
	for _, p := range hosts {
		d.Update(ctx, p.ID())
Jeromy's avatar
Jeromy committed
166 167 168
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169 170 171
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
172
			defer s.Close()
Jeromy's avatar
Jeromy committed
173

174 175
			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
Jeromy's avatar
Jeromy committed
176

177 178 179
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
Jeromy's avatar
Jeromy committed
180 181
			}

182 183 184 185
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
				resp := &pb.Message{Type: pmes.Type}

186
				ps := []peer.PeerInfo{}
187
				for i := 0; i < 7; i++ {
188
					p := hosts[rand.Intn(len(hosts))].ID()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
					pi := host.Peerstore().PeerInfo(p)
190
					ps = append(ps, pi)
191 192
				}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193
				resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
194 195 196 197 198 199 200 201 202
				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}

			default:
				panic("Shouldnt recieve this.")
			}
		})
	}
Jeromy's avatar
Jeromy committed
203

204 205
	// long timeout to ensure timing is not at play.
	ctx, _ = context.WithTimeout(ctx, time.Second*20)
206
	v, err := d.GetValue(ctx, key.Key("hello"))
207
	log.Debugf("get value got %v", v)
Jeromy's avatar
Jeromy committed
208
	if err != nil {
209 210 211
		if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
			err = merr[0]
		}
Jeromy's avatar
Jeromy committed
212
		switch err {
213
		case routing.ErrNotFound:
Jeromy's avatar
Jeromy committed
214 215 216 217 218 219 220 221 222 223
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}
224 225 226 227

// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228
	// t.Skip("skipping test to debug another")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229 230
	// t.Skip("skipping test because it makes a lot of output")

231
	ctx := context.Background()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232
	mn, err := mocknet.FullMeshConnected(ctx, 6)
233 234 235
	if err != nil {
		t.Fatal(err)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236
	hosts := mn.Hosts()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237

238
	tsds := dssync.MutexWrap(ds.NewMapDatastore())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
239
	d := NewDHT(ctx, hosts[0], tsds)
240

241
	for i := 1; i < 5; i++ {
242
		d.Update(ctx, hosts[i].ID())
243 244 245
	}

	// Reply with random peers to every message
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246 247 248
	for _, host := range hosts {
		host := host // shadow loop var
		host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
249 250 251 252
			defer s.Close()

			pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
			pbw := ggio.NewDelimitedWriter(s)
253

254 255 256
			pmes := new(pb.Message)
			if err := pbr.ReadMsg(pmes); err != nil {
				panic(err)
257 258
			}

259 260
			switch pmes.GetType() {
			case pb.Message_GET_VALUE:
261
				pi := host.Peerstore().PeerInfo(hosts[1].ID())
262 263
				resp := &pb.Message{
					Type:        pmes.Type,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264
					CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.PeerInfo{pi}),
265 266 267 268 269 270 271
				}

				if err := pbw.WriteMsg(resp); err != nil {
					panic(err)
				}
			default:
				panic("Shouldnt recieve this.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272
			}
273

274 275
		})
	}
276

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277
	ctx, _ = context.WithTimeout(ctx, time.Second*30)
278
	if _, err := d.GetValue(ctx, key.Key("hello")); err != nil {
279
		switch err {
280
		case routing.ErrNotFound:
281 282 283 284 285 286 287 288 289 290
			//Success!
			return
		case u.ErrTimeout:
			t.Fatal("Should not have gotten timeout!")
		default:
			t.Fatalf("Got unexpected error: %s", err)
		}
	}
	t.Fatal("Expected to recieve an error.")
}