package blockstore import ( "github.com/ipfs/go-ipfs/blocks" key "github.com/ipfs/go-ipfs/blocks/key" ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" "sync/atomic" ) // bloomCached returns Blockstore that caches Has requests using Bloom filter // Size is size of bloom filter in bytes func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) { bl, err := bloom.New(float64(bloomSize), float64(hashCount)) if err != nil { return nil, err } arc, err := lru.NewARC(lruSize) if err != nil { return nil, err } bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc} bc.Invalidate() go bc.Rebuild(ctx) return bc, nil } type bloomcache struct { bloom *bloom.Bloom active int32 arc *lru.ARCCache // This chan is only used for testing to wait for bloom to enable rebuildChan chan struct{} blockstore Blockstore // Statistics hits uint64 misses uint64 } func (b *bloomcache) Invalidate() { b.rebuildChan = make(chan struct{}) atomic.StoreInt32(&b.active, 0) } func (b *bloomcache) BloomActive() bool { return atomic.LoadInt32(&b.active) != 0 } func (b *bloomcache) Rebuild(ctx context.Context) { evt := log.EventBegin(ctx, "bloomcache.Rebuild") defer evt.Done() ch, err := b.blockstore.AllKeysChan(ctx) if err != nil { log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err) return } finish := false for !finish { select { case key, ok := <-ch: if ok { b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better } else { finish = true } case <-ctx.Done(): log.Warning("Cache rebuild closed by context finishing.") return } } close(b.rebuildChan) atomic.StoreInt32(&b.active, 1) } func (b *bloomcache) DeleteBlock(k key.Key) error { if has, ok := b.hasCached(k); ok && !has { return ErrNotFound } b.arc.Remove(k) // Invalidate cache before deleting. err := b.blockstore.DeleteBlock(k) switch err { case nil: b.arc.Add(k, false) case ds.ErrNotFound, ErrNotFound: b.arc.Add(k, false) default: return err } return nil } // if ok == false has is inconclusive // if ok == true then has respons to question: is it contained func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) { if k == "" { // Return cache invalid so call to blockstore // in case of invalid key is forwarded deeper return false, false } if b.BloomActive() { blr := b.bloom.HasTS([]byte(k)) if blr == false { // not contained in bloom is only conclusive answer bloom gives return false, true } } h, ok := b.arc.Get(k) if ok { return h.(bool), ok } else { return false, false } } func (b *bloomcache) Has(k key.Key) (bool, error) { if has, ok := b.hasCached(k); ok { return has, nil } res, err := b.blockstore.Has(k) if err == nil { b.arc.Add(k, res) } return res, err } func (b *bloomcache) Get(k key.Key) (blocks.Block, error) { if has, ok := b.hasCached(k); ok && !has { return nil, ErrNotFound } bl, err := b.blockstore.Get(k) if bl == nil && err == ErrNotFound { b.arc.Add(k, false) } else if bl != nil { b.arc.Add(k, true) } return bl, err } func (b *bloomcache) Put(bl blocks.Block) error { if has, ok := b.hasCached(bl.Key()); ok && has { return nil } err := b.blockstore.Put(bl) if err == nil { b.bloom.AddTS([]byte(bl.Key())) b.arc.Add(bl.Key(), true) } return err } func (b *bloomcache) PutMany(bs []blocks.Block) error { var good []blocks.Block for _, block := range bs { if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) { good = append(good, block) } } err := b.blockstore.PutMany(bs) if err == nil { for _, block := range bs { b.bloom.AddTS([]byte(block.Key())) b.arc.Add(block.Key(), true) } } return err } func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { return b.blockstore.AllKeysChan(ctx) } func (b *bloomcache) GCLock() Unlocker { return b.blockstore.(GCBlockstore).GCLock() } func (b *bloomcache) PinLock() Unlocker { return b.blockstore.(GCBlockstore).PinLock() } func (b *bloomcache) GCRequested() bool { return b.blockstore.(GCBlockstore).GCRequested() }