From 3e8fd89307c11636913e21e3643ce930f39bb6c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 10 Nov 2020 16:28:56 +0000 Subject: [PATCH] add View() to all the various blockstores. (#59) --- arc_cache.go | 121 +++++++++++++++++++++++++++++++++---------------- blockstore.go | 15 ++++++ bloom_cache.go | 22 +++++++++ idstore.go | 27 ++++++++++- 4 files changed, 143 insertions(+), 42 deletions(-) diff --git a/arc_cache.go b/arc_cache.go index e2b930d..1e497ab 100644 --- a/arc_cache.go +++ b/arc_cache.go @@ -12,35 +12,42 @@ import ( type cacheHave bool type cacheSize int -// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) for -// block Cids. This provides block access-time improvements, allowing -// to short-cut many searches without query-ing the underlying datastore. +// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) that +// does not store the actual blocks, just metadata about them: existence and +// size. This provides block access-time improvements, allowing +// to short-cut many searches without querying the underlying datastore. 
type arccache struct { - arc *lru.TwoQueueCache + cache *lru.TwoQueueCache blockstore Blockstore + viewer Viewer hits metrics.Counter total metrics.Counter } +var _ Blockstore = (*arccache)(nil) +var _ Viewer = (*arccache)(nil) + func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) { - arc, err := lru.New2Q(lruSize) + cache, err := lru.New2Q(lruSize) if err != nil { return nil, err } - c := &arccache{arc: arc, blockstore: bs} + c := &arccache{cache: cache, blockstore: bs} c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() - + if v, ok := bs.(Viewer); ok { + c.viewer = v + } return c, nil } func (b *arccache) DeleteBlock(k cid.Cid) error { - if has, _, ok := b.hasCached(k); ok && !has { + if has, _, ok := b.queryCache(k); ok && !has { return nil } - b.arc.Remove(k) // Invalidate cache before deleting. + b.cache.Remove(k) // Invalidate cache before deleting. err := b.blockstore.DeleteBlock(k) if err == nil { b.cacheHave(k, false) @@ -48,32 +55,8 @@ func (b *arccache) DeleteBlock(k cid.Cid) error { return err } -// if ok == false has is inconclusive -// if ok == true then has respons to question: is it contained -func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) { - b.total.Inc() - if !k.Defined() { - log.Error("undefined cid in arccache") - // Return cache invalid so the call to blockstore happens - // in case of invalid key and correct error is created. 
- return false, -1, false - } - - h, ok := b.arc.Get(string(k.Hash())) - if ok { - b.hits.Inc() - switch h := h.(type) { - case cacheHave: - return bool(h), -1, true - case cacheSize: - return true, int(h), true - } - } - return false, -1, false -} - func (b *arccache) Has(k cid.Cid) (bool, error) { - if has, _, ok := b.hasCached(k); ok { + if has, _, ok := b.queryCache(k); ok { return has, nil } has, err := b.blockstore.Has(k) @@ -85,7 +68,7 @@ func (b *arccache) Has(k cid.Cid) (bool, error) { } func (b *arccache) GetSize(k cid.Cid) (int, error) { - if has, blockSize, ok := b.hasCached(k); ok { + if has, blockSize, ok := b.queryCache(k); ok { if !has { // don't have it, return return -1, ErrNotFound @@ -105,13 +88,38 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) { return blockSize, err } +func (b *arccache) View(k cid.Cid, callback func([]byte) error) error { + // shortcircuit and fall back to Get if the underlying store + // doesn't support Viewer. + if b.viewer == nil { + blk, err := b.Get(k) + if err != nil { + return err + } + return callback(blk.RawData()) + } + + if !k.Defined() { + log.Error("undefined cid in arc cache") + return ErrNotFound + } + + if has, _, ok := b.queryCache(k); ok && !has { + // short circuit if the cache deterministically tells us the item + // doesn't exist. 
+ return ErrNotFound + } + + return b.viewer.View(k, callback) +} + func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { if !k.Defined() { log.Error("undefined cid in arc cache") return nil, ErrNotFound } - if has, _, ok := b.hasCached(k); ok && !has { + if has, _, ok := b.queryCache(k); ok && !has { return nil, ErrNotFound } @@ -125,7 +133,7 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { } func (b *arccache) Put(bl blocks.Block) error { - if has, _, ok := b.hasCached(bl.Cid()); ok && has { + if has, _, ok := b.queryCache(bl.Cid()); ok && has { return nil } @@ -141,7 +149,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error { for _, block := range bs { // call put on block if result is inconclusive or we are sure that // the block isn't in storage - if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) { + if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) { good = append(good, block) } } @@ -160,11 +168,44 @@ func (b *arccache) HashOnRead(enabled bool) { } func (b *arccache) cacheHave(c cid.Cid, have bool) { - b.arc.Add(string(c.Hash()), cacheHave(have)) + b.cache.Add(string(c.Hash()), cacheHave(have)) } func (b *arccache) cacheSize(c cid.Cid, blockSize int) { - b.arc.Add(string(c.Hash()), cacheSize(blockSize)) + b.cache.Add(string(c.Hash()), cacheSize(blockSize)) +} + +// queryCache checks if the CID is in the cache. If so, it returns: +// +// * exists (bool): whether the CID is known to exist or not. +// * size (int): the size if cached, or -1 if not cached. +// * ok (bool): whether present in the cache. +// +// When ok is false, the answer is inconclusive and the caller must ignore the +// other two return values. Querying the underlying store is necessary. +// +// When ok is true, exists carries the correct answer, and size carries the +// size, if known, or -1 if not. 
+func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) { + b.total.Inc() + if !k.Defined() { + log.Error("undefined cid in arccache") + // Return cache invalid so the call to blockstore happens + // in case of invalid key and correct error is created. + return false, -1, false + } + + h, ok := b.cache.Get(string(k.Hash())) + if ok { + b.hits.Inc() + switch h := h.(type) { + case cacheHave: + return bool(h), -1, true + case cacheSize: + return true, int(h), true + } + } + return false, -1, false } func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { diff --git a/blockstore.go b/blockstore.go index f8eb07a..6625a34 100644 --- a/blockstore.go +++ b/blockstore.go @@ -57,6 +57,21 @@ type Blockstore interface { HashOnRead(enabled bool) } +// Viewer can be implemented by blockstores that offer zero-copy access to +// values. +// +// Callers of View must not mutate or retain the byte slice, as it could be +// an mmapped memory region, or a pooled byte buffer. +// +// View is especially suitable for deserialising in place. +// +// The callback will only be called iff the query operation is successful (and +// the block is found); otherwise, the error will be propagated. Errors returned +// by the callback will be propagated as well. +type Viewer interface { + View(cid cid.Cid, callback func([]byte) error) error +} + // GCLocker abstract functionality to lock a blockstore when performing // garbage-collection operations. 
type GCLocker interface { diff --git a/bloom_cache.go b/bloom_cache.go index b4fadc2..da302c9 100644 --- a/bloom_cache.go +++ b/bloom_cache.go @@ -29,6 +29,9 @@ func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) ( "Total number of requests to bloom cache").Counter(), buildChan: make(chan struct{}), } + if v, ok := bs.(Viewer); ok { + bc.viewer = v + } go func() { err := bc.build(ctx) if err != nil { @@ -67,12 +70,16 @@ type bloomcache struct { buildChan chan struct{} blockstore Blockstore + viewer Viewer // Statistics hits metrics.Counter total metrics.Counter } +var _ Blockstore = (*bloomcache)(nil) +var _ Viewer = (*bloomcache)(nil) + func (b *bloomcache) BloomActive() bool { return atomic.LoadInt32(&b.active) != 0 } @@ -151,6 +158,21 @@ func (b *bloomcache) GetSize(k cid.Cid) (int, error) { return b.blockstore.GetSize(k) } +func (b *bloomcache) View(k cid.Cid, callback func([]byte) error) error { + if b.viewer == nil { + blk, err := b.Get(k) + if err != nil { + return err + } + return callback(blk.RawData()) + } + + if has, ok := b.hasCached(k); ok && !has { + return ErrNotFound + } + return b.viewer.View(k, callback) +} + func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) { if has, ok := b.hasCached(k); ok && !has { return nil, ErrNotFound diff --git a/idstore.go b/idstore.go index 477da70..274c1a3 100644 --- a/idstore.go +++ b/idstore.go @@ -10,11 +10,19 @@ import ( // idstore wraps a BlockStore to add support for identity hashes type idstore struct { - bs Blockstore + bs Blockstore + viewer Viewer } +var _ Blockstore = (*idstore)(nil) +var _ Viewer = (*idstore)(nil) + func NewIdStore(bs Blockstore) Blockstore { - return &idstore{bs} + ids := &idstore{bs: bs} + if v, ok := bs.(Viewer); ok { + ids.viewer = v + } + return ids } func extractContents(k cid.Cid) (bool, []byte) { @@ -46,6 +54,21 @@ func (b *idstore) Has(k cid.Cid) (bool, error) { return b.bs.Has(k) } +func (b *idstore) View(k cid.Cid, callback func([]byte) error) 
error { + if b.viewer == nil { + blk, err := b.Get(k) + if err != nil { + return err + } + return callback(blk.RawData()) + } + isId, bdata := extractContents(k) + if isId { + return callback(bdata) + } + return b.viewer.View(k, callback) +} + func (b *idstore) GetSize(k cid.Cid) (int, error) { isId, bdata := extractContents(k) if isId { -- GitLab