flatfs_test.go 26.8 KB
Newer Older
Tommi Virtanen's avatar
Tommi Virtanen committed
1 2 3
package flatfs_test

import (
Jeromy's avatar
Jeromy committed
4
	"encoding/base32"
5
	"encoding/json"
6
	"fmt"
Tommi Virtanen's avatar
Tommi Virtanen committed
7
	"io/ioutil"
8
	"math"
9
	"math/rand"
Tommi Virtanen's avatar
Tommi Virtanen committed
10 11
	"os"
	"path/filepath"
12
	"runtime"
13 14
	"strings"
	"sync"
Tommi Virtanen's avatar
Tommi Virtanen committed
15
	"testing"
16
	"time"
Tommi Virtanen's avatar
Tommi Virtanen committed
17

tavit ohanian's avatar
tavit ohanian committed
18 19 20 21 22
	"gitlab.dms3.io/dms3/public/go-datastore"
	"gitlab.dms3.io/dms3/public/go-datastore/mount"
	"gitlab.dms3.io/dms3/public/go-datastore/query"
	dstest "gitlab.dms3.io/dms3/public/go-datastore/test"
	flatfs "gitlab.dms3.io/dms3/public/go-ds-flatfs"
Tommi Virtanen's avatar
Tommi Virtanen committed
23 24
)

25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
func checkTemp(t *testing.T, dir string) {
	tempDir, err := os.Open(filepath.Join(dir, ".temp"))
	if err != nil {
		t.Errorf("failed to open temp dir: %s", err)
		return
	}

	names, err := tempDir.Readdirnames(-1)
	tempDir.Close()

	if err != nil {
		t.Errorf("failed to read temp dir: %s", err)
		return
	}

	for _, name := range names {
		t.Errorf("found leftover temporary file: %s", name)
	}
}

Tommi Virtanen's avatar
Tommi Virtanen committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58
func tempdir(t testing.TB) (path string, cleanup func()) {
	path, err := ioutil.TempDir("", "test-datastore-flatfs-")
	if err != nil {
		t.Fatalf("cannot create temp directory: %v", err)
	}

	cleanup = func() {
		if err := os.RemoveAll(path); err != nil {
			t.Errorf("tempdir cleanup failed: %v", err)
		}
	}
	return path, cleanup
}

59 60 61 62
func tryAllShardFuncs(t *testing.T, testFunc func(mkShardFunc, *testing.T)) {
	t.Run("prefix", func(t *testing.T) { testFunc(flatfs.Prefix, t) })
	t.Run("suffix", func(t *testing.T) { testFunc(flatfs.Suffix, t) })
	t.Run("next-to-last", func(t *testing.T) { testFunc(flatfs.NextToLast, t) })
63 64
}

65 66
type mkShardFunc func(int) *flatfs.ShardIdV1

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
func testBatch(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
	defer checkTemp(t, temp)

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
	defer fs.Close()

	batches := make([]datastore.Batch, 9)
	for i := range batches {
		batch, err := fs.Batch()
		if err != nil {
			t.Fatal(err)
		}

		batches[i] = batch

		err = batch.Put(datastore.NewKey("QUUX"), []byte("foo"))
		if err != nil {
			t.Fatal(err)
		}
		err = batch.Put(datastore.NewKey(fmt.Sprintf("Q%dX", i)), []byte(fmt.Sprintf("bar%d", i)))
		if err != nil {
			t.Fatal(err)
		}
	}

	var wg sync.WaitGroup
	wg.Add(len(batches))
	for _, batch := range batches {
		batch := batch
		go func() {
			defer wg.Done()
			err := batch.Commit()
			if err != nil {
				t.Error(err)
			}
		}()
	}

	check := func(key, expected string) {
		actual, err := fs.Get(datastore.NewKey(key))
		if err != nil {
			t.Fatalf("get for key %s, error: %s", key, err)
		}
		if string(actual) != expected {
			t.Fatalf("for key %s, expected %s, got %s", key, expected, string(actual))
		}
	}

	wg.Wait()

	check("QUUX", "foo")
	for i := range batches {
		check(fmt.Sprintf("Q%dX", i), fmt.Sprintf("bar%d", i))
	}
}

func TestBatch(t *testing.T) { tryAllShardFuncs(t, testBatch) }

130
func testPut(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
131 132
	temp, cleanup := tempdir(t)
	defer cleanup()
133
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
134

135
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
136 137 138
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
139
	defer fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
140

141
	err = fs.Put(datastore.NewKey("QUUX"), []byte("foobar"))
Tommi Virtanen's avatar
Tommi Virtanen committed
142 143 144
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}
145 146 147 148 149

	err = fs.Put(datastore.NewKey("foo"), []byte("nonono"))
	if err == nil {
		t.Fatalf("did not expect to put a lowercase key")
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
150 151
}

152
func TestPut(t *testing.T) { tryAllShardFuncs(t, testPut) }
153

154
func testGet(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
155 156
	temp, cleanup := tempdir(t)
	defer cleanup()
157
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
158

159
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
160 161 162
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
163
	defer fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
164 165

	const input = "foobar"
166
	err = fs.Put(datastore.NewKey("QUUX"), []byte(input))
Tommi Virtanen's avatar
Tommi Virtanen committed
167 168 169 170
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

171
	buf, err := fs.Get(datastore.NewKey("QUUX"))
Tommi Virtanen's avatar
Tommi Virtanen committed
172 173 174 175 176 177
	if err != nil {
		t.Fatalf("Get failed: %v", err)
	}
	if g, e := string(buf), input; g != e {
		t.Fatalf("Get gave wrong content: %q != %q", g, e)
	}
178 179 180 181 182

	_, err = fs.Get(datastore.NewKey("/FOO/BAR"))
	if err != datastore.ErrNotFound {
		t.Fatalf("expected ErrNotFound, got %s", err)
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
183 184
}

185
func TestGet(t *testing.T) { tryAllShardFuncs(t, testGet) }
186

187
func testPutOverwrite(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
188 189
	temp, cleanup := tempdir(t)
	defer cleanup()
190
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
191

192
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
193 194 195
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
196
	defer fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
197 198 199 200 201

	const (
		loser  = "foobar"
		winner = "xyzzy"
	)
202
	err = fs.Put(datastore.NewKey("QUUX"), []byte(loser))
Tommi Virtanen's avatar
Tommi Virtanen committed
203 204 205 206
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

207
	err = fs.Put(datastore.NewKey("QUUX"), []byte(winner))
Tommi Virtanen's avatar
Tommi Virtanen committed
208 209 210 211
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

212
	data, err := fs.Get(datastore.NewKey("QUUX"))
Tommi Virtanen's avatar
Tommi Virtanen committed
213 214 215
	if err != nil {
		t.Fatalf("Get failed: %v", err)
	}
216
	if g, e := string(data), winner; g != e {
Tommi Virtanen's avatar
Tommi Virtanen committed
217 218 219 220
		t.Fatalf("Get gave wrong content: %q != %q", g, e)
	}
}

221
func TestPutOverwrite(t *testing.T) { tryAllShardFuncs(t, testPutOverwrite) }
222

223
func testGetNotFoundError(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
224 225
	temp, cleanup := tempdir(t)
	defer cleanup()
226
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
227

228
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
229 230 231
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
232
	defer fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
233

234
	_, err = fs.Get(datastore.NewKey("QUUX"))
Tommi Virtanen's avatar
Tommi Virtanen committed
235 236 237 238 239
	if g, e := err, datastore.ErrNotFound; g != e {
		t.Fatalf("expected ErrNotFound, got: %v\n", g)
	}
}

240
func TestGetNotFoundError(t *testing.T) { tryAllShardFuncs(t, testGetNotFoundError) }
241 242

type params struct {
243
	shard *flatfs.ShardIdV1
Kevin Atkinson's avatar
Kevin Atkinson committed
244 245
	dir   string
	key   string
246 247 248
}

func testStorage(p *params, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
249 250
	temp, cleanup := tempdir(t)
	defer cleanup()
251
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
252

253
	target := p.dir + string(os.PathSeparator) + p.key + ".data"
254
	fs, err := flatfs.CreateOrOpen(temp, p.shard, false)
Tommi Virtanen's avatar
Tommi Virtanen committed
255 256 257
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
258
	defer fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
259

260
	err = fs.Put(datastore.NewKey(p.key), []byte("foobar"))
Tommi Virtanen's avatar
Tommi Virtanen committed
261 262 263 264
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

265
	fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
266
	seen := false
267
	haveREADME := false
Tommi Virtanen's avatar
Tommi Virtanen committed
268 269 270 271 272 273 274 275 276
	walk := func(absPath string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		path, err := filepath.Rel(temp, absPath)
		if err != nil {
			return err
		}
		switch path {
277
		case ".", "..", "SHARDING", flatfs.DiskUsageFile, ".temp":
Tommi Virtanen's avatar
Tommi Virtanen committed
278
			// ignore
Kevin Atkinson's avatar
Kevin Atkinson committed
279
		case "_README":
280 281 282 283 284
			_, err := ioutil.ReadFile(absPath)
			if err != nil {
				t.Error("could not read _README file")
			}
			haveREADME = true
285
		case p.dir:
Tommi Virtanen's avatar
Tommi Virtanen committed
286
			if !fi.IsDir() {
Kevin Atkinson's avatar
Kevin Atkinson committed
287
				t.Errorf("directory is not a file? %v", fi.Mode())
Tommi Virtanen's avatar
Tommi Virtanen committed
288 289 290 291 292 293 294 295
			}
			// we know it's there if we see the file, nothing more to
			// do here
		case target:
			seen = true
			if !fi.Mode().IsRegular() {
				t.Errorf("expected a regular file, mode: %04o", fi.Mode())
			}
296 297 298 299
			if runtime.GOOS != "windows" {
				if g, e := fi.Mode()&os.ModePerm&0007, os.FileMode(0000); g != e {
					t.Errorf("file should not be world accessible: %04o", fi.Mode())
				}
Tommi Virtanen's avatar
Tommi Virtanen committed
300 301 302 303 304 305 306
			}
		default:
			t.Errorf("saw unexpected directory entry: %q %v", path, fi.Mode())
		}
		return nil
	}
	if err := filepath.Walk(temp, walk); err != nil {
307
		t.Fatalf("walk: %v", err)
Tommi Virtanen's avatar
Tommi Virtanen committed
308 309 310 311
	}
	if !seen {
		t.Error("did not see the data file")
	}
tavit ohanian's avatar
tavit ohanian committed
312
	if fs.ShardStr() == flatfs.DMS3_DEF_SHARD_STR && !haveREADME {
313
		t.Error("expected _README file")
tavit ohanian's avatar
tavit ohanian committed
314
	} else if fs.ShardStr() != flatfs.DMS3_DEF_SHARD_STR && haveREADME {
315 316
		t.Error("did not expect _README file")
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
317
}
Tommi Virtanen's avatar
Tommi Virtanen committed
318

319 320 321
func TestStorage(t *testing.T) {
	t.Run("prefix", func(t *testing.T) {
		testStorage(&params{
322
			shard: flatfs.Prefix(2),
323 324
			dir:   "QU",
			key:   "QUUX",
325 326 327 328
		}, t)
	})
	t.Run("suffix", func(t *testing.T) {
		testStorage(&params{
329
			shard: flatfs.Suffix(2),
330 331
			dir:   "UX",
			key:   "QUUX",
332 333
		}, t)
	})
334 335
	t.Run("next-to-last", func(t *testing.T) {
		testStorage(&params{
336
			shard: flatfs.NextToLast(2),
337 338
			dir:   "UU",
			key:   "QUUX",
339 340
		}, t)
	})
341 342
}

343
func testHasNotFound(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
344 345
	temp, cleanup := tempdir(t)
	defer cleanup()
346
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
347

348
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
349 350 351
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
352
	defer fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
353

354
	found, err := fs.Has(datastore.NewKey("QUUX"))
Tommi Virtanen's avatar
Tommi Virtanen committed
355 356 357
	if err != nil {
		t.Fatalf("Has fail: %v\n", err)
	}
Steven Allen's avatar
Steven Allen committed
358 359
	if found {
		t.Fatal("Has should have returned false")
Tommi Virtanen's avatar
Tommi Virtanen committed
360 361 362
	}
}

363
func TestHasNotFound(t *testing.T) { tryAllShardFuncs(t, testHasNotFound) }
364

365
func testHasFound(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
366 367
	temp, cleanup := tempdir(t)
	defer cleanup()
368
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
369

370
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
371 372 373
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
374 375
	defer fs.Close()

376
	err = fs.Put(datastore.NewKey("QUUX"), []byte("foobar"))
Tommi Virtanen's avatar
Tommi Virtanen committed
377 378 379 380
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

381
	found, err := fs.Has(datastore.NewKey("QUUX"))
Tommi Virtanen's avatar
Tommi Virtanen committed
382 383 384
	if err != nil {
		t.Fatalf("Has fail: %v\n", err)
	}
Steven Allen's avatar
Steven Allen committed
385 386
	if !found {
		t.Fatal("Has should have returned true")
Tommi Virtanen's avatar
Tommi Virtanen committed
387 388
	}
}
Tommi Virtanen's avatar
Tommi Virtanen committed
389

390
func TestHasFound(t *testing.T) { tryAllShardFuncs(t, testHasFound) }
391

Steven Allen's avatar
Steven Allen committed
392 393 394
func testGetSizeFound(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
395
	defer checkTemp(t, temp)
Steven Allen's avatar
Steven Allen committed
396 397 398 399 400 401 402

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
	defer fs.Close()

403
	_, err = fs.GetSize(datastore.NewKey("QUUX"))
Steven Allen's avatar
Steven Allen committed
404 405 406 407 408 409 410 411 412 413
	if err != datastore.ErrNotFound {
		t.Fatalf("GetSize should have returned ErrNotFound, got: %v\n", err)
	}
}

func TestGetSizeFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeFound) }

func testGetSizeNotFound(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
414
	defer checkTemp(t, temp)
Steven Allen's avatar
Steven Allen committed
415 416 417 418 419 420 421

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
	defer fs.Close()

422
	err = fs.Put(datastore.NewKey("QUUX"), []byte("foobar"))
Steven Allen's avatar
Steven Allen committed
423 424 425 426
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

427
	size, err := fs.GetSize(datastore.NewKey("QUUX"))
Steven Allen's avatar
Steven Allen committed
428 429 430 431 432 433 434 435 436 437
	if err != nil {
		t.Fatalf("GetSize failed with: %v\n", err)
	}
	if size != len("foobar") {
		t.Fatalf("GetSize returned wrong size: got %d, expected %d", size, len("foobar"))
	}
}

func TestGetSizeNotFound(t *testing.T) { tryAllShardFuncs(t, testGetSizeNotFound) }

438
func testDeleteNotFound(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
439 440
	temp, cleanup := tempdir(t)
	defer cleanup()
441
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
442

443
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
444 445 446
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
447
	defer fs.Close()
Tommi Virtanen's avatar
Tommi Virtanen committed
448

449
	err = fs.Delete(datastore.NewKey("QUUX"))
Steven Allen's avatar
Steven Allen committed
450 451
	if err != nil {
		t.Fatalf("expected nil, got: %v\n", err)
Tommi Virtanen's avatar
Tommi Virtanen committed
452 453 454
	}
}

455
func TestDeleteNotFound(t *testing.T) { tryAllShardFuncs(t, testDeleteNotFound) }
456

457
func testDeleteFound(dirFunc mkShardFunc, t *testing.T) {
Tommi Virtanen's avatar
Tommi Virtanen committed
458 459
	temp, cleanup := tempdir(t)
	defer cleanup()
460
	defer checkTemp(t, temp)
Tommi Virtanen's avatar
Tommi Virtanen committed
461

462
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Tommi Virtanen's avatar
Tommi Virtanen committed
463 464 465
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
466 467
	defer fs.Close()

468
	err = fs.Put(datastore.NewKey("QUUX"), []byte("foobar"))
Tommi Virtanen's avatar
Tommi Virtanen committed
469 470 471 472
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

473
	err = fs.Delete(datastore.NewKey("QUUX"))
Tommi Virtanen's avatar
Tommi Virtanen committed
474 475 476 477 478
	if err != nil {
		t.Fatalf("Delete fail: %v\n", err)
	}

	// check that it's gone
479
	_, err = fs.Get(datastore.NewKey("QUUX"))
Tommi Virtanen's avatar
Tommi Virtanen committed
480 481 482 483
	if g, e := err, datastore.ErrNotFound; g != e {
		t.Fatalf("expected Get after Delete to give ErrNotFound, got: %v\n", g)
	}
}
484

485
func TestDeleteFound(t *testing.T) { tryAllShardFuncs(t, testDeleteFound) }
486

487
func testQuerySimple(dirFunc mkShardFunc, t *testing.T) {
488 489
	temp, cleanup := tempdir(t)
	defer cleanup()
490
	defer checkTemp(t, temp)
491

492
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
493 494 495
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
496 497
	defer fs.Close()

498
	const myKey = "QUUX"
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
	err = fs.Put(datastore.NewKey(myKey), []byte("foobar"))
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

	res, err := fs.Query(query.Query{KeysOnly: true})
	if err != nil {
		t.Fatalf("Query fail: %v\n", err)
	}
	entries, err := res.Rest()
	if err != nil {
		t.Fatalf("Query Results.Rest fail: %v\n", err)
	}
	seen := false
	for _, e := range entries {
		switch e.Key {
		case datastore.NewKey(myKey).String():
			seen = true
		default:
			t.Errorf("saw unexpected key: %q", e.Key)
		}
	}
	if !seen {
		t.Errorf("did not see wanted key %q in %+v", myKey, entries)
	}
}
Jeromy's avatar
Jeromy committed
525

526
func TestQuerySimple(t *testing.T) { tryAllShardFuncs(t, testQuerySimple) }
527

528 529 530
func testDiskUsage(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
531
	defer checkTemp(t, temp)
532 533 534 535 536

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
537
	defer fs.Close()
538 539 540 541 542 543 544 545 546 547

	time.Sleep(100 * time.Millisecond)
	duNew, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}
	t.Log("duNew:", duNew)

	count := 200
	for i := 0; i < count; i++ {
548
		k := datastore.NewKey(fmt.Sprintf("TEST-%d", i))
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
		v := []byte("10bytes---")
		err = fs.Put(k, v)
		if err != nil {
			t.Fatalf("Put fail: %v\n", err)
		}
	}

	time.Sleep(100 * time.Millisecond)
	duElems, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}
	t.Log("duPostPut:", duElems)

	for i := 0; i < count; i++ {
564
		k := datastore.NewKey(fmt.Sprintf("TEST-%d", i))
565 566 567 568 569 570 571 572 573 574 575 576 577
		err = fs.Delete(k)
		if err != nil {
			t.Fatalf("Delete fail: %v\n", err)
		}
	}

	time.Sleep(100 * time.Millisecond)
	duDelete, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}
	t.Log("duPostDelete:", duDelete)

578 579 580 581
	du, err := fs.DiskUsage()
	t.Log("duFinal:", du)
	if err != nil {
		t.Fatal(err)
582
	}
583
	fs.Close()
