bloom_cache_test.go 3.48 KB
Newer Older
1 2 3
package blockstore

import (
4
	"fmt"
5 6 7 8
	"sync"
	"testing"
	"time"

9
	"github.com/ipfs/go-ipfs/blocks"
10

11
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Jeromy's avatar
Jeromy committed
12 13 14
	ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
	dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
	syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
15 16
)

17
func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
18 19 20
	if ctx == nil {
		ctx = context.TODO()
	}
21 22 23 24 25 26 27 28 29
	opts := DefaultCacheOpts()
	bbs, err := CachedBlockstore(bs, ctx, opts)
	if err == nil {
		return bbs.(*bloomcache), nil
	} else {
		return nil, err
	}
}

30 31
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
	bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
32
	_, err := bloomCached(bs, context.TODO(), 100, 1, -1)
33 34 35
	if err == nil {
		t.Fail()
	}
36
	_, err = bloomCached(bs, context.TODO(), -1, 1, 100)
37 38
	if err == nil {
		t.Fail()
39 40 41 42 43 44 45
	}
}

func TestRemoveCacheEntryOnDelete(t *testing.T) {
	b := blocks.NewBlock([]byte("foo"))
	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))
46
	cachedbs, err := testBloomCached(bs, nil)
47 48 49 50 51
	if err != nil {
		t.Fatal(err)
	}
	cachedbs.Put(b)

52
	cd.Lock()
53
	writeHitTheDatastore := false
54 55
	cd.Unlock()

56 57 58 59 60 61 62 63 64 65 66 67 68 69
	cd.SetFunc(func() {
		writeHitTheDatastore = true
	})

	cachedbs.DeleteBlock(b.Key())
	cachedbs.Put(b)
	if !writeHitTheDatastore {
		t.Fail()
	}
}

func TestElideDuplicateWrite(t *testing.T) {
	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))
70
	cachedbs, err := testBloomCached(bs, nil)
71 72 73 74 75 76 77 78 79 80 81 82
	if err != nil {
		t.Fatal(err)
	}

	b1 := blocks.NewBlock([]byte("foo"))

	cachedbs.Put(b1)
	cd.SetFunc(func() {
		t.Fatal("write hit the datastore")
	})
	cachedbs.Put(b1)
}
83 84 85 86 87 88 89
func TestHasIsBloomCached(t *testing.T) {
	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))

	for i := 0; i < 1000; i++ {
		bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
	}
90 91
	ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
	cachedbs, err := testBloomCached(bs, ctx)
92 93 94 95 96 97
	if err != nil {
		t.Fatal(err)
	}

	select {
	case <-cachedbs.rebuildChan:
98
	case <-ctx.Done():
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
		t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
	}

	cacheFails := 0
	cd.SetFunc(func() {
		cacheFails++
	})

	for i := 0; i < 1000; i++ {
		cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Key())
	}

	if float64(cacheFails)/float64(1000) > float64(0.05) {
		t.Fatal("Bloom filter has cache miss rate of more than 5%")
	}
}
115 116

type callbackDatastore struct {
117
	sync.Mutex
118 119 120 121
	f  func()
	ds ds.Datastore
}

122 123 124 125 126 127 128 129 130 131 132
// SetFunc replaces the callback with f while holding the datastore's mutex.
func (c *callbackDatastore) SetFunc(f func()) {
	c.Lock()
	c.f = f
	c.Unlock()
}

// CallF runs the current callback while holding the datastore's mutex.
func (c *callbackDatastore) CallF() {
	c.Lock()
	// Deferred so the mutex is released even if the callback panics or ends
	// the goroutine (for example by calling t.Fatal).
	defer c.Unlock()

	callback := c.f
	callback()
}
133 134

func (c *callbackDatastore) Put(key ds.Key, value interface{}) (err error) {
135
	c.CallF()
136 137 138 139
	return c.ds.Put(key, value)
}

func (c *callbackDatastore) Get(key ds.Key) (value interface{}, err error) {
140
	c.CallF()
141 142 143 144
	return c.ds.Get(key)
}

func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) {
145
	c.CallF()
146 147 148 149
	return c.ds.Has(key)
}

func (c *callbackDatastore) Delete(key ds.Key) (err error) {
150
	c.CallF()
151 152 153
	return c.ds.Delete(key)
}

154
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
155
	c.CallF()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156
	return c.ds.Query(q)
157
}
158 159 160 161

// Batch returns a basic batch built on top of c. It does not run the callback
// itself.
func (c *callbackDatastore) Batch() (ds.Batch, error) {
	batch := ds.NewBasicBatch(c)
	return batch, nil
}