package decision

import (
	"context"
	"crypto/rand"
	"sync"
	"testing"
	"time"

	"gitlab.dms3.io/dms3/go-bitswap/internal/testutil"
	cid "gitlab.dms3.io/dms3/go-cid"

	process "github.com/jbenet/goprocess"
	blocks "gitlab.dms3.io/dms3/go-block-format"
	ds "gitlab.dms3.io/dms3/go-datastore"
	"gitlab.dms3.io/dms3/go-datastore/delayed"
	ds_sync "gitlab.dms3.io/dms3/go-datastore/sync"
	blockstore "gitlab.dms3.io/dms3/go-dms3-blockstore"
	delay "gitlab.dms3.io/dms3/go-dms3-delay"
)

func TestBlockstoreManagerNotFoundKey(t *testing.T) {
	ctx := context.Background()
	bsdelay := delay.Fixed(3 * time.Millisecond)
	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

28
	bsm := newBlockstoreManager(bstore, 5)
29 30 31
	bsm.start(process.WithTeardown(func() error { return nil }))

	cids := testutil.GenerateCids(4)
32 33 34 35
	sizes, err := bsm.getBlockSizes(ctx, cids)
	if err != nil {
		t.Fatal(err)
	}
36 37 38 39 40 41 42 43 44 45
	if len(sizes) != 0 {
		t.Fatal("Wrong response length")
	}

	for _, c := range cids {
		if _, ok := sizes[c]; ok {
			t.Fatal("Non-existent block should have no size")
		}
	}

46 47 48 49
	blks, err := bsm.getBlocks(ctx, cids)
	if err != nil {
		t.Fatal(err)
	}
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
	if len(blks) != 0 {
		t.Fatal("Wrong response length")
	}

	for _, c := range cids {
		if _, ok := blks[c]; ok {
			t.Fatal("Non-existent block should have no size")
		}
	}
}

func TestBlockstoreManager(t *testing.T) {
	ctx := context.Background()
	bsdelay := delay.Fixed(3 * time.Millisecond)
	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

67
	bsm := newBlockstoreManager(bstore, 5)
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
	bsm.start(process.WithTeardown(func() error { return nil }))

	exp := make(map[cid.Cid]blocks.Block)
	var blks []blocks.Block
	for i := 0; i < 32; i++ {
		buf := make([]byte, 1024*(i+1))
		_, _ = rand.Read(buf)
		b := blocks.NewBlock(buf)
		blks = append(blks, b)
		exp[b.Cid()] = b
	}

	// Put all blocks in the blockstore except the last one
	if err := bstore.PutMany(blks[:len(blks)-1]); err != nil {
		t.Fatal(err)
	}

	var cids []cid.Cid
	for _, b := range blks {
		cids = append(cids, b.Cid())
	}

90 91 92 93
	sizes, err := bsm.getBlockSizes(ctx, cids)
	if err != nil {
		t.Fatal(err)
	}
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
	if len(sizes) != len(blks)-1 {
		t.Fatal("Wrong response length")
	}

	for _, c := range cids {
		expSize := len(exp[c].RawData())
		size, ok := sizes[c]

		// Only the last key should be missing
		if c.Equals(cids[len(cids)-1]) {
			if ok {
				t.Fatal("Non-existent block should not be in sizes map")
			}
		} else {
			if !ok {
				t.Fatal("Block should be in sizes map")
			}
			if size != expSize {
				t.Fatal("Block has wrong size")
			}
		}
	}

117 118 119 120
	fetched, err := bsm.getBlocks(ctx, cids)
	if err != nil {
		t.Fatal(err)
	}
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
	if len(fetched) != len(blks)-1 {
		t.Fatal("Wrong response length")
	}

	for _, c := range cids {
		blk, ok := fetched[c]

		// Only the last key should be missing
		if c.Equals(cids[len(cids)-1]) {
			if ok {
				t.Fatal("Non-existent block should not be in blocks map")
			}
		} else {
			if !ok {
				t.Fatal("Block should be in blocks map")
			}
			if !blk.Cid().Equals(c) {
				t.Fatal("Block has wrong cid")
			}
		}
	}
}

func TestBlockstoreManagerConcurrency(t *testing.T) {
	ctx := context.Background()
	bsdelay := delay.Fixed(3 * time.Millisecond)
	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

	workerCount := 5
151
	bsm := newBlockstoreManager(bstore, workerCount)
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
	bsm.start(process.WithTeardown(func() error { return nil }))

	blkSize := int64(8 * 1024)
	blks := testutil.GenerateBlocksOfSize(32, blkSize)
	var ks []cid.Cid
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}

	err := bstore.PutMany(blks)
	if err != nil {
		t.Fatal(err)
	}

	// Create more concurrent requests than the number of workers
	wg := sync.WaitGroup{}
	for i := 0; i < 16; i++ {
		wg.Add(1)

		go func(t *testing.T) {
			defer wg.Done()

174 175 176 177
			sizes, err := bsm.getBlockSizes(ctx, ks)
			if err != nil {
				t.Error(err)
			}
178
			if len(sizes) != len(blks) {
179
				t.Error("Wrong response length")
180 181 182 183 184 185 186 187 188 189 190 191 192
			}
		}(t)
	}
	wg.Wait()
}

func TestBlockstoreManagerClose(t *testing.T) {
	ctx := context.Background()
	delayTime := 20 * time.Millisecond
	bsdelay := delay.Fixed(delayTime)
	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

193
	bsm := newBlockstoreManager(bstore, 3)
194 195 196
	px := process.WithTeardown(func() error { return nil })
	bsm.start(px)

197
	blks := testutil.GenerateBlocksOfSize(10, 1024)
198 199 200 201 202 203 204 205 206 207 208 209 210 211
	var ks []cid.Cid
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}

	err := bstore.PutMany(blks)
	if err != nil {
		t.Fatal(err)
	}

	go px.Close()

	time.Sleep(5 * time.Millisecond)

212 213 214 215 216 217 218 219
	before := time.Now()
	_, err = bsm.getBlockSizes(ctx, ks)
	if err == nil {
		t.Error("expected an error")
	}
	// would expect to wait delayTime*10 if we didn't cancel.
	if time.Since(before) > delayTime*2 {
		t.Error("expected a fast timeout")
220 221 222 223 224 225 226 227 228 229
	}
}

func TestBlockstoreManagerCtxDone(t *testing.T) {
	delayTime := 20 * time.Millisecond
	bsdelay := delay.Fixed(delayTime)

	dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
	bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

230
	bsm := newBlockstoreManager(bstore, 3)
231 232 233
	proc := process.WithTeardown(func() error { return nil })
	bsm.start(proc)

234
	blks := testutil.GenerateBlocksOfSize(10, 1024)
235 236 237 238 239 240 241 242 243 244
	var ks []cid.Cid
	for _, b := range blks {
		ks = append(ks, b.Cid())
	}

	err := bstore.PutMany(blks)
	if err != nil {
		t.Fatal(err)
	}

245 246 247 248 249 250 251 252
	ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
	defer cancel()

	before := time.Now()
	_, err = bsm.getBlockSizes(ctx, ks)
	if err == nil {
		t.Error("expected an error")
	}
253

254 255 256
	// would expect to wait delayTime*10 if we didn't cancel.
	if time.Since(before) > delayTime*2 {
		t.Error("expected a fast timeout")
257 258
	}
}