package blockstore

import (
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/ipfs/go-ipfs/blocks"

	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
	ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
	dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
	syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
)

// testBloomCached wraps bs via CachedBlockstore using DefaultCacheOpts and
// returns the result as a *bloomcache so tests can inspect its internals.
// An error is returned if construction fails or if the returned blockstore
// is not a *bloomcache.
func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
	opts := DefaultCacheOpts()
	cbs, err := CachedBlockstore(bs, ctx, opts)
	if err != nil {
		return nil, err
	}
	bc, ok := cbs.(*bloomcache)
	if !ok {
		// Report the mismatch instead of panicking on a failed assertion.
		return nil, fmt.Errorf("expected *bloomcache from CachedBlockstore, got %T", cbs)
	}
	return bc, nil
}

// TestReturnsErrorWhenSizeNegative checks that bloomCached rejects a negative
// value in either its first or its last size argument.
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
	bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
	ctx := context.Background()

	if _, err := bloomCached(bs, ctx, 100, 1, -1); err == nil {
		t.Error("expected an error when the last size argument is negative")
	}
	if _, err := bloomCached(bs, ctx, -1, 1, 100); err == nil {
		t.Error("expected an error when the first size argument is negative")
	}
}

// TestRemoveCacheEntryOnDelete checks that, after a block has been deleted
// through the cache, putting it again reaches the underlying datastore.
func TestRemoveCacheEntryOnDelete(t *testing.T) {
	b := blocks.NewBlock([]byte("foo"))
	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cachedbs, err := testBloomCached(bs, ctx)
	if err != nil {
		t.Fatal(err)
	}
	if err := cachedbs.Put(b); err != nil {
		t.Fatal(err)
	}

	// The callback runs with cd's mutex held (see CallF), so the flag is
	// only ever written under that lock; it is read under it below too.
	writeHitTheDatastore := false
	cd.SetFunc(func() {
		writeHitTheDatastore = true
	})

	if err := cachedbs.DeleteBlock(b.Key()); err != nil {
		t.Fatal(err)
	}
	if err := cachedbs.Put(b); err != nil {
		t.Fatal(err)
	}

	cd.Lock()
	hit := writeHitTheDatastore
	cd.Unlock()
	if !hit {
		t.Error("expected the second Put to reach the datastore after DeleteBlock")
	}
}

// TestElideDuplicateWrite checks that putting the same block twice through
// the cache does not touch the underlying datastore the second time.
func TestElideDuplicateWrite(t *testing.T) {
	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cachedbs, err := testBloomCached(bs, ctx)
	if err != nil {
		t.Fatal(err)
	}

	b1 := blocks.NewBlock([]byte("foo"))
	if err := cachedbs.Put(b1); err != nil {
		t.Fatal(err)
	}

	// From here on, any datastore access fails the test.
	cd.SetFunc(func() {
		t.Fatal("write hit the datastore")
	})
	if err := cachedbs.Put(b1); err != nil {
		t.Fatal(err)
	}
}

// TestHasIsBloomCached fills a blockstore, waits for the cache to signal on
// rebuildChan, then queries keys that were never stored and requires that at
// most 5% of those lookups reach the underlying datastore.
func TestHasIsBloomCached(t *testing.T) {
	const (
		numBlocks   = 1000
		absentShift = 2000 // offset that keeps queried payloads disjoint from stored ones
		maxMissRate = 0.05
	)

	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
	bs := NewBlockstore(syncds.MutexWrap(cd))

	for i := 0; i < numBlocks; i++ {
		blk := blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i)))
		if err := bs.Put(blk); err != nil {
			t.Fatal(err)
		}
	}

	// Keep the cancel func so the timeout's resources are released as soon
	// as the test returns rather than when the deadline expires.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	cachedbs, err := testBloomCached(bs, ctx)
	if err != nil {
		t.Fatal(err)
	}

	select {
	case <-cachedbs.rebuildChan:
	case <-ctx.Done():
		t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
	}

	// Count every datastore access triggered by the lookups below.
	cacheFails := 0
	cd.SetFunc(func() {
		cacheFails++
	})

	for i := 0; i < numBlocks; i++ {
		k := blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+absentShift))).Key()
		if _, err := cachedbs.Has(k); err != nil {
			t.Fatal(err)
		}
	}

	if float64(cacheFails)/float64(numBlocks) > maxMissRate {
		t.Fatal("Bloom filter has cache miss rate of more than 5%")
	}
}

// callbackDatastore wraps a datastore and invokes a configurable callback
// before each Put, Get, Has, Delete and Query, letting tests observe when the
// underlying datastore is accessed.
type callbackDatastore struct {
	sync.Mutex
	f  func()       // callback run by CallF; guarded by the embedded mutex
	ds ds.Datastore // wrapped datastore that operations are forwarded to
}

// SetFunc replaces the callback while holding the mutex.
func (c *callbackDatastore) SetFunc(f func()) {
	c.Lock()
	defer c.Unlock()
	c.f = f
}

// CallF runs the current callback while holding the mutex.
func (c *callbackDatastore) CallF() {
	c.Lock()
	defer c.Unlock()
	c.f()
}

// Put runs the callback, then forwards to the wrapped datastore.
func (c *callbackDatastore) Put(key ds.Key, value interface{}) error {
	c.CallF()
	return c.ds.Put(key, value)
}

// Get runs the callback, then forwards to the wrapped datastore.
func (c *callbackDatastore) Get(key ds.Key) (interface{}, error) {
	c.CallF()
	return c.ds.Get(key)
}

// Has runs the callback, then forwards to the wrapped datastore.
func (c *callbackDatastore) Has(key ds.Key) (bool, error) {
	c.CallF()
	return c.ds.Has(key)
}

// Delete runs the callback, then forwards to the wrapped datastore.
func (c *callbackDatastore) Delete(key ds.Key) error {
	c.CallF()
	return c.ds.Delete(key)
}

// Query runs the callback, then forwards to the wrapped datastore.
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
	c.CallF()
	return c.ds.Query(q)
}

// Batch returns a basic batch built over c itself.
func (c *callbackDatastore) Batch() (ds.Batch, error) {
	return ds.NewBasicBatch(c), nil
}