584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606

	// Check that disk usage file is correct
	duB, err := ioutil.ReadFile(filepath.Join(temp, flatfs.DiskUsageFile))
	if err != nil {
		t.Fatal(err)
	}
	contents := make(map[string]interface{})
	err = json.Unmarshal(duB, &contents)
	if err != nil {
		t.Fatal(err)
	}

	// Make sure diskUsage value is correct
	if val, ok := contents["diskUsage"].(float64); !ok || uint64(val) != du {
		t.Fatalf("Unexpected value for diskUsage in %s: %v (expected %d)",
			flatfs.DiskUsageFile, contents["diskUsage"], du)
	}

	// Make sure the accuracy value is correct
	if val, ok := contents["accuracy"].(string); !ok || val != "initial-exact" {
		t.Fatalf("Unexpected value for accuracyin %s: %v",
			flatfs.DiskUsageFile, contents["accuracy"])
	}
607 608

	// Make sure size is correctly calculated on re-open
609
	os.Remove(filepath.Join(temp, flatfs.DiskUsageFile))
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652
	fs, err = flatfs.Open(temp, false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}

	duReopen, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}
	t.Log("duReopen:", duReopen)

	// Checks
	if duNew == 0 {
		t.Error("new datastores should have some size")
	}

	if duElems <= duNew {
		t.Error("size should grow as new elements are added")
	}

	if duElems-duDelete != uint64(count*10) {
		t.Error("size should be reduced exactly as size of objects deleted")
	}

	if duReopen < duNew {
		t.Error("Reopened datastore should not be smaller")
	}
}

func TestDiskUsage(t *testing.T) {
	tryAllShardFuncs(t, testDiskUsage)
}

func TestDiskUsageDoubleCount(t *testing.T) {
	tryAllShardFuncs(t, testDiskUsageDoubleCount)
}

// test that concurrently writing and deleting the same key/value
// does not throw any errors and disk usage does not do
// any double-counting.
func testDiskUsageDoubleCount(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
653
	defer checkTemp(t, temp)
654 655 656 657 658

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
659
	defer fs.Close()
660 661 662

	var count int
	var wg sync.WaitGroup
663
	testKey := datastore.NewKey("TEST")
664 665 666 667 668 669 670

	put := func() {
		defer wg.Done()
		for i := 0; i < count; i++ {
			v := []byte("10bytes---")
			err := fs.Put(testKey, v)
			if err != nil {
Steven Allen's avatar
Steven Allen committed
671
				t.Errorf("Put fail: %v\n", err)
672 673 674 675 676 677 678 679 680
			}
		}
	}

	del := func() {
		defer wg.Done()
		for i := 0; i < count; i++ {
			err := fs.Delete(testKey)
			if err != nil && !strings.Contains(err.Error(), "key not found") {
Steven Allen's avatar
Steven Allen committed
681
				t.Errorf("Delete fail: %v\n", err)
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726
			}
		}
	}

	// Add one element and then remove it and check disk usage
	// makes sense
	count = 1
	wg.Add(2)
	put()
	du, _ := fs.DiskUsage()
	del()
	du2, _ := fs.DiskUsage()
	if du-10 != du2 {
		t.Error("should have deleted exactly 10 bytes:", du, du2)
	}

	// Add and remove many times at the same time
	count = 200
	wg.Add(4)
	go put()
	go del()
	go put()
	go del()
	wg.Wait()

	du3, _ := fs.DiskUsage()
	has, err := fs.Has(testKey)
	if err != nil {
		t.Fatal(err)
	}

	if has { // put came last
		if du3 != du {
			t.Error("du should be the same as after first put:", du, du3)
		}
	} else { //delete came last
		if du3 != du2 {
			t.Error("du should be the same as after first delete:", du2, du3)
		}
	}
}

func testDiskUsageBatch(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
727
	defer checkTemp(t, temp)
728 729 730 731 732

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
733
	defer fs.Close()
734

Steven Allen's avatar
Steven Allen committed
735 736 737 738
	fsBatch, err := fs.Batch()
	if err != nil {
		t.Fatal(err)
	}
739 740 741 742 743

	count := 200
	var wg sync.WaitGroup
	testKeys := []datastore.Key{}
	for i := 0; i < count; i++ {
744
		k := datastore.NewKey(fmt.Sprintf("TEST%d", i))
745 746 747 748 749
		testKeys = append(testKeys, k)
	}

	put := func() {
		for i := 0; i < count; i++ {
Steven Allen's avatar
Steven Allen committed
750 751 752 753
			err := fsBatch.Put(testKeys[i], []byte("10bytes---"))
			if err != nil {
				t.Error(err)
			}
754 755 756 757 758 759
		}
	}
	commit := func() {
		defer wg.Done()
		err := fsBatch.Commit()
		if err != nil {
Steven Allen's avatar
Steven Allen committed
760
			t.Errorf("Batch Put fail: %v\n", err)
761 762 763 764 765 766 767 768
		}
	}

	del := func() {
		defer wg.Done()
		for _, k := range testKeys {
			err := fs.Delete(k)
			if err != nil && !strings.Contains(err.Error(), "key not found") {
Steven Allen's avatar
Steven Allen committed
769
				t.Errorf("Delete fail: %v\n", err)
770 771 772 773 774 775 776 777 778
			}
		}
	}

	// Put many elements and then delete them and check disk usage
	// makes sense
	wg.Add(2)
	put()
	commit()
Steven Allen's avatar
Steven Allen committed
779 780 781 782
	du, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}
