providers_manager_test.go 7.28 KB
Newer Older
1 2 3
package providers

import (
4
	"context"
5
	"fmt"
6 7
	"io/ioutil"
	"os"
8 9 10
	"testing"
	"time"

11 12
	"github.com/libp2p/go-libp2p-core/peer"

13 14
	mh "github.com/multiformats/go-multihash"

15
	ds "github.com/ipfs/go-datastore"
Steven Allen's avatar
Steven Allen committed
16 17
	dsq "github.com/ipfs/go-datastore/query"
	dssync "github.com/ipfs/go-datastore/sync"
18
	u "github.com/ipfs/go-ipfs-util"
19 20 21
	//
	// used by TestLargeProvidersSet: do not remove
	// lds "github.com/ipfs/go-ds-leveldb"
22 23 24
)

func TestProviderManager(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
25 26 27
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

28
	mid := peer.ID("testing")
Alan Shaw's avatar
Alan Shaw committed
29 30 31 32
	p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
	if err != nil {
		t.Fatal(err)
	}
33
	a := u.Hash([]byte("test"))
34
	p.AddProvider(ctx, a, peer.ID("testingprovider"))
35 36

	// Not cached
David Dias's avatar
David Dias committed
37
	// TODO verify that cache is empty
38 39 40 41
	resp := p.GetProviders(ctx, a)
	if len(resp) != 1 {
		t.Fatal("Could not retrieve provider.")
	}
42 43

	// Cached
David Dias's avatar
David Dias committed
44
	// TODO verify that cache is populated
45 46 47 48
	resp = p.GetProviders(ctx, a)
	if len(resp) != 1 {
		t.Fatal("Could not retrieve provider.")
	}
David Dias's avatar
David Dias committed
49 50 51 52 53 54 55 56 57

	p.AddProvider(ctx, a, peer.ID("testingprovider2"))
	p.AddProvider(ctx, a, peer.ID("testingprovider3"))
	// TODO verify that cache is already up to date
	resp = p.GetProviders(ctx, a)
	if len(resp) != 3 {
		t.Fatalf("Should have got 3 providers, got %d", len(resp))
	}

58 59 60 61 62 63 64 65
	p.proc.Close()
}

func TestProvidersDatastore(t *testing.T) {
	old := lruCacheSize
	lruCacheSize = 10
	defer func() { lruCacheSize = old }()

Steven Allen's avatar
Steven Allen committed
66 67 68
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

69
	mid := peer.ID("testing")
Alan Shaw's avatar
Alan Shaw committed
70 71 72 73
	p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
	if err != nil {
		t.Fatal(err)
	}
74 75 76
	defer p.proc.Close()

	friend := peer.ID("friend")
77
	var mhs []mh.Multihash
78
	for i := 0; i < 100; i++ {
79 80 81
		h := u.Hash([]byte(fmt.Sprint(i)))
		mhs = append(mhs, h)
		p.AddProvider(ctx, h, friend)
82 83
	}

84
	for _, c := range mhs {
85
		resp := p.GetProviders(ctx, c)
86 87 88 89 90 91 92 93 94 95
		if len(resp) != 1 {
			t.Fatal("Could not retrieve provider.")
		}
		if resp[0] != friend {
			t.Fatal("expected provider to be 'friend'")
		}
	}
}

func TestProvidersSerialization(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
96
	dstore := dssync.MutexWrap(ds.NewMapDatastore())
97

98
	k := u.Hash(([]byte("my key!")))
Jeromy's avatar
Jeromy committed
99 100 101 102
	p1 := peer.ID("peer one")
	p2 := peer.ID("peer two")
	pt1 := time.Now()
	pt2 := pt1.Add(time.Hour)
103

Jeromy's avatar
Jeromy committed
104 105 106 107 108 109
	err := writeProviderEntry(dstore, k, p1, pt1)
	if err != nil {
		t.Fatal(err)
	}

	err = writeProviderEntry(dstore, k, p2, pt2)
110 111 112 113
	if err != nil {
		t.Fatal(err)
	}

David Dias's avatar
David Dias committed
114
	pset, err := loadProviderSet(dstore, k)
115 116 117 118
	if err != nil {
		t.Fatal(err)
	}

Jeromy's avatar
Jeromy committed
119 120 121 122 123
	lt1, ok := pset.set[p1]
	if !ok {
		t.Fatal("failed to load set correctly")
	}

124 125
	if !pt1.Equal(lt1) {
		t.Fatalf("time wasnt serialized correctly, %v != %v", pt1, lt1)
Jeromy's avatar
Jeromy committed
126 127 128
	}

	lt2, ok := pset.set[p2]
129 130 131 132
	if !ok {
		t.Fatal("failed to load set correctly")
	}

133 134
	if !pt2.Equal(lt2) {
		t.Fatalf("time wasnt serialized correctly, %v != %v", pt1, lt1)
135 136 137 138 139 140 141 142 143 144 145 146 147
	}
}

func TestProvidesExpire(t *testing.T) {
	pval := ProvideValidity
	cleanup := defaultCleanupInterval
	ProvideValidity = time.Second / 2
	defaultCleanupInterval = time.Second / 2
	defer func() {
		ProvideValidity = pval
		defaultCleanupInterval = cleanup
	}()

Steven Allen's avatar
Steven Allen committed
148 149 150
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

Steven Allen's avatar
Steven Allen committed
151
	ds := dssync.MutexWrap(ds.NewMapDatastore())
152
	mid := peer.ID("testing")
Alan Shaw's avatar
Alan Shaw committed
153 154 155 156
	p, err := NewProviderManager(ctx, mid, ds)
	if err != nil {
		t.Fatal(err)
	}
157 158

	peers := []peer.ID{"a", "b"}
159
	var mhs []mh.Multihash
160
	for i := 0; i < 10; i++ {
161 162
		h := u.Hash([]byte(fmt.Sprint(i)))
		mhs = append(mhs, h)
Steven Allen's avatar
Steven Allen committed
163 164
	}

165 166 167
	for _, h := range mhs[:5] {
		p.AddProvider(ctx, h, peers[0])
		p.AddProvider(ctx, h, peers[1])
168 169
	}

Steven Allen's avatar
Steven Allen committed
170 171
	time.Sleep(time.Second / 4)

172 173 174
	for _, h := range mhs[5:] {
		p.AddProvider(ctx, h, peers[0])
		p.AddProvider(ctx, h, peers[1])
Steven Allen's avatar
Steven Allen committed
175 176
	}

177 178
	for _, h := range mhs {
		out := p.GetProviders(ctx, h)
179 180 181 182 183
		if len(out) != 2 {
			t.Fatal("expected providers to still be there")
		}
	}

Steven Allen's avatar
Steven Allen committed
184 185
	time.Sleep(3 * time.Second / 8)

186 187
	for _, h := range mhs[:5] {
		out := p.GetProviders(ctx, h)
188 189
		if len(out) > 0 {
			t.Fatal("expected providers to be cleaned up, got: ", out)
190 191 192
		}
	}

193 194
	for _, h := range mhs[5:] {
		out := p.GetProviders(ctx, h)
Steven Allen's avatar
Steven Allen committed
195 196 197 198 199 200 201 202 203 204
		if len(out) != 2 {
			t.Fatal("expected providers to still be there")
		}
	}

	time.Sleep(time.Second / 2)

	// Stop to prevent data races
	p.Process().Close()

David Dias's avatar
David Dias committed
205
	if p.cache.Len() != 0 {
206 207 208
		t.Fatal("providers map not cleaned up")
	}

209
	res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
210 211 212
	if err != nil {
		t.Fatal(err)
	}
Steven Allen's avatar
Steven Allen committed
213 214 215 216 217
	rest, err := res.Rest()
	if err != nil {
		t.Fatal(err)
	}
	if len(rest) > 0 {
218 219 220
		t.Fatal("expected everything to be cleaned out of the datastore")
	}
}
221

222 223 224
var _ = ioutil.NopCloser
var _ = os.DevNull

225 226
// TestLargeProvidersSet can be used for profiling.
// The datastore can be switched to levelDB by uncommenting the section below and the import above
227
func TestLargeProvidersSet(t *testing.T) {
228
	t.Skip("This can be used for profiling. Skipping it for now to avoid incurring extra CI time")
229 230 231 232
	old := lruCacheSize
	lruCacheSize = 10
	defer func() { lruCacheSize = old }()

233
	dstore := ds.NewMapDatastore()
234

235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
	//dirn, err := ioutil.TempDir("", "provtest")
	//if err != nil {
	//	t.Fatal(err)
	//}
	//
	//opts := &lds.Options{
	//	NoSync:      true,
	//	Compression: 1,
	//}
	//lds, err := lds.NewDatastore(dirn, opts)
	//if err != nil {
	//	t.Fatal(err)
	//}
	//dstore = lds
	//
	//defer func() {
	//	os.RemoveAll(dirn)
	//}()
253 254 255 256 257 258 259 260

	ctx := context.Background()
	var peers []peer.ID
	for i := 0; i < 3000; i++ {
		peers = append(peers, peer.ID(fmt.Sprint(i)))
	}

	mid := peer.ID("myself")
Alan Shaw's avatar
Alan Shaw committed
261 262 263 264
	p, err := NewProviderManager(ctx, mid, dstore)
	if err != nil {
		t.Fatal(err)
	}
265 266
	defer p.proc.Close()

267
	var mhs []mh.Multihash
268
	for i := 0; i < 1000; i++ {
269 270
		h := u.Hash([]byte(fmt.Sprint(i)))
		mhs = append(mhs, h)
271
		for _, pid := range peers {
272
			p.AddProvider(ctx, h, pid)
273 274 275
		}
	}

276 277
	for i := 0; i < 5; i++ {
		start := time.Now()
278 279
		for _, h := range mhs {
			_ = p.GetProviders(ctx, h)
280 281 282
		}
		elapsed := time.Since(start)
		fmt.Printf("query %f ms\n", elapsed.Seconds()*1000)
283 284
	}
}
285 286 287 288 289

func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
	old := lruCacheSize
	lruCacheSize = 1
	defer func() { lruCacheSize = old }()
Steven Allen's avatar
Steven Allen committed
290 291
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
292 293

	p1, p2 := peer.ID("a"), peer.ID("b")
294 295
	h1 := u.Hash([]byte("1"))
	h2 := u.Hash([]byte("2"))
Alan Shaw's avatar
Alan Shaw committed
296 297 298 299
	pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
	if err != nil {
		t.Fatal(err)
	}
300

301
	// add provider
302 303 304
	pm.AddProvider(ctx, h1, p1)
	// make the cached provider for h1 go to datastore
	pm.AddProvider(ctx, h2, p1)
305
	// now just offloaded record should be brought back and joined with p2
306
	pm.AddProvider(ctx, h1, p2)
307

308 309 310
	h1Provs := pm.GetProviders(ctx, h1)
	if len(h1Provs) != 2 {
		t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(h1Provs))
311 312
	}
}
313 314 315 316 317 318

func TestWriteUpdatesCache(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p1, p2 := peer.ID("a"), peer.ID("b")
319
	h1 := u.Hash([]byte("1"))
Alan Shaw's avatar
Alan Shaw committed
320 321 322 323
	pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
	if err != nil {
		t.Fatal(err)
	}
324 325

	// add provider
326
	pm.AddProvider(ctx, h1, p1)
327
	// force into the cache
328
	pm.GetProviders(ctx, h1)
329
	// add a second provider
330
	pm.AddProvider(ctx, h1, p2)
331

332
	c1Provs := pm.GetProviders(ctx, h1)
333
	if len(c1Provs) != 2 {
334
		t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(c1Provs))
335 336
	}
}