providers_test.go 6.71 KB
Newer Older
1 2 3
package providers

import (
4
	"context"
5
	"fmt"
6 7
	"io/ioutil"
	"os"
8 9 10
	"testing"
	"time"

11 12
	"github.com/libp2p/go-libp2p-core/peer"

13 14
	mh "github.com/multiformats/go-multihash"

15
	ds "github.com/ipfs/go-datastore"
Steven Allen's avatar
Steven Allen committed
16 17
	dsq "github.com/ipfs/go-datastore/query"
	dssync "github.com/ipfs/go-datastore/sync"
18
	u "github.com/ipfs/go-ipfs-util"
19 20 21
	//
	// used by TestLargeProvidersSet: do not remove
	// lds "github.com/ipfs/go-ds-leveldb"
22 23 24
)

func TestProviderManager(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
25 26 27
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

28
	mid := peer.ID("testing")
Steven Allen's avatar
Steven Allen committed
29
	p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
30
	a := u.Hash([]byte("test"))
31
	p.AddProvider(ctx, a, peer.ID("testingprovider"))
32 33

	// Not cached
34 35 36 37
	resp := p.GetProviders(ctx, a)
	if len(resp) != 1 {
		t.Fatal("Could not retrieve provider.")
	}
38 39 40 41 42 43

	// Cached
	resp = p.GetProviders(ctx, a)
	if len(resp) != 1 {
		t.Fatal("Could not retrieve provider.")
	}
44 45 46 47 48 49 50 51
	p.proc.Close()
}

func TestProvidersDatastore(t *testing.T) {
	old := lruCacheSize
	lruCacheSize = 10
	defer func() { lruCacheSize = old }()

Steven Allen's avatar
Steven Allen committed
52 53 54
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

55
	mid := peer.ID("testing")
Steven Allen's avatar
Steven Allen committed
56
	p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
57 58 59
	defer p.proc.Close()

	friend := peer.ID("friend")
60
	var mhs []mh.Multihash
61
	for i := 0; i < 100; i++ {
62 63 64
		h := u.Hash([]byte(fmt.Sprint(i)))
		mhs = append(mhs, h)
		p.AddProvider(ctx, h, friend)
65 66
	}

67
	for _, c := range mhs {
68
		resp := p.GetProviders(ctx, c)
69 70 71 72 73 74 75 76 77 78
		if len(resp) != 1 {
			t.Fatal("Could not retrieve provider.")
		}
		if resp[0] != friend {
			t.Fatal("expected provider to be 'friend'")
		}
	}
}

func TestProvidersSerialization(t *testing.T) {
Steven Allen's avatar
Steven Allen committed
79
	dstore := dssync.MutexWrap(ds.NewMapDatastore())
80

81
	k := u.Hash(([]byte("my key!")))
Jeromy's avatar
Jeromy committed
82 83 84 85
	p1 := peer.ID("peer one")
	p2 := peer.ID("peer two")
	pt1 := time.Now()
	pt2 := pt1.Add(time.Hour)
86

Jeromy's avatar
Jeromy committed
87 88 89 90 91 92
	err := writeProviderEntry(dstore, k, p1, pt1)
	if err != nil {
		t.Fatal(err)
	}

	err = writeProviderEntry(dstore, k, p2, pt2)
93 94 95 96 97 98 99 100 101
	if err != nil {
		t.Fatal(err)
	}

	pset, err := loadProvSet(dstore, k)
	if err != nil {
		t.Fatal(err)
	}

Jeromy's avatar
Jeromy committed
102 103 104 105 106
	lt1, ok := pset.set[p1]
	if !ok {
		t.Fatal("failed to load set correctly")
	}

107 108
	if !pt1.Equal(lt1) {
		t.Fatalf("time wasnt serialized correctly, %v != %v", pt1, lt1)
Jeromy's avatar
Jeromy committed
109 110 111
	}

	lt2, ok := pset.set[p2]
112 113 114 115
	if !ok {
		t.Fatal("failed to load set correctly")
	}

116 117
	if !pt2.Equal(lt2) {
		t.Fatalf("time wasnt serialized correctly, %v != %v", pt1, lt1)
118 119 120 121 122 123 124 125 126 127 128 129 130
	}
}

func TestProvidesExpire(t *testing.T) {
	pval := ProvideValidity
	cleanup := defaultCleanupInterval
	ProvideValidity = time.Second / 2
	defaultCleanupInterval = time.Second / 2
	defer func() {
		ProvideValidity = pval
		defaultCleanupInterval = cleanup
	}()

Steven Allen's avatar
Steven Allen committed
131 132 133
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

Steven Allen's avatar
Steven Allen committed
134
	ds := dssync.MutexWrap(ds.NewMapDatastore())
135
	mid := peer.ID("testing")
Steven Allen's avatar
Steven Allen committed
136
	p := NewProviderManager(ctx, mid, ds)
137 138

	peers := []peer.ID{"a", "b"}
139
	var mhs []mh.Multihash
140
	for i := 0; i < 10; i++ {
141 142
		h := u.Hash([]byte(fmt.Sprint(i)))
		mhs = append(mhs, h)
Steven Allen's avatar
Steven Allen committed
143 144
	}

145 146 147
	for _, h := range mhs[:5] {
		p.AddProvider(ctx, h, peers[0])
		p.AddProvider(ctx, h, peers[1])
148 149
	}

Steven Allen's avatar
Steven Allen committed
150 151
	time.Sleep(time.Second / 4)

152 153 154
	for _, h := range mhs[5:] {
		p.AddProvider(ctx, h, peers[0])
		p.AddProvider(ctx, h, peers[1])
Steven Allen's avatar
Steven Allen committed
155 156
	}

157 158
	for _, h := range mhs {
		out := p.GetProviders(ctx, h)
159 160 161 162 163
		if len(out) != 2 {
			t.Fatal("expected providers to still be there")
		}
	}

Steven Allen's avatar
Steven Allen committed
164 165
	time.Sleep(3 * time.Second / 8)

166 167
	for _, h := range mhs[:5] {
		out := p.GetProviders(ctx, h)
168 169
		if len(out) > 0 {
			t.Fatal("expected providers to be cleaned up, got: ", out)
170 171 172
		}
	}

173 174
	for _, h := range mhs[5:] {
		out := p.GetProviders(ctx, h)
Steven Allen's avatar
Steven Allen committed
175 176 177 178 179 180 181 182 183 184
		if len(out) != 2 {
			t.Fatal("expected providers to still be there")
		}
	}

	time.Sleep(time.Second / 2)

	// Stop to prevent data races
	p.Process().Close()

185 186 187 188
	if p.providers.Len() != 0 {
		t.Fatal("providers map not cleaned up")
	}

189
	res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
190 191 192
	if err != nil {
		t.Fatal(err)
	}
Steven Allen's avatar
Steven Allen committed
193 194 195 196 197
	rest, err := res.Rest()
	if err != nil {
		t.Fatal(err)
	}
	if len(rest) > 0 {
198 199 200
		t.Fatal("expected everything to be cleaned out of the datastore")
	}
}
201

202 203 204
// Keep the io/ioutil and os imports referenced: their only other uses are in
// the commented-out LevelDB section of TestLargeProvidersSet.
var (
	_ = ioutil.NopCloser
	_ = os.DevNull
)

205 206
// TestLargeProvidersSet can be used for profiling.
// The datastore can be switched to levelDB by uncommenting the section below and the import above
207
func TestLargeProvidersSet(t *testing.T) {
208
	t.Skip("This can be used for profiling. Skipping it for now to avoid incurring extra CI time")
209 210 211 212
	old := lruCacheSize
	lruCacheSize = 10
	defer func() { lruCacheSize = old }()

213
	dstore := ds.NewMapDatastore()
214

215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
	//dirn, err := ioutil.TempDir("", "provtest")
	//if err != nil {
	//	t.Fatal(err)
	//}
	//
	//opts := &lds.Options{
	//	NoSync:      true,
	//	Compression: 1,
	//}
	//lds, err := lds.NewDatastore(dirn, opts)
	//if err != nil {
	//	t.Fatal(err)
	//}
	//dstore = lds
	//
	//defer func() {
	//	os.RemoveAll(dirn)
	//}()
233 234 235 236 237 238 239 240

	ctx := context.Background()
	var peers []peer.ID
	for i := 0; i < 3000; i++ {
		peers = append(peers, peer.ID(fmt.Sprint(i)))
	}

	mid := peer.ID("myself")
241
	p := NewProviderManager(ctx, mid, dstore)
242 243
	defer p.proc.Close()

244
	var mhs []mh.Multihash
245
	for i := 0; i < 1000; i++ {
246 247
		h := u.Hash([]byte(fmt.Sprint(i)))
		mhs = append(mhs, h)
248
		for _, pid := range peers {
249
			p.AddProvider(ctx, h, pid)
250 251 252
		}
	}

253 254
	for i := 0; i < 5; i++ {
		start := time.Now()
255 256
		for _, h := range mhs {
			_ = p.GetProviders(ctx, h)
257 258 259
		}
		elapsed := time.Since(start)
		fmt.Printf("query %f ms\n", elapsed.Seconds()*1000)
260 261
	}
}
262 263 264 265 266

func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
	old := lruCacheSize
	lruCacheSize = 1
	defer func() { lruCacheSize = old }()
Steven Allen's avatar
Steven Allen committed
267 268
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
269 270

	p1, p2 := peer.ID("a"), peer.ID("b")
271 272
	h1 := u.Hash([]byte("1"))
	h2 := u.Hash([]byte("2"))
Steven Allen's avatar
Steven Allen committed
273
	pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
274

275
	// add provider
276 277 278
	pm.AddProvider(ctx, h1, p1)
	// make the cached provider for h1 go to datastore
	pm.AddProvider(ctx, h2, p1)
279
	// now just offloaded record should be brought back and joined with p2
280
	pm.AddProvider(ctx, h1, p2)
281

282 283 284
	h1Provs := pm.GetProviders(ctx, h1)
	if len(h1Provs) != 2 {
		t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(h1Provs))
285 286
	}
}
287 288 289 290 291 292

func TestWriteUpdatesCache(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	p1, p2 := peer.ID("a"), peer.ID("b")
293
	h1 := u.Hash([]byte("1"))
294 295 296
	pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))

	// add provider
297
	pm.AddProvider(ctx, h1, p1)
298
	// force into the cache
299
	pm.GetProviders(ctx, h1)
300
	// add a second provider
301
	pm.AddProvider(ctx, h1, p2)
302

303
	c1Provs := pm.GetProviders(ctx, h1)
304
	if len(c1Provs) != 2 {
305
		t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(c1Provs))
306 307
	}
}