783
	del()
Steven Allen's avatar
Steven Allen committed
784 785 786 787
	du2, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}
788 789 790 791 792 793 794 795 796 797 798
	if du-uint64(10*count) != du2 {
		t.Errorf("should have deleted exactly %d bytes: %d %d", 10*count, du, du2)
	}

	// Do deletes while doing putManys concurrently
	wg.Add(2)
	put()
	go commit()
	go del()
	wg.Wait()

Steven Allen's avatar
Steven Allen committed
799 800 801 802
	du3, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}
803 804 805 806
	// Now query how many keys we have
	results, err := fs.Query(query.Query{
		KeysOnly: true,
	})
Steven Allen's avatar
Steven Allen committed
807 808 809
	if err != nil {
		t.Fatal(err)
	}
810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827
	rest, err := results.Rest()
	if err != nil {
		t.Fatal(err)
	}

	expectedSize := uint64(len(rest) * 10)

	if exp := du2 + expectedSize; exp != du3 {
		t.Error("diskUsage has skewed off from real size:",
			exp, du3)
	}
}

func TestDiskUsageBatch(t *testing.T) { tryAllShardFuncs(t, testDiskUsageBatch) }

func testDiskUsageEstimation(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
828
	defer checkTemp(t, temp)
829 830 831 832 833

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
834
	defer fs.Close()
835 836 837

	count := 50000
	for i := 0; i < count; i++ {
838
		k := datastore.NewKey(fmt.Sprintf("%d-TEST-%d", i, i))
839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885
		v := make([]byte, 1000)
		err = fs.Put(k, v)
		if err != nil {
			t.Fatalf("Put fail: %v\n", err)
		}
	}

	// Delete checkpoint
	fs.Close()
	os.Remove(filepath.Join(temp, flatfs.DiskUsageFile))

	// This will do a full du
	flatfs.DiskUsageFilesAverage = -1
	fs, err = flatfs.Open(temp, false)
	if err != nil {
		t.Fatalf("Open fail: %v\n", err)
	}

	duReopen, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}

	fs.Close()
	os.Remove(filepath.Join(temp, flatfs.DiskUsageFile))

	// This will estimate the size. Since all files are the same
	// length we can use a low file average number.
	flatfs.DiskUsageFilesAverage = 100
	// Make sure size is correctly calculated on re-open
	fs, err = flatfs.Open(temp, false)
	if err != nil {
		t.Fatalf("Open fail: %v\n", err)
	}

	duEst, err := fs.DiskUsage()
	if err != nil {
		t.Fatal(err)
	}

	t.Log("RealDu:", duReopen)
	t.Log("Est:", duEst)

	diff := int(math.Abs(float64(int(duReopen) - int(duEst))))
	maxDiff := int(0.05 * float64(duReopen)) // %5 of actual

	if diff > maxDiff {
886
		t.Fatalf("expected a better estimation within 5%%")
887
	}
888 889 890 891 892

	// Make sure the accuracy value is correct
	if fs.Accuracy() != "initial-approximate" {
		t.Errorf("Unexpected value for fs.Accuracy(): %s", fs.Accuracy())
	}
893 894 895 896 897 898 899 900 901 902 903 904 905

	fs.Close()

	// Reopen into a new variable
	fs2, err := flatfs.Open(temp, false)
	if err != nil {
		t.Fatalf("Open fail: %v\n", err)
	}

	// Make sure the accuracy value is preserved
	if fs2.Accuracy() != "initial-approximate" {
		t.Errorf("Unexpected value for fs.Accuracy(): %s", fs2.Accuracy())
	}
906 907 908 909
}

func TestDiskUsageEstimation(t *testing.T) { tryAllShardFuncs(t, testDiskUsageEstimation) }

910
func testBatchPut(dirFunc mkShardFunc, t *testing.T) {
Jeromy's avatar
Jeromy committed
911 912
	temp, cleanup := tempdir(t)
	defer cleanup()
913
	defer checkTemp(t, temp)
Jeromy's avatar
Jeromy committed
914

915
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Jeromy's avatar
Jeromy committed
916 917 918
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
919
	defer fs.Close()
Jeromy's avatar
Jeromy committed
920

Jeromy's avatar
Jeromy committed
921 922
	dstest.RunBatchTest(t, fs)
}
Jeromy's avatar
Jeromy committed
923

924
func TestBatchPut(t *testing.T) { tryAllShardFuncs(t, testBatchPut) }
925

926
func testBatchDelete(dirFunc mkShardFunc, t *testing.T) {
Jeromy's avatar
Jeromy committed
927 928
	temp, cleanup := tempdir(t)
	defer cleanup()
929
	defer checkTemp(t, temp)
Jeromy's avatar
Jeromy committed
930

931
	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
Jeromy's avatar
Jeromy committed
932
	if err != nil {
Jeromy's avatar
Jeromy committed
933
		t.Fatalf("New fail: %v\n", err)
Jeromy's avatar
Jeromy committed
934
	}
935
	defer fs.Close()
Jeromy's avatar
Jeromy committed
936

Jeromy's avatar
Jeromy committed
937
	dstest.RunBatchDeleteTest(t, fs)
Jeromy's avatar
Jeromy committed
938 939
}

