Commit 4183902e authored by Jakub Sztandera's avatar Jakub Sztandera

blockstore: extract ARC cache from Bloom cache

it removes race condition that would happen during various calls

License: MIT
Signed-off-by: default avatarJakub Sztandera <kubuxu@protonmail.ch>
parent 149819f8
package blockstore
import (
"github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
type arccache struct {
arc *lru.ARCCache
blockstore Blockstore
}
func arcCached(bs Blockstore, lruSize int) (*arccache, error) {
arc, err := lru.NewARC(lruSize)
if err != nil {
return nil, err
}
return &arccache{arc: arc, blockstore: bs}, nil
}
func (b *arccache) DeleteBlock(k key.Key) error {
if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}
b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
switch err {
case nil:
b.arc.Add(k, false)
case ds.ErrNotFound, ErrNotFound:
b.arc.Add(k, false)
default:
return err
}
return nil
}
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
if k == "" {
// Return cache invalid so call to blockstore
// in case of invalid key is forwarded deeper
return false, false
}
h, ok := b.arc.Get(k)
if ok {
return h.(bool), ok
} else {
return false, false
}
}
func (b *arccache) Has(k key.Key) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}
res, err := b.blockstore.Has(k)
if err == nil {
b.arc.Add(k, res)
}
return res, err
}
func (b *arccache) Get(k key.Key) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}
bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.arc.Add(k, false)
} else if bl != nil {
b.arc.Add(k, true)
}
return bl, err
}
func (b *arccache) Put(bl blocks.Block) error {
if has, ok := b.hasCached(bl.Key()); ok && has {
return nil
}
err := b.blockstore.Put(bl)
if err == nil {
b.arc.Add(bl.Key(), true)
}
return err
}
func (b *arccache) PutMany(bs []blocks.Block) error {
var good []blocks.Block
for _, block := range bs {
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
good = append(good, block)
}
}
err := b.blockstore.PutMany(bs)
if err == nil {
for _, block := range bs {
b.arc.Add(block.Key(), true)
}
}
return err
}
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return b.blockstore.AllKeysChan(ctx)
}
func (b *arccache) GCLock() Unlocker {
return b.blockstore.(GCBlockstore).GCLock()
}
func (b *arccache) PinLock() Unlocker {
return b.blockstore.(GCBlockstore).PinLock()
}
func (b *arccache) GCRequested() bool {
return b.blockstore.(GCBlockstore).GCRequested()
}
package blockstore
import (
"github.com/ipfs/go-ipfs/blocks"
"testing"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
syncds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
if ctx == nil {
ctx = context.TODO()
}
opts := DefaultCacheOpts()
opts.HasBloomFilterSize = 0
opts.HasBloomFilterHashes = 0
bbs, err := CachedBlockstore(bs, ctx, opts)
if err == nil {
return bbs.(*arccache), nil
} else {
return nil, err
}
}
func TestRemoveCacheEntryOnDelete(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testArcCached(bs, nil)
if err != nil {
t.Fatal(err)
}
cachedbs.Put(b)
cd.Lock()
writeHitTheDatastore := false
cd.Unlock()
cd.SetFunc(func() {
writeHitTheDatastore = true
})
cachedbs.DeleteBlock(b.Key())
cachedbs.Put(b)
if !writeHitTheDatastore {
t.Fail()
}
}
func TestElideDuplicateWrite(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testArcCached(bs, nil)
if err != nil {
t.Fatal(err)
}
b1 := blocks.NewBlock([]byte("foo"))
cachedbs.Put(b1)
cd.SetFunc(func() {
t.Fatal("write hit the datastore")
})
cachedbs.Put(b1)
}
......@@ -3,8 +3,6 @@ package blockstore
import (
"github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
......@@ -13,16 +11,12 @@ import (
// bloomCached returns Blockstore that caches Has requests using Bloom filter
// Size is size of bloom filter in bytes
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) {
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
if err != nil {
return nil, err
}
arc, err := lru.NewARC(lruSize)
if err != nil {
return nil, err
}
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
bc := &bloomcache{blockstore: bs, bloom: bl}
bc.Invalidate()
go bc.Rebuild(ctx)
......@@ -33,7 +27,6 @@ type bloomcache struct {
bloom *bloom.Bloom
active int32
arc *lru.ARCCache
// This chan is only used for testing to wait for bloom to enable
rebuildChan chan struct{}
blockstore Blockstore
......@@ -84,17 +77,7 @@ func (b *bloomcache) DeleteBlock(k key.Key) error {
return ErrNotFound
}
b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
switch err {
case nil:
b.arc.Add(k, false)
case ds.ErrNotFound, ErrNotFound:
b.arc.Add(k, false)
default:
return err
}
return nil
return b.blockstore.DeleteBlock(k)
}
// if ok == false has is inconclusive
......@@ -111,12 +94,7 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
return false, true
}
}
h, ok := b.arc.Get(k)
if ok {
return h.(bool), ok
} else {
return false, false
}
return false, false
}
func (b *bloomcache) Has(k key.Key) (bool, error) {
......@@ -124,11 +102,7 @@ func (b *bloomcache) Has(k key.Key) (bool, error) {
return has, nil
}
res, err := b.blockstore.Has(k)
if err == nil {
b.arc.Add(k, res)
}
return res, err
return b.blockstore.Has(k)
}
func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
......@@ -136,13 +110,7 @@ func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
return nil, ErrNotFound
}
bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.arc.Add(k, false)
} else if bl != nil {
b.arc.Add(k, true)
}
return bl, err
return b.blockstore.Get(k)
}
func (b *bloomcache) Put(bl blocks.Block) error {
......@@ -153,7 +121,6 @@ func (b *bloomcache) Put(bl blocks.Block) error {
err := b.blockstore.Put(bl)
if err == nil {
b.bloom.AddTS([]byte(bl.Key()))
b.arc.Add(bl.Key(), true)
}
return err
}
......@@ -169,7 +136,6 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
if err == nil {
for _, block := range bs {
b.bloom.AddTS([]byte(block.Key()))
b.arc.Add(block.Key(), true)
}
}
return err
......
......@@ -19,6 +19,7 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error)
ctx = context.TODO()
}
opts := DefaultCacheOpts()
opts.HasARCCacheSize = 0
bbs, err := CachedBlockstore(bs, ctx, opts)
if err == nil {
return bbs.(*bloomcache), nil
......@@ -29,56 +30,10 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error)
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
_, err := bloomCached(bs, context.TODO(), 100, 1, -1)
_, err := bloomCached(bs, context.TODO(), -1, 1)
if err == nil {
t.Fail()
}
_, err = bloomCached(bs, context.TODO(), -1, 1, 100)
if err == nil {
t.Fail()
}
}
func TestRemoveCacheEntryOnDelete(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}
cachedbs.Put(b)
cd.Lock()
writeHitTheDatastore := false
cd.Unlock()
cd.SetFunc(func() {
writeHitTheDatastore = true
})
cachedbs.DeleteBlock(b.Key())
cachedbs.Put(b)
if !writeHitTheDatastore {
t.Fail()
}
}
func TestElideDuplicateWrite(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}
b1 := blocks.NewBlock([]byte("foo"))
cachedbs.Put(b1)
cd.SetFunc(func() {
t.Fatal("write hit the datastore")
})
cachedbs.Put(b1)
}
func TestHasIsBloomCached(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
......
......@@ -34,8 +34,10 @@ func CachedBlockstore(bs GCBlockstore,
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
}
if opts.HasBloomFilterSize != 0 {
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes,
opts.HasARCCacheSize)
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes)
}
if opts.HasARCCacheSize > 0 {
cbs, err = arcCached(cbs, opts.HasARCCacheSize)
}
return cbs, err
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment