Unverified Commit 3e8fd893 authored by Raúl Kripalani's avatar Raúl Kripalani Committed by GitHub

add View() to all the various blockstores. (#59)

parent 9283acfc
...@@ -12,35 +12,42 @@ import ( ...@@ -12,35 +12,42 @@ import (
type cacheHave bool type cacheHave bool
type cacheSize int type cacheSize int
// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) for // arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) that
// block Cids. This provides block access-time improvements, allowing // does not store the actual blocks, just metadata about them: existence and
// to short-cut many searches without query-ing the underlying datastore. // size. This provides block access-time improvements, allowing
// to short-cut many searches without querying the underlying datastore.
type arccache struct { type arccache struct {
arc *lru.TwoQueueCache cache *lru.TwoQueueCache
blockstore Blockstore blockstore Blockstore
viewer Viewer
hits metrics.Counter hits metrics.Counter
total metrics.Counter total metrics.Counter
} }
var _ Blockstore = (*arccache)(nil)
var _ Viewer = (*arccache)(nil)
func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) { func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) {
arc, err := lru.New2Q(lruSize) cache, err := lru.New2Q(lruSize)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c := &arccache{arc: arc, blockstore: bs} c := &arccache{cache: cache, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
if v, ok := bs.(Viewer); ok {
c.viewer = v
}
return c, nil return c, nil
} }
func (b *arccache) DeleteBlock(k cid.Cid) error { func (b *arccache) DeleteBlock(k cid.Cid) error {
if has, _, ok := b.hasCached(k); ok && !has { if has, _, ok := b.queryCache(k); ok && !has {
return nil return nil
} }
b.arc.Remove(k) // Invalidate cache before deleting. b.cache.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k) err := b.blockstore.DeleteBlock(k)
if err == nil { if err == nil {
b.cacheHave(k, false) b.cacheHave(k, false)
...@@ -48,32 +55,8 @@ func (b *arccache) DeleteBlock(k cid.Cid) error { ...@@ -48,32 +55,8 @@ func (b *arccache) DeleteBlock(k cid.Cid) error {
return err return err
} }
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k cid.Cid) (has bool, size int, ok bool) {
b.total.Inc()
if !k.Defined() {
log.Error("undefined cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, -1, false
}
h, ok := b.arc.Get(string(k.Hash()))
if ok {
b.hits.Inc()
switch h := h.(type) {
case cacheHave:
return bool(h), -1, true
case cacheSize:
return true, int(h), true
}
}
return false, -1, false
}
func (b *arccache) Has(k cid.Cid) (bool, error) { func (b *arccache) Has(k cid.Cid) (bool, error) {
if has, _, ok := b.hasCached(k); ok { if has, _, ok := b.queryCache(k); ok {
return has, nil return has, nil
} }
has, err := b.blockstore.Has(k) has, err := b.blockstore.Has(k)
...@@ -85,7 +68,7 @@ func (b *arccache) Has(k cid.Cid) (bool, error) { ...@@ -85,7 +68,7 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
} }
func (b *arccache) GetSize(k cid.Cid) (int, error) { func (b *arccache) GetSize(k cid.Cid) (int, error) {
if has, blockSize, ok := b.hasCached(k); ok { if has, blockSize, ok := b.queryCache(k); ok {
if !has { if !has {
// don't have it, return // don't have it, return
return -1, ErrNotFound return -1, ErrNotFound
...@@ -105,13 +88,38 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) { ...@@ -105,13 +88,38 @@ func (b *arccache) GetSize(k cid.Cid) (int, error) {
return blockSize, err return blockSize, err
} }
func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
// shortcircuit and fall back to Get if the underlying store
// doesn't support Viewer.
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}
if !k.Defined() {
log.Error("undefined cid in arc cache")
return ErrNotFound
}
if has, _, ok := b.queryCache(k); ok && !has {
// short circuit if the cache deterministically tells us the item
// doesn't exist.
return ErrNotFound
}
return b.viewer.View(k, callback)
}
func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
if !k.Defined() { if !k.Defined() {
log.Error("undefined cid in arc cache") log.Error("undefined cid in arc cache")
return nil, ErrNotFound return nil, ErrNotFound
} }
if has, _, ok := b.hasCached(k); ok && !has { if has, _, ok := b.queryCache(k); ok && !has {
return nil, ErrNotFound return nil, ErrNotFound
} }
...@@ -125,7 +133,7 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { ...@@ -125,7 +133,7 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
} }
func (b *arccache) Put(bl blocks.Block) error { func (b *arccache) Put(bl blocks.Block) error {
if has, _, ok := b.hasCached(bl.Cid()); ok && has { if has, _, ok := b.queryCache(bl.Cid()); ok && has {
return nil return nil
} }
...@@ -141,7 +149,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error { ...@@ -141,7 +149,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs { for _, block := range bs {
// call put on block if result is inconclusive or we are sure that // call put on block if result is inconclusive or we are sure that
// the block isn't in storage // the block isn't in storage
if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) { if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
good = append(good, block) good = append(good, block)
} }
} }
...@@ -160,11 +168,44 @@ func (b *arccache) HashOnRead(enabled bool) { ...@@ -160,11 +168,44 @@ func (b *arccache) HashOnRead(enabled bool) {
} }
func (b *arccache) cacheHave(c cid.Cid, have bool) { func (b *arccache) cacheHave(c cid.Cid, have bool) {
b.arc.Add(string(c.Hash()), cacheHave(have)) b.cache.Add(string(c.Hash()), cacheHave(have))
} }
func (b *arccache) cacheSize(c cid.Cid, blockSize int) { func (b *arccache) cacheSize(c cid.Cid, blockSize int) {
b.arc.Add(string(c.Hash()), cacheSize(blockSize)) b.cache.Add(string(c.Hash()), cacheSize(blockSize))
}
// queryCache checks if the CID is in the cache. If so, it returns:
//
// * exists (bool): whether the CID is known to exist or not.
// * size (int): the size if cached, or -1 if not cached.
// * ok (bool): whether present in the cache.
//
// When ok is false, the answer in inconclusive and the caller must ignore the
// other two return values. Querying the underying store is necessary.
//
// When ok is true, exists carries the correct answer, and size carries the
// size, if known, or -1 if not.
func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) {
b.total.Inc()
if !k.Defined() {
log.Error("undefined cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, -1, false
}
h, ok := b.cache.Get(string(k.Hash()))
if ok {
b.hits.Inc()
switch h := h.(type) {
case cacheHave:
return bool(h), -1, true
case cacheSize:
return true, int(h), true
}
}
return false, -1, false
} }
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
......
...@@ -57,6 +57,21 @@ type Blockstore interface { ...@@ -57,6 +57,21 @@ type Blockstore interface {
HashOnRead(enabled bool) HashOnRead(enabled bool)
} }
// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
// Callers of View must not mutate or retain the byte slice, as it could be
// an mmapped memory region, or a pooled byte buffer.
//
// View is especially suitable for deserialising in place.
//
// The callback will only be called iff the query operation is successful (and
// the block is found); otherwise, the error will be propagated. Errors returned
// by the callback will be propagated as well.
type Viewer interface {
View(cid cid.Cid, callback func([]byte) error) error
}
// GCLocker abstract functionality to lock a blockstore when performing // GCLocker abstract functionality to lock a blockstore when performing
// garbage-collection operations. // garbage-collection operations.
type GCLocker interface { type GCLocker interface {
......
...@@ -29,6 +29,9 @@ func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) ( ...@@ -29,6 +29,9 @@ func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (
"Total number of requests to bloom cache").Counter(), "Total number of requests to bloom cache").Counter(),
buildChan: make(chan struct{}), buildChan: make(chan struct{}),
} }
if v, ok := bs.(Viewer); ok {
bc.viewer = v
}
go func() { go func() {
err := bc.build(ctx) err := bc.build(ctx)
if err != nil { if err != nil {
...@@ -67,12 +70,16 @@ type bloomcache struct { ...@@ -67,12 +70,16 @@ type bloomcache struct {
buildChan chan struct{} buildChan chan struct{}
blockstore Blockstore blockstore Blockstore
viewer Viewer
// Statistics // Statistics
hits metrics.Counter hits metrics.Counter
total metrics.Counter total metrics.Counter
} }
var _ Blockstore = (*bloomcache)(nil)
var _ Viewer = (*bloomcache)(nil)
func (b *bloomcache) BloomActive() bool { func (b *bloomcache) BloomActive() bool {
return atomic.LoadInt32(&b.active) != 0 return atomic.LoadInt32(&b.active) != 0
} }
...@@ -151,6 +158,21 @@ func (b *bloomcache) GetSize(k cid.Cid) (int, error) { ...@@ -151,6 +158,21 @@ func (b *bloomcache) GetSize(k cid.Cid) (int, error) {
return b.blockstore.GetSize(k) return b.blockstore.GetSize(k)
} }
func (b *bloomcache) View(k cid.Cid, callback func([]byte) error) error {
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}
if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}
return b.viewer.View(k, callback)
}
func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) { func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has { if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound return nil, ErrNotFound
......
...@@ -10,11 +10,19 @@ import ( ...@@ -10,11 +10,19 @@ import (
// idstore wraps a BlockStore to add support for identity hashes // idstore wraps a BlockStore to add support for identity hashes
type idstore struct { type idstore struct {
bs Blockstore bs Blockstore
viewer Viewer
} }
var _ Blockstore = (*idstore)(nil)
var _ Viewer = (*idstore)(nil)
func NewIdStore(bs Blockstore) Blockstore { func NewIdStore(bs Blockstore) Blockstore {
return &idstore{bs} ids := &idstore{bs: bs}
if v, ok := bs.(Viewer); ok {
ids.viewer = v
}
return ids
} }
func extractContents(k cid.Cid) (bool, []byte) { func extractContents(k cid.Cid) (bool, []byte) {
...@@ -46,6 +54,21 @@ func (b *idstore) Has(k cid.Cid) (bool, error) { ...@@ -46,6 +54,21 @@ func (b *idstore) Has(k cid.Cid) (bool, error) {
return b.bs.Has(k) return b.bs.Has(k)
} }
func (b *idstore) View(k cid.Cid, callback func([]byte) error) error {
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}
isId, bdata := extractContents(k)
if isId {
return callback(bdata)
}
return b.viewer.View(k, callback)
}
func (b *idstore) GetSize(k cid.Cid) (int, error) { func (b *idstore) GetSize(k cid.Cid) (int, error) {
isId, bdata := extractContents(k) isId, bdata := extractContents(k)
if isId { if isId {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment