Commit d021381f authored by taylor's avatar taylor

blockstore: Adding GetSize method to map from Cid to BlockSize

Performant way to map from Cid to BlockSize. 

License: MIT
Signed-off-by: default avatarJeromy <why@ipfs.io>
parent fe4126ef
......@@ -34,7 +34,7 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
}
func (b *arccache) DeleteBlock(k *cid.Cid) error {
if has, ok := b.hasCached(k); ok && !has {
if has, _, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}
......@@ -42,7 +42,7 @@ func (b *arccache) DeleteBlock(k *cid.Cid) error {
err := b.blockstore.DeleteBlock(k)
switch err {
case nil, ds.ErrNotFound, ErrNotFound:
b.addCache(k, false)
b.addCache(k, -1)
return err
default:
return err
......@@ -51,33 +51,46 @@ func (b *arccache) DeleteBlock(k *cid.Cid) error {
// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k *cid.Cid) (has bool, ok bool) {
func (b *arccache) hasCached(k *cid.Cid) (has bool, size int, ok bool) {
b.total.Inc()
if k == nil {
log.Error("nil cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, false
return false, -1, false
}
h, ok := b.arc.Get(k.KeyString())
if ok {
b.hits.Inc()
return h.(bool), true
if h.(int) > -1 {
return true, h.(int), true
} else {
return false, h.(int), true
}
}
return false, false
return false, -1, false
}
func (b *arccache) Has(k *cid.Cid) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
blockSize, err := b.GetSize(k)
if err == ds.ErrNotFound {
return false, nil
}
return blockSize > -1, err
}
res, err := b.blockstore.Has(k)
if err == nil {
b.addCache(k, res)
func (b *arccache) GetSize(k *cid.Cid) (int, error) {
if _, blockSize, ok := b.hasCached(k); ok {
return blockSize, nil
}
blockSize, err := b.blockstore.GetSize(k)
if err == ds.ErrNotFound {
b.addCache(k, -1)
} else if err == nil {
b.addCache(k, blockSize)
}
return res, err
return blockSize, err
}
func (b *arccache) Get(k *cid.Cid) (blocks.Block, error) {
......@@ -86,27 +99,27 @@ func (b *arccache) Get(k *cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}
if has, ok := b.hasCached(k); ok && !has {
if has, _, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}
bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.addCache(k, false)
b.addCache(k, -1)
} else if bl != nil {
b.addCache(k, true)
b.addCache(k, len(bl.RawData()))
}
return bl, err
}
func (b *arccache) Put(bl blocks.Block) error {
if has, ok := b.hasCached(bl.Cid()); ok && has {
if has, _, ok := b.hasCached(bl.Cid()); ok && has {
return nil
}
err := b.blockstore.Put(bl)
if err == nil {
b.addCache(bl.Cid(), true)
b.addCache(bl.Cid(), len(bl.RawData()))
}
return err
}
......@@ -116,7 +129,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
}
}
......@@ -125,7 +138,7 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
return err
}
for _, block := range good {
b.addCache(block.Cid(), true)
b.addCache(block.Cid(), len(block.RawData()))
}
return nil
}
......@@ -134,8 +147,8 @@ func (b *arccache) HashOnRead(enabled bool) {
b.blockstore.HashOnRead(enabled)
}
func (b *arccache) addCache(c *cid.Cid, has bool) {
b.arc.Add(c.KeyString(), has)
func (b *arccache) addCache(c *cid.Cid, blockSize int) {
b.arc.Add(c.KeyString(), blockSize)
}
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
......
......@@ -4,7 +4,7 @@ import (
"context"
"testing"
"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
......@@ -107,6 +107,9 @@ func TestGetFillsCache(t *testing.T) {
if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
t.Fatal("has was true but there is no such block")
}
if blockSize, err := arc.GetSize(exampleBlock.Cid()); blockSize > -1 || err != nil {
t.Fatal("getsize was true but there is no such block")
}
untrap(cd)
......@@ -119,12 +122,16 @@ func TestGetFillsCache(t *testing.T) {
if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
t.Fatal("has returned invalid result")
}
if blockSize, err := arc.GetSize(exampleBlock.Cid()); blockSize == -1 || err != nil {
t.Fatal("getsize returned invalid result")
}
}
func TestGetAndDeleteFalseShortCircuit(t *testing.T) {
arc, _, cd := createStores(t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
trap("get hit datastore", cd, t)
......@@ -167,6 +174,41 @@ func TestHasAfterSucessfulGetIsCached(t *testing.T) {
arc.Has(exampleBlock.Cid())
}
func TestGetSizeAfterSucessfulGetIsCached(t *testing.T) {
arc, bs, cd := createStores(t)
bs.Put(exampleBlock)
arc.Get(exampleBlock.Cid())
trap("has hit datastore", cd, t)
arc.GetSize(exampleBlock.Cid())
}
func TestGetSizeMissingZeroSizeBlock(t *testing.T) {
arc, bs, cd := createStores(t)
emptyBlock := blocks.NewBlock([]byte{})
missingBlock := blocks.NewBlock([]byte("missingBlock"))
bs.Put(emptyBlock)
arc.Get(emptyBlock.Cid())
trap("has hit datastore", cd, t)
if blockSize, err := arc.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil {
t.Fatal("getsize returned invalid result")
}
untrap(cd)
arc.Get(missingBlock.Cid())
trap("has hit datastore", cd, t)
if blockSize, err := arc.GetSize(missingBlock.Cid()); blockSize != -1 || err != nil {
t.Fatal("getsize returned invalid result")
}
}
func TestDifferentKeyObjectsWork(t *testing.T) {
arc, bs, cd := createStores(t)
......@@ -191,6 +233,7 @@ func TestPutManyCaches(t *testing.T) {
trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
untrap(cd)
arc.DeleteBlock(exampleBlock.Cid())
......
......@@ -40,6 +40,9 @@ type Blockstore interface {
Has(*cid.Cid) (bool, error)
Get(*cid.Cid) (blocks.Block, error)
// GetSize returns the CIDs mapped BlockSize
GetSize(*cid.Cid) (int, error)
// Put puts a given block to the underlying datastore
Put(blocks.Block) error
......@@ -183,6 +186,18 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
return bs.datastore.Has(dshelp.CidToDsKey(k))
}
func (bs *blockstore) GetSize(k *cid.Cid) (int, error) {
maybeData, err := bs.datastore.Get(dshelp.CidToDsKey(k))
if err != nil {
return -1, err
}
bdata, ok := maybeData.([]byte)
if !ok {
return -1, ErrValueTypeMismatch
}
return len(bdata), nil
}
func (bs *blockstore) DeleteBlock(k *cid.Cid) error {
err := bs.datastore.Delete(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
......
......@@ -54,6 +54,39 @@ func TestPutThenGetBlock(t *testing.T) {
}
}
func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
missingBlock := blocks.NewBlock([]byte("missingBlock"))
emptyBlock := blocks.NewBlock([]byte{})
err := bs.Put(block)
if err != nil {
t.Fatal(err)
}
blockSize, err := bs.GetSize(block.Cid())
if err != nil {
t.Fatal(err)
}
if len(block.RawData()) != blockSize {
t.Fail()
}
err = bs.Put(emptyBlock)
if err != nil {
t.Fatal(err)
}
if blockSize, err := bs.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil {
t.Fatal(err)
}
if blockSize, err := bs.GetSize(missingBlock.Cid()); blockSize != -1 || err == nil {
t.Fatal("getsize returned invalid result")
}
}
func TestHashOnRead(t *testing.T) {
orginalDebug := u.Debug
defer (func() {
......
......@@ -133,6 +133,10 @@ func (b *bloomcache) Has(k *cid.Cid) (bool, error) {
return b.blockstore.Has(k)
}
func (b *bloomcache) GetSize(k *cid.Cid) (int, error) {
return b.blockstore.GetSize(k)
}
func (b *bloomcache) Get(k *cid.Cid) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
......
......@@ -45,13 +45,18 @@ func TestPutManyAddsToBloom(t *testing.T) {
block1 := blocks.NewBlock([]byte("foo"))
block2 := blocks.NewBlock([]byte("bar"))
emptyBlock := blocks.NewBlock([]byte{})
cachedbs.PutMany([]blocks.Block{block1})
cachedbs.PutMany([]blocks.Block{block1, emptyBlock})
has, err := cachedbs.Has(block1.Cid())
if err != nil {
t.Fatal(err)
}
if !has {
blockSize, err := cachedbs.GetSize(block1.Cid())
if err != nil {
t.Fatal(err)
}
if blockSize == -1 || !has {
t.Fatal("added block is reported missing")
}
......@@ -59,9 +64,25 @@ func TestPutManyAddsToBloom(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if has {
blockSize, err = cachedbs.GetSize(block2.Cid())
if err != nil && err != ds.ErrNotFound {
t.Fatal(err)
}
if blockSize > -1 || has {
t.Fatal("not added block is reported to be in blockstore")
}
has, err = cachedbs.Has(emptyBlock.Cid())
if err != nil {
t.Fatal(err)
}
blockSize, err = cachedbs.GetSize(emptyBlock.Cid())
if err != nil {
t.Fatal(err)
}
if blockSize != 0 || !has {
t.Fatal("added block is reported missing")
}
}
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
......
......@@ -41,6 +41,14 @@ func (b *idstore) Has(k *cid.Cid) (bool, error) {
return b.bs.Has(k)
}
func (b *idstore) GetSize(k *cid.Cid) (int, error) {
isId, bdata := extractContents(k)
if isId {
return len(bdata), nil
}
return b.bs.GetSize(k)
}
func (b *idstore) Get(k *cid.Cid) (blocks.Block, error) {
isId, bdata := extractContents(k)
if isId {
......
......@@ -21,6 +21,8 @@ func TestIdStore(t *testing.T) {
idblock1, _ := blk.NewBlockWithCid([]byte("idhash1"), idhash1)
hash1, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash1"))
block1, _ := blk.NewBlockWithCid([]byte("hash1"), hash1)
emptyHash, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("emptyHash"))
emptyBlock, _ := blk.NewBlockWithCid([]byte{}, emptyHash)
ids, cb := createTestStores()
......@@ -56,11 +58,31 @@ func TestIdStore(t *testing.T) {
t.Fatal("normal block not added to datastore")
}
blockSize, _ := ids.GetSize(hash1)
if blockSize == -1 {
t.Fatal("normal block not added to datastore")
}
_, err = ids.Get(hash1)
if err != nil {
t.Fatal(err)
}
err = ids.Put(emptyBlock)
if err != nil {
t.Fatalf("Put() failed on normal block: %v", err)
}
have, _ = ids.Has(emptyHash)
if !have {
t.Fatal("normal block not added to datastore")
}
blockSize, _ = ids.GetSize(emptyHash)
if blockSize != 0 {
t.Fatal("normal block not added to datastore")
}
cb.f = failIfPassThough
err = ids.DeleteBlock(idhash1)
if err != nil {
......@@ -78,6 +100,16 @@ func TestIdStore(t *testing.T) {
t.Fatal("normal block not deleted from datastore")
}
blockSize, _ = ids.GetSize(hash1)
if blockSize > -1 {
t.Fatal("normal block not deleted from datastore")
}
err = ids.DeleteBlock(emptyHash)
if err != nil {
t.Fatal(err)
}
idhash2, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash2"))
idblock2, _ := blk.NewBlockWithCid([]byte("idhash2"), idhash2)
hash2, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash2"))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment