bloom_cache_test.go 3.88 KB
Newer Older
1 2 3
package blockstore

import (
4
	"fmt"
5 6 7 8
	"sync"
	"testing"
	"time"

9
	"github.com/ipfs/go-ipfs/blocks"
10

11
	context "context"
Jeromy's avatar
Jeromy committed
12 13 14
	ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
	dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
	syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
15 16
)

17
func testBloomCached(ctx context.Context, bs Blockstore) (*bloomcache, error) {
18
	if ctx == nil {
19
		ctx = context.Background()
20
	}
21
	opts := DefaultCacheOpts()
22
	opts.HasARCCacheSize = 0
23
	bbs, err := CachedBlockstore(ctx, bs, opts)
24 25 26
	if err == nil {
		return bbs.(*bloomcache), nil
	}
27
	return nil, err
28 29
}

30 31 32
func TestPutManyAddsToBloom(t *testing.T) {
	bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))

Jakub Sztandera's avatar
Jakub Sztandera committed
33 34 35
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

36
	cachedbs, err := testBloomCached(ctx, bs)
37 38 39 40 41 42 43 44 45 46 47

	select {
	case <-cachedbs.rebuildChan:
	case <-ctx.Done():
		t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
	}

	block1 := blocks.NewBlock([]byte("foo"))
	block2 := blocks.NewBlock([]byte("bar"))

	cachedbs.PutMany([]blocks.Block{block1})
48
	has, err := cachedbs.Has(block1.Cid())
49 50 51 52 53 54 55
	if err != nil {
		t.Fatal(err)
	}
	if has == false {
		t.Fatal("added block is reported missing")
	}

56
	has, err = cachedbs.Has(block2.Cid())
57 58 59 60 61 62 63 64
	if err != nil {
		t.Fatal(err)
	}
	if has == true {
		t.Fatal("not added block is reported to be in blockstore")
	}
}

65 66
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
	bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
67
	_, err := bloomCached(context.Background(), bs, -1, 1)
68 69 70
	if err == nil {
		t.Fail()
	}
71
}
72 73 74 75 76 77 78
func TestHasIsBloomCached(t *testing.T) {
	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))

	for i := 0; i < 1000; i++ {
		bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
	}
Jakub Sztandera's avatar
Jakub Sztandera committed
79 80 81
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

82
	cachedbs, err := testBloomCached(ctx, bs)
83 84 85 86 87 88
	if err != nil {
		t.Fatal(err)
	}

	select {
	case <-cachedbs.rebuildChan:
89
	case <-ctx.Done():
90 91 92 93 94 95 96 97 98
		t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
	}

	cacheFails := 0
	cd.SetFunc(func() {
		cacheFails++
	})

	for i := 0; i < 1000; i++ {
99
		cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Cid())
100 101 102 103 104
	}

	if float64(cacheFails)/float64(1000) > float64(0.05) {
		t.Fatal("Bloom filter has cache miss rate of more than 5%")
	}
105 106 107 108 109 110 111 112 113 114 115 116 117

	cacheFails = 0
	block := blocks.NewBlock([]byte("newBlock"))

	cachedbs.PutMany([]blocks.Block{block})
	if cacheFails != 2 {
		t.Fatalf("expected two datastore hits: %d", cacheFails)
	}
	cachedbs.Put(block)
	if cacheFails != 3 {
		t.Fatalf("expected datastore hit: %d", cacheFails)
	}

118
	if has, err := cachedbs.Has(block.Cid()); !has || err != nil {
119 120 121
		t.Fatal("has gave wrong response")
	}

122
	bl, err := cachedbs.Get(block.Cid())
123 124 125 126 127 128 129
	if bl.String() != block.String() {
		t.Fatal("block data doesn't match")
	}

	if err != nil {
		t.Fatal("there should't be an error")
	}
130
}
131 132

type callbackDatastore struct {
133
	sync.Mutex
134 135 136 137
	f  func()
	ds ds.Datastore
}

138 139 140 141 142 143 144 145 146 147 148
// SetFunc replaces the callback with f while holding the lock.
func (c *callbackDatastore) SetFunc(f func()) {
	c.Lock()
	c.f = f
	c.Unlock()
}

// CallF runs the current callback while holding the lock.
//
// A nil callback is skipped rather than called, so a zero-value
// callbackDatastore or a SetFunc(nil) call does not cause a panic.
func (c *callbackDatastore) CallF() {
	c.Lock()
	// Deferred so the lock is released even if the callback panics.
	defer c.Unlock()

	if c.f == nil {
		return
	}
	c.f()
}
149 150

func (c *callbackDatastore) Put(key ds.Key, value interface{}) (err error) {
151
	c.CallF()
152 153 154 155
	return c.ds.Put(key, value)
}

func (c *callbackDatastore) Get(key ds.Key) (value interface{}, err error) {
156
	c.CallF()
157 158 159 160
	return c.ds.Get(key)
}

func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) {
161
	c.CallF()
162 163 164 165
	return c.ds.Has(key)
}

func (c *callbackDatastore) Delete(key ds.Key) (err error) {
166
	c.CallF()
167 168 169
	return c.ds.Delete(key)
}

170
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
171
	c.CallF()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172
	return c.ds.Query(q)
173
}
174 175 176 177

// Batch returns a basic batch built around c. Unlike the other methods it
// does not run the callback itself.
// NOTE(review): the batch presumably routes its operations back through c's
// Put/Delete and so triggers the callback then; confirm against NewBasicBatch.
func (c *callbackDatastore) Batch() (ds.Batch, error) {
	batch := ds.NewBasicBatch(c)
	return batch, nil
}