bloom_cache_test.go 4.59 KB
Newer Older
1 2 3
package blockstore

import (
4
	"context"
5
	"fmt"
6 7 8 9
	"sync"
	"testing"
	"time"

10 11 12 13
	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
	syncds "github.com/ipfs/go-datastore/sync"
14 15
)

16
func testBloomCached(ctx context.Context, bs Blockstore) (*bloomcache, error) {
17
	if ctx == nil {
18
		ctx = context.Background()
19
	}
20
	opts := DefaultCacheOpts()
21
	opts.HasARCCacheSize = 0
22
	bbs, err := CachedBlockstore(ctx, bs, opts)
23 24 25
	if err == nil {
		return bbs.(*bloomcache), nil
	}
26
	return nil, err
27 28
}

29 30 31
func TestPutManyAddsToBloom(t *testing.T) {
	bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))

Jakub Sztandera's avatar
Jakub Sztandera committed
32 33 34
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

35
	cachedbs, err := testBloomCached(ctx, bs)
36 37 38
	if err != nil {
		t.Fatal(err)
	}
39

Steven Allen's avatar
Steven Allen committed
40 41
	if err := cachedbs.Wait(ctx); err != nil {
		t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded())
42 43 44 45
	}

	block1 := blocks.NewBlock([]byte("foo"))
	block2 := blocks.NewBlock([]byte("bar"))
46
	emptyBlock := blocks.NewBlock([]byte{})
47

48
	cachedbs.PutMany([]blocks.Block{block1, emptyBlock})
49
	has, err := cachedbs.Has(block1.Cid())
50 51 52
	if err != nil {
		t.Fatal(err)
	}
53 54 55 56 57
	blockSize, err := cachedbs.GetSize(block1.Cid())
	if err != nil {
		t.Fatal(err)
	}
	if blockSize == -1 || !has {
58 59 60
		t.Fatal("added block is reported missing")
	}

61
	has, err = cachedbs.Has(block2.Cid())
62 63 64
	if err != nil {
		t.Fatal(err)
	}
65
	blockSize, err = cachedbs.GetSize(block2.Cid())
66
	if err != nil && err != ErrNotFound {
67 68 69
		t.Fatal(err)
	}
	if blockSize > -1 || has {
70 71
		t.Fatal("not added block is reported to be in blockstore")
	}
72 73 74 75 76 77 78 79 80 81 82 83

	has, err = cachedbs.Has(emptyBlock.Cid())
	if err != nil {
		t.Fatal(err)
	}
	blockSize, err = cachedbs.GetSize(emptyBlock.Cid())
	if err != nil {
		t.Fatal(err)
	}
	if blockSize != 0 || !has {
		t.Fatal("added block is reported missing")
	}
84 85
}

86 87
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
	bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
88
	_, err := bloomCached(context.Background(), bs, -1, 1)
89 90 91
	if err == nil {
		t.Fail()
	}
92
}
93 94 95 96 97 98 99
func TestHasIsBloomCached(t *testing.T) {
	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))

	for i := 0; i < 1000; i++ {
		bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
	}
Jakub Sztandera's avatar
Jakub Sztandera committed
100 101 102
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

103
	cachedbs, err := testBloomCached(ctx, bs)
104 105 106 107
	if err != nil {
		t.Fatal(err)
	}

Steven Allen's avatar
Steven Allen committed
108 109
	if err := cachedbs.Wait(ctx); err != nil {
		t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded())
110 111 112 113 114 115 116 117
	}

	cacheFails := 0
	cd.SetFunc(func() {
		cacheFails++
	})

	for i := 0; i < 1000; i++ {
118
		cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Cid())
119 120 121
	}

	if float64(cacheFails)/float64(1000) > float64(0.05) {
Steven Allen's avatar
Steven Allen committed
122
		t.Fatalf("Bloom filter has cache miss rate of more than 5%%")
123
	}
124 125 126 127 128 129 130 131 132 133 134 135 136

	cacheFails = 0
	block := blocks.NewBlock([]byte("newBlock"))

	cachedbs.PutMany([]blocks.Block{block})
	if cacheFails != 2 {
		t.Fatalf("expected two datastore hits: %d", cacheFails)
	}
	cachedbs.Put(block)
	if cacheFails != 3 {
		t.Fatalf("expected datastore hit: %d", cacheFails)
	}

137
	if has, err := cachedbs.Has(block.Cid()); !has || err != nil {
138 139 140
		t.Fatal("has gave wrong response")
	}

141
	bl, err := cachedbs.Get(block.Cid())
142 143 144 145 146 147 148
	if bl.String() != block.String() {
		t.Fatal("block data doesn't match")
	}

	if err != nil {
		t.Fatal("there should't be an error")
	}
149
}
150

Jakub Sztandera's avatar
Jakub Sztandera committed
151 152
var _ ds.Batching = (*callbackDatastore)(nil)

153
type callbackDatastore struct {
154
	sync.Mutex
155 156 157 158
	f  func()
	ds ds.Datastore
}

159 160 161 162 163 164 165 166 167 168 169
// SetFunc replaces the callback run by CallF. The swap is performed while
// holding the embedded mutex.
func (c *callbackDatastore) SetFunc(f func()) {
	c.Lock()
	c.f = f
	c.Unlock()
}

// CallF runs the currently installed callback while holding the embedded
// mutex, so invocations are serialized with each other and with SetFunc.
// The deferred Unlock releases the mutex even if the callback panics.
// Because sync.Mutex is not reentrant, the callback must not call SetFunc
// or CallF on the same callbackDatastore.
func (c *callbackDatastore) CallF() {
	c.Lock()
	defer c.Unlock()
	c.f()
}
170

171
func (c *callbackDatastore) Put(key ds.Key, value []byte) (err error) {
172
	c.CallF()
173 174 175
	return c.ds.Put(key, value)
}

176
func (c *callbackDatastore) Get(key ds.Key) (value []byte, err error) {
177
	c.CallF()
178 179 180 181
	return c.ds.Get(key)
}

func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) {
182
	c.CallF()
183 184 185
	return c.ds.Has(key)
}

Steven Allen's avatar
Steven Allen committed
186 187 188 189 190
// GetSize runs the callback, then returns the result of GetSize for key on
// the wrapped datastore.
func (c *callbackDatastore) GetSize(key ds.Key) (size int, err error) {
	c.CallF()
	return c.ds.GetSize(key)
}

Jakub Sztandera's avatar
Jakub Sztandera committed
191 192 193 194
// Close is a no-op that always returns nil. It neither runs the callback
// nor closes the wrapped datastore.
func (c *callbackDatastore) Close() error {
	return nil
}

195
func (c *callbackDatastore) Delete(key ds.Key) (err error) {
196
	c.CallF()
197 198 199
	return c.ds.Delete(key)
}

200
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
201
	c.CallF()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
	return c.ds.Query(q)
203
}
204

205 206 207 208 209
// Sync runs the callback, then returns the result of Sync for key on the
// wrapped datastore.
func (c *callbackDatastore) Sync(key ds.Key) error {
	c.CallF()
	return c.ds.Sync(key)
}

210 211 212
// Batch returns the result of ds.NewBasicBatch built over c itself (not the
// wrapped datastore), with a nil error. It does not run the callback
// directly.
func (c *callbackDatastore) Batch() (ds.Batch, error) {
	return ds.NewBasicBatch(c), nil
}