940
func TestBatchDelete(t *testing.T) { tryAllShardFuncs(t, testBatchDelete) }
941

Steven Allen's avatar
Steven Allen committed
942 943 944
func testClose(dirFunc mkShardFunc, t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
945
	defer checkTemp(t, temp)
Steven Allen's avatar
Steven Allen committed
946 947 948 949 950 951

	fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}

952
	err = fs.Put(datastore.NewKey("QUUX"), []byte("foobar"))
Steven Allen's avatar
Steven Allen committed
953 954 955 956 957 958
	if err != nil {
		t.Fatalf("Put fail: %v\n", err)
	}

	fs.Close()

959
	err = fs.Put(datastore.NewKey("QAAX"), []byte("foobar"))
Steven Allen's avatar
Steven Allen committed
960 961 962 963 964 965 966
	if err == nil {
		t.Fatal("expected put on closed datastore to fail")
	}
}

func TestClose(t *testing.T) { tryAllShardFuncs(t, testClose) }

967 968 969 970
func TestSHARDINGFile(t *testing.T) {
	tempdir, cleanup := tempdir(t)
	defer cleanup()

tavit ohanian's avatar
tavit ohanian committed
971
	fun := flatfs.DMS3_DEF_SHARD
972

973
	err := flatfs.Create(tempdir, fun)
974
	if err != nil {
975
		t.Fatalf("Create: %v\n", err)
976 977
	}

978
	fs, err := flatfs.Open(tempdir, false)
979
	if err != nil {
980
		t.Fatalf("Open fail: %v\n", err)
981
	}
tavit ohanian's avatar
tavit ohanian committed
982 983
	if fs.ShardStr() != flatfs.DMS3_DEF_SHARD_STR {
		t.Fatalf("Expected '%s' for shard function got '%s'", flatfs.DMS3_DEF_SHARD_STR, fs.ShardStr())
984 985 986
	}
	fs.Close()

987
	fs, err = flatfs.CreateOrOpen(tempdir, fun, false)
988 989 990 991 992
	if err != nil {
		t.Fatalf("Could not reopen repo: %v\n", err)
	}
	fs.Close()

993
	fs, err = flatfs.CreateOrOpen(tempdir, flatfs.Prefix(5), false)
994
	if err == nil {
Steven Allen's avatar
Steven Allen committed
995
		fs.Close()
996 997 998 999
		t.Fatalf("Was able to open repo with incompatible sharding function")
	}
}

1000
func TestInvalidPrefix(t *testing.T) {
1001
	_, err := flatfs.ParseShardFunc("/bad/prefix/v1/next-to-last/2")
1002
	if err == nil {
1003
		t.Fatalf("Expected an error while parsing a shard identifier with a bad prefix")
1004 1005 1006
	}
}

1007 1008 1009 1010
func TestNonDatastoreDir(t *testing.T) {
	tempdir, cleanup := tempdir(t)
	defer cleanup()

Steven Allen's avatar
Steven Allen committed
1011 1012 1013 1014
	err := ioutil.WriteFile(filepath.Join(tempdir, "afile"), []byte("Some Content"), 0644)
	if err != nil {
		t.Fatal(err)
	}
1015

Steven Allen's avatar
Steven Allen committed
1016
	err = flatfs.Create(tempdir, flatfs.NextToLast(2))
1017 1018 1019 1020 1021
	if err == nil {
		t.Fatalf("Expected an error when creating a datastore in a non-empty directory")
	}
}

1022 1023 1024
func TestNoCluster(t *testing.T) {
	tempdir, cleanup := tempdir(t)
	defer cleanup()
1025
	defer checkTemp(t, tempdir)
1026

1027
	fs, err := flatfs.CreateOrOpen(tempdir, flatfs.NextToLast(1), false)
1028 1029 1030
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
1031
	defer fs.Close()
1032

1033
	r := rand.New(rand.NewSource(0))
1034
	N := 3200 // should be divisible by 32 so the math works out
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045
	for i := 0; i < N; i++ {
		blk := make([]byte, 1000)
		r.Read(blk)

		key := "CIQ" + base32.StdEncoding.EncodeToString(blk[:10])
		err := fs.Put(datastore.NewKey(key), blk)
		if err != nil {
			t.Fatalf("Put fail: %v\n", err)
		}
	}

Kevin Atkinson's avatar
Kevin Atkinson committed
1046
	fs.Close()
1047 1048 1049 1050 1051
	dirs, err := ioutil.ReadDir(tempdir)
	if err != nil {
		t.Fatalf("ReadDir fail: %v\n", err)
	}
	idealFilesPerDir := float64(N) / 32.0
1052
	tolerance := math.Floor(idealFilesPerDir * 0.25)
1053
	count := 0
1054
	for _, dir := range dirs {
1055 1056
		switch dir.Name() {
		case flatfs.SHARDING_FN, flatfs.README_FN, flatfs.DiskUsageFile, ".temp":
1057 1058 1059
			continue
		}
		count += 1
1060 1061 1062 1063 1064 1065 1066 1067 1068 1069
		files, err := ioutil.ReadDir(filepath.Join(tempdir, dir.Name()))
		if err != nil {
			t.Fatalf("ReadDir fail: %v\n", err)
		}
		num := float64(len(files))
		if math.Abs(num-idealFilesPerDir) > tolerance {
			t.Fatalf("Dir %s has %.0f files, expected between %.f and %.f files",
				filepath.Join(tempdir, dir.Name()), num, idealFilesPerDir-tolerance, idealFilesPerDir+tolerance)
		}
	}
1070 1071 1072
	if count != 32 {
		t.Fatalf("Expected 32 directories and one file in %s", tempdir)
	}
1073 1074
}

Jeromy's avatar
Jeromy committed
1075
func BenchmarkConsecutivePut(b *testing.B) {
1076
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
Jeromy's avatar
Jeromy committed
1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089
	var blocks [][]byte
	var keys []datastore.Key
	for i := 0; i < b.N; i++ {
		blk := make([]byte, 256*1024)
		r.Read(blk)
		blocks = append(blocks, blk)

		key := base32.StdEncoding.EncodeToString(blk[:8])
		keys = append(keys, datastore.NewKey(key))
	}
	temp, cleanup := tempdir(b)
	defer cleanup()

1090
	fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
Jeromy's avatar
Jeromy committed
1091 1092 1093
	if err != nil {
		b.Fatalf("New fail: %v\n", err)
	}
1094
	defer fs.Close()
Jeromy's avatar
Jeromy committed
1095 1096 1097 1098 1099 1100 1101 1102 1103

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		err := fs.Put(keys[i], blocks[i])
		if err != nil {
			b.Fatal(err)
		}
	}
1104
	b.StopTimer() // avoid counting cleanup
Jeromy's avatar
Jeromy committed
1105 1106 1107
}

func BenchmarkBatchedPut(b *testing.B) {
1108
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
Jeromy's avatar
Jeromy committed
1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121
	var blocks [][]byte
	var keys []datastore.Key
	for i := 0; i < b.N; i++ {
		blk := make([]byte, 256*1024)
		r.Read(blk)
		blocks = append(blocks, blk)

		key := base32.StdEncoding.EncodeToString(blk[:8])
		keys = append(keys, datastore.NewKey(key))
	}
	temp, cleanup := tempdir(b)
	defer cleanup()

1122
	fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
Jeromy's avatar
Jeromy committed
1123 1124 1125
	if err != nil {
		b.Fatalf("New fail: %v\n", err)
	}
1126
	defer fs.Close()
Jeromy's avatar
Jeromy committed
1127 1128 1129 1130

	b.ResetTimer()

	for i := 0; i < b.N; {
Jeromy's avatar
Jeromy committed
1131 1132 1133 1134
		batch, err := fs.Batch()
		if err != nil {
			b.Fatal(err)
		}
Jeromy's avatar
Jeromy committed
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146

		for n := i; i-n < 512 && i < b.N; i++ {
			err := batch.Put(keys[i], blocks[i])
			if err != nil {
				b.Fatal(err)
			}
		}
		err = batch.Commit()
		if err != nil {
			b.Fatal(err)
		}
	}
1147
	b.StopTimer() // avoid counting cleanup
Jeromy's avatar
Jeromy committed
1148
}
Steven Allen's avatar
Steven Allen committed
1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179

func TestQueryLeak(t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()

	fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}
	defer fs.Close()

	for i := 0; i < 1000; i++ {
		err = fs.Put(datastore.NewKey(fmt.Sprint(i)), []byte("foobar"))
		if err != nil {
			t.Fatalf("Put fail: %v\n", err)
		}
	}

	before := runtime.NumGoroutine()
	for i := 0; i < 200; i++ {
		res, err := fs.Query(query.Query{KeysOnly: true})
		if err != nil {
			t.Errorf("Query fail: %v\n", err)
		}
		res.Close()
	}
	after := runtime.NumGoroutine()
	if after-before > 100 {
		t.Errorf("leaked %d goroutines", after-before)
	}
}
1180 1181 1182 1183

func TestSuite(t *testing.T) {
	temp, cleanup := tempdir(t)
	defer cleanup()
1184
	defer checkTemp(t, temp)
1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206

	fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false)
	if err != nil {
		t.Fatalf("New fail: %v\n", err)
	}

	ds := mount.New([]mount.Mount{{
		Prefix:    datastore.RawKey("/"),
		Datastore: datastore.NewMapDatastore(),
	}, {
		Prefix:    datastore.RawKey("/capital"),
		Datastore: fs,
	}})
	defer func() {
		err := ds.Close()
		if err != nil {
			t.Error(err)
		}
	}()

	dstest.SubtestAll(t, ds)
}