Commit 6257699d authored by tavit ohanian

Merge branch 'port-2021-04-29'

parents 93cf595d 38ebdb53
Pipeline #370 passed with stages in 22 seconds
blank_issues_enabled: false
contact_links:
- name: Getting Help on IPFS
url: https://ipfs.io/help
about: All information about how and where to get help on IPFS.
- name: IPFS Official Forum
url: https://discuss.ipfs.io
about: Please post general questions, support requests, and discussions here.
---
name: Open an issue
about: Only for actionable issues relevant to this repository.
title: ''
labels: need/triage
assignees: ''
---
<!--
Hello! To ensure this issue is correctly addressed as soon as possible by the IPFS team, please try to make sure:
- This issue is relevant to this repository's topic or codebase.
- A clear description is provided. It should include as much relevant information as possible and a clear scope so the issue is actionable.
FOR GENERAL DISCUSSION, HELP OR QUESTIONS, please see the options at https://ipfs.io/help or head directly to https://discuss.ipfs.io.
(you can delete this section after reading)
-->
# Configuration for welcome - https://github.com/behaviorbot/welcome
# Configuration for new-issue-welcome - https://github.com/behaviorbot/new-issue-welcome
# Comment to be posted to on first time issues
newIssueWelcomeComment: >
Thank you for submitting your first issue to this repository! A maintainer
will be here shortly to triage and review.
In the meantime, please double-check that you have provided all the
necessary information to make this process easy! Any information that can
help save additional round trips is useful! We currently aim to give
initial feedback within **two business days**. If this does not happen, feel
free to leave a comment.
Please keep an eye on how this issue will be labeled, as labels give an
overview of priorities, assignments and additional actions requested by the
maintainers:
- "Priority" labels will show how urgent this is for the team.
- "Status" labels will show if this is ready to be worked on, blocked, or in progress.
- "Need" labels will indicate if additional input or analysis is required.
Finally, remember to use https://discuss.ipfs.io if you just need general
support.
# Configuration for new-pr-welcome - https://github.com/behaviorbot/new-pr-welcome
# Comment to be posted to on PRs from first time contributors in your repository
newPRWelcomeComment: >
Thank you for submitting this PR!
A maintainer will be here shortly to review it.
We are super grateful, but we are also overloaded! Help us by making sure
that:
* The context for this PR is clear, with relevant discussion, decisions
and stakeholders linked/mentioned.
* Your contribution itself is clear (code comments, self-review for the
rest) and in its best form. Follow the [code contribution
guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md#code-contribution-guidelines)
if they apply.
Getting other community members to do a review would be a great help too on
complex PRs (you can ask in the chats/forums). If you are unsure about
something, just leave us a comment.
Next steps:
* A maintainer will triage and assign priority to this PR, commenting on
any missing things and potentially assigning a reviewer for high
priority items.
* The PR gets reviewed, discussed, and approved as needed.
* The PR is merged by maintainers when it has been approved and comments addressed.
We currently aim to provide initial feedback/triaging within **two business
days**. Please keep an eye on any labelling actions, as these will indicate
priorities and status of your contribution.
We are very grateful for your contribution!
# Configuration for first-pr-merge - https://github.com/behaviorbot/first-pr-merge
# Comment to be posted to on pull requests merged by a first time user
# Currently disabled
#firstPRMergeComment: ""
stages:
- build
- test
variables:
BUILD_DIR: "/tmp/$CI_CONCURRENT_PROJECT_ID"
before_script:
- mkdir -p $BUILD_DIR/src
- cd $BUILD_DIR/src
- if [ -d $CI_PROJECT_DIR ]
- then
- echo "soft link $CI_PROJECT_DIR exists"
- else
- echo "creating soft link $CI_PROJECT_DIR"
- ln -s $CI_PROJECT_DIR
- fi
- cd $CI_PROJECT_DIR
build:
stage: build
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go build
test:
stage: test
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go test -cover
coverage: '/coverage: \d+.\d+% of statements/'
0.1.8: QmXjKkjMDTtXAiLBwstVexofB8LeruZmE2eBd85GwGFFLA
os:
- linux
language: go
go:
- 1.13.x
env:
global:
- GOTFLAGS="-race"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
MIT License
Copyright (c) 2020 Protocol Labs
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# go-dms3-blockstore

dms3 blockstore

# go-ipfs-blockstore
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-blockstore?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-blockstore)
[![Build Status](https://travis-ci.com/ipfs/go-ipfs-blockstore.svg?branch=master)](https://travis-ci.com/ipfs/go-ipfs-blockstore)
> go-ipfs-blockstore implements a thin wrapper over a datastore, giving a clean interface for Getting and Putting block objects.
## Lead Maintainer
[Steven Allen](https://github.com/Stebalien)
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [Contribute](#contribute)
- [License](#license)
## Install
`go-ipfs-blockstore` works like a regular Go module:
```
> go get github.com/ipfs/go-ipfs-blockstore
```
## Usage
```
import "github.com/ipfs/go-ipfs-blockstore"
```
Check the [GoDoc documentation](https://godoc.org/github.com/ipfs/go-ipfs-blockstore)
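Below is a minimal usage sketch against the interface shown in this README (import paths follow this README; the dms3 port uses the gitlab.dms3.io equivalents). It assumes the companion `go-datastore` and `go-block-format` modules, wraps an in-memory datastore, and stores one block:
```
package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
)

func main() {
	// A thread-safe, in-memory datastore backing the blockstore.
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))

	// Store a block and read it back by its CID.
	blk := blocks.NewBlock([]byte("some data"))
	if err := bs.Put(blk); err != nil {
		panic(err)
	}
	got, err := bs.Get(blk.Cid())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(got.RawData()))
}
```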
## Contribute
PRs accepted.
Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Protocol Labs, Inc.
package blockstore
import (
"context"
lru "github.com/hashicorp/golang-lru"
blocks "gitlab.dms3.io/dms3/public/go-block-format"
cid "gitlab.dms3.io/dms3/public/go-cid"
metrics "gitlab.dms3.io/dms3/public/go-metrics-interface"
)
type cacheHave bool
type cacheSize int
// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) that
// does not store the actual blocks, just metadata about them: existence and
// size. This provides block access-time improvements, allowing many
// searches to be short-cut without querying the underlying datastore.
type arccache struct {
cache *lru.TwoQueueCache
blockstore Blockstore
viewer Viewer
hits metrics.Counter
total metrics.Counter
}
var _ Blockstore = (*arccache)(nil)
var _ Viewer = (*arccache)(nil)
func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) {
cache, err := lru.New2Q(lruSize)
if err != nil {
return nil, err
}
c := &arccache{cache: cache, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
if v, ok := bs.(Viewer); ok {
c.viewer = v
}
return c, nil
}
func (b *arccache) DeleteBlock(k cid.Cid) error {
if has, _, ok := b.queryCache(k); ok && !has {
return nil
}
b.cache.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
if err == nil {
b.cacheHave(k, false)
}
return err
}
func (b *arccache) Has(k cid.Cid) (bool, error) {
if has, _, ok := b.queryCache(k); ok {
return has, nil
}
has, err := b.blockstore.Has(k)
if err != nil {
return false, err
}
b.cacheHave(k, has)
return has, nil
}
func (b *arccache) GetSize(k cid.Cid) (int, error) {
if has, blockSize, ok := b.queryCache(k); ok {
if !has {
// don't have it, return
return -1, ErrNotFound
}
if blockSize >= 0 {
// have it and we know the size
return blockSize, nil
}
// we have it but don't know the size, ask the datastore.
}
blockSize, err := b.blockstore.GetSize(k)
if err == ErrNotFound {
b.cacheHave(k, false)
} else if err == nil {
b.cacheSize(k, blockSize)
}
return blockSize, err
}
func (b *arccache) View(k cid.Cid, callback func([]byte) error) error {
// shortcircuit and fall back to Get if the underlying store
// doesn't support Viewer.
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}
if !k.Defined() {
log.Error("undefined cid in arc cache")
return ErrNotFound
}
if has, _, ok := b.queryCache(k); ok && !has {
// short circuit if the cache deterministically tells us the item
// doesn't exist.
return ErrNotFound
}
return b.viewer.View(k, callback)
}
func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
if !k.Defined() {
log.Error("undefined cid in arc cache")
return nil, ErrNotFound
}
if has, _, ok := b.queryCache(k); ok && !has {
return nil, ErrNotFound
}
bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.cacheHave(k, false)
} else if bl != nil {
b.cacheSize(k, len(bl.RawData()))
}
return bl, err
}
func (b *arccache) Put(bl blocks.Block) error {
if has, _, ok := b.queryCache(bl.Cid()); ok && has {
return nil
}
err := b.blockstore.Put(bl)
if err == nil {
b.cacheSize(bl.Cid(), len(bl.RawData()))
}
return err
}
func (b *arccache) PutMany(bs []blocks.Block) error {
var good []blocks.Block
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
}
}
err := b.blockstore.PutMany(good)
if err != nil {
return err
}
for _, block := range good {
b.cacheSize(block.Cid(), len(block.RawData()))
}
return nil
}
func (b *arccache) HashOnRead(enabled bool) {
b.blockstore.HashOnRead(enabled)
}
func (b *arccache) cacheHave(c cid.Cid, have bool) {
b.cache.Add(string(c.Hash()), cacheHave(have))
}
func (b *arccache) cacheSize(c cid.Cid, blockSize int) {
b.cache.Add(string(c.Hash()), cacheSize(blockSize))
}
// queryCache checks if the CID is in the cache. If so, it returns:
//
// * exists (bool): whether the CID is known to exist or not.
// * size (int): the size if cached, or -1 if not cached.
// * ok (bool): whether present in the cache.
//
// When ok is false, the answer is inconclusive and the caller must ignore the
// other two return values. Querying the underlying store is necessary.
//
// When ok is true, exists carries the correct answer, and size carries the
// size, if known, or -1 if not.
func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) {
b.total.Inc()
if !k.Defined() {
log.Error("undefined cid in arccache")
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, -1, false
}
h, ok := b.cache.Get(string(k.Hash()))
if ok {
b.hits.Inc()
switch h := h.(type) {
case cacheHave:
return bool(h), -1, true
case cacheSize:
return true, int(h), true
}
}
return false, -1, false
}
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.blockstore.AllKeysChan(ctx)
}
func (b *arccache) GCLock() Unlocker {
return b.blockstore.(GCBlockstore).GCLock()
}
func (b *arccache) PinLock() Unlocker {
return b.blockstore.(GCBlockstore).PinLock()
}
func (b *arccache) GCRequested() bool {
return b.blockstore.(GCBlockstore).GCRequested()
}
package blockstore
import (
"context"
"testing"
blocks "gitlab.dms3.io/dms3/public/go-block-format"
cid "gitlab.dms3.io/dms3/public/go-cid"
ds "gitlab.dms3.io/dms3/public/go-datastore"
syncds "gitlab.dms3.io/dms3/public/go-datastore/sync"
)
var exampleBlock = blocks.NewBlock([]byte("foo"))
func testArcCached(ctx context.Context, bs Blockstore) (*arccache, error) {
if ctx == nil {
ctx = context.TODO()
}
opts := DefaultCacheOpts()
opts.HasBloomFilterSize = 0
opts.HasBloomFilterHashes = 0
bbs, err := CachedBlockstore(ctx, bs, opts)
if err == nil {
return bbs.(*arccache), nil
}
return nil, err
}
func createStores(t *testing.T) (*arccache, Blockstore, *callbackDatastore) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
arc, err := testArcCached(context.TODO(), bs)
if err != nil {
t.Fatal(err)
}
return arc, bs, cd
}
func trap(message string, cd *callbackDatastore, t *testing.T) {
cd.SetFunc(func() {
t.Fatal(message)
})
}
func untrap(cd *callbackDatastore) {
cd.SetFunc(func() {})
}
func TestRemoveCacheEntryOnDelete(t *testing.T) {
arc, _, cd := createStores(t)
arc.Put(exampleBlock)
cd.Lock()
writeHitTheDatastore := false
cd.Unlock()
cd.SetFunc(func() {
writeHitTheDatastore = true
})
arc.DeleteBlock(exampleBlock.Cid())
arc.Put(exampleBlock)
if !writeHitTheDatastore {
t.Fail()
}
}
func TestElideDuplicateWrite(t *testing.T) {
arc, _, cd := createStores(t)
arc.Put(exampleBlock)
trap("write hit datastore", cd, t)
arc.Put(exampleBlock)
}
func TestHasRequestTriggersCache(t *testing.T) {
arc, _, cd := createStores(t)
arc.Has(exampleBlock.Cid())
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
t.Fatal("has was true but there is no such block")
}
untrap(cd)
err := arc.Put(exampleBlock)
if err != nil {
t.Fatal(err)
}
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
t.Fatal("has returned invalid result")
}
}
func TestGetFillsCache(t *testing.T) {
arc, _, cd := createStores(t)
if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err == nil {
t.Fatal("block was found or there was no error")
}
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Cid()); has || err != nil {
t.Fatal("has was true but there is no such block")
}
if _, err := arc.GetSize(exampleBlock.Cid()); err != ErrNotFound {
t.Fatal("getsize was true but there is no such block")
}
untrap(cd)
if err := arc.Put(exampleBlock); err != nil {
t.Fatal(err)
}
trap("has hit datastore", cd, t)
if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil {
t.Fatal("has returned invalid result")
}
if blockSize, err := arc.GetSize(exampleBlock.Cid()); blockSize == -1 || err != nil {
t.Fatal("getsize returned invalid result", blockSize, err)
}
}
func TestGetAndDeleteFalseShortCircuit(t *testing.T) {
arc, _, cd := createStores(t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
trap("get hit datastore", cd, t)
if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err != ErrNotFound {
t.Fatal("get returned invalid result")
}
if arc.DeleteBlock(exampleBlock.Cid()) != nil {
t.Fatal("expected deletes to be idempotent")
}
}
func TestArcCreationFailure(t *testing.T) {
if arc, err := newARCCachedBS(context.TODO(), nil, -1); arc != nil || err == nil {
t.Fatal("expected error and no cache")
}
}
func TestInvalidKey(t *testing.T) {
arc, _, _ := createStores(t)
bl, err := arc.Get(cid.Cid{})
if bl != nil {
t.Fatal("blocks should be nil")
}
if err == nil {
t.Fatal("expected error")
}
}
func TestHasAfterSuccessfulGetIsCached(t *testing.T) {
arc, bs, cd := createStores(t)
bs.Put(exampleBlock)
arc.Get(exampleBlock.Cid())
trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
}
func TestGetSizeAfterSuccessfulGetIsCached(t *testing.T) {
arc, bs, cd := createStores(t)
bs.Put(exampleBlock)
arc.Get(exampleBlock.Cid())
trap("has hit datastore", cd, t)
arc.GetSize(exampleBlock.Cid())
}
func TestGetSizeAfterSuccessfulHas(t *testing.T) {
arc, bs, _ := createStores(t)
bs.Put(exampleBlock)
has, err := arc.Has(exampleBlock.Cid())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("expected to have block")
}
if size, err := arc.GetSize(exampleBlock.Cid()); err != nil {
t.Fatal(err)
} else if size != len(exampleBlock.RawData()) {
t.Fatalf("expected size %d, got %d", len(exampleBlock.RawData()), size)
}
}
func TestGetSizeMissingZeroSizeBlock(t *testing.T) {
arc, bs, cd := createStores(t)
emptyBlock := blocks.NewBlock([]byte{})
missingBlock := blocks.NewBlock([]byte("missingBlock"))
bs.Put(emptyBlock)
arc.Get(emptyBlock.Cid())
trap("has hit datastore", cd, t)
if blockSize, err := arc.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil {
t.Fatal("getsize returned invalid result")
}
untrap(cd)
arc.Get(missingBlock.Cid())
trap("has hit datastore", cd, t)
if _, err := arc.GetSize(missingBlock.Cid()); err != ErrNotFound {
t.Fatal("getsize returned invalid result")
}
}
func TestDifferentKeyObjectsWork(t *testing.T) {
arc, bs, cd := createStores(t)
bs.Put(exampleBlock)
arc.Get(exampleBlock.Cid())
trap("has hit datastore", cd, t)
cidstr := exampleBlock.Cid().String()
ncid, err := cid.Decode(cidstr)
if err != nil {
t.Fatal(err)
}
arc.Has(ncid)
}
func TestPutManyCaches(t *testing.T) {
arc, _, cd := createStores(t)
arc.PutMany([]blocks.Block{exampleBlock})
trap("has hit datastore", cd, t)
arc.Has(exampleBlock.Cid())
arc.GetSize(exampleBlock.Cid())
untrap(cd)
arc.DeleteBlock(exampleBlock.Cid())
arc.Put(exampleBlock)
trap("PunMany has hit datastore", cd, t)
arc.PutMany([]blocks.Block{exampleBlock})
}
// Package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for Getting and Putting block objects.
package blockstore
import (
"context"
"errors"
"sync"
"sync/atomic"
blocks "gitlab.dms3.io/dms3/public/go-block-format"
cid "gitlab.dms3.io/dms3/public/go-cid"
ds "gitlab.dms3.io/dms3/public/go-datastore"
dsns "gitlab.dms3.io/dms3/public/go-datastore/namespace"
dsq "gitlab.dms3.io/dms3/public/go-datastore/query"
dshelp "gitlab.dms3.io/dms3/public/go-dms3-ds-help"
logging "gitlab.dms3.io/dms3/public/go-log"
uatomic "go.uber.org/atomic"
)
var log = logging.Logger("blockstore")
// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks")
// ErrHashMismatch is an error returned when the hash of a block
// is different than expected.
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
// ErrNotFound is an error returned when a block is not found.
var ErrNotFound = errors.New("blockstore: block not found")
// Blockstore wraps a Datastore with block-centered methods and provides a
// layer of abstraction which allows adding different caching strategies.
type Blockstore interface {
DeleteBlock(cid.Cid) error
Has(cid.Cid) (bool, error)
Get(cid.Cid) (blocks.Block, error)
// GetSize returns the size of the block mapped to the given CID
GetSize(cid.Cid) (int, error)
// Put puts a given block to the underlying datastore
Put(blocks.Block) error
// PutMany puts a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
PutMany([]blocks.Block) error
// AllKeysChan returns a channel from which
// the CIDs in the Blockstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
AllKeysChan(ctx context.Context) (<-chan cid.Cid, error)
// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
HashOnRead(enabled bool)
}
// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
// Callers of View must not mutate or retain the byte slice, as it could be
// an mmapped memory region, or a pooled byte buffer.
//
// View is especially suitable for deserialising in place.
//
// The callback will only be called iff the query operation is successful (and
// the block is found); otherwise, the error will be propagated. Errors returned
// by the callback will be propagated as well.
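//
// A minimal usage sketch, assuming v is a Blockstore that also implements
// Viewer, c is the CID of a stored block, and process is a hypothetical
// consumer of the bytes:
//
//	err := v.View(c, func(data []byte) error {
//		// Use data only inside the callback; do not retain or mutate it.
//		return process(data)
//	})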
type Viewer interface {
View(cid cid.Cid, callback func([]byte) error) error
}
// GCLocker abstract functionality to lock a blockstore when performing
// garbage-collection operations.
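//
// A sketch of the expected put-then-pin pattern, assuming gcbs is a
// GCBlockstore (the variable names are illustrative only):
//
//	unlocker := gcbs.PinLock()
//	defer unlocker.Unlock()
//	// ... Put the blocks and pin them before the lock is released ...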
type GCLocker interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should occur simultaneously.
// Reading during GC is safe, and requires no lock.
GCLock() Unlocker
// PinLock locks the blockstore for sequences of puts expected to finish
// with a pin (before GC). Multiple put->pin sequences can write through
// at the same time, but no GC should happen simultaneously.
// Reading during Pinning is safe, and requires no lock.
PinLock() Unlocker
// GCRequested returns true if GCLock has been called and is waiting to
// take the lock
GCRequested() bool
}
// GCBlockstore is a blockstore that can safely run garbage-collection
// operations.
type GCBlockstore interface {
Blockstore
GCLocker
}
// NewGCBlockstore returns a default implementation of GCBlockstore
// using the given Blockstore and GCLocker.
func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
return gcBlockstore{bs, gcl}
}
type gcBlockstore struct {
Blockstore
GCLocker
}
// NewBlockstore returns a default Blockstore implementation
// using the provided datastore.Batching backend.
func NewBlockstore(d ds.Batching) Blockstore {
dsb := dsns.Wrap(d, BlockPrefix)
return &blockstore{
datastore: dsb,
rehash: uatomic.NewBool(false),
}
}
type blockstore struct {
datastore ds.Batching
rehash *uatomic.Bool
}
func (bs *blockstore) HashOnRead(enabled bool) {
bs.rehash.Store(enabled)
}
func (bs *blockstore) Get(k cid.Cid) (blocks.Block, error) {
if !k.Defined() {
log.Error("undefined cid in blockstore")
return nil, ErrNotFound
}
bdata, err := bs.datastore.Get(dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return nil, ErrNotFound
}
if err != nil {
return nil, err
}
if bs.rehash.Load() {
rbcid, err := k.Prefix().Sum(bdata)
if err != nil {
return nil, err
}
if !rbcid.Equals(k) {
return nil, ErrHashMismatch
}
return blocks.NewBlockWithCid(bdata, rbcid)
}
return blocks.NewBlockWithCid(bdata, k)
}
func (bs *blockstore) Put(block blocks.Block) error {
k := dshelp.MultihashToDsKey(block.Cid().Hash())
// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
}
return bs.datastore.Put(k, block.RawData())
}
func (bs *blockstore) PutMany(blocks []blocks.Block) error {
t, err := bs.datastore.Batch()
if err != nil {
return err
}
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())
exists, err := bs.datastore.Has(k)
if err == nil && exists {
continue
}
err = t.Put(k, b.RawData())
if err != nil {
return err
}
}
return t.Commit()
}
func (bs *blockstore) Has(k cid.Cid) (bool, error) {
return bs.datastore.Has(dshelp.MultihashToDsKey(k.Hash()))
}
func (bs *blockstore) GetSize(k cid.Cid) (int, error) {
size, err := bs.datastore.GetSize(dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return -1, ErrNotFound
}
return size, err
}
func (bs *blockstore) DeleteBlock(k cid.Cid) error {
return bs.datastore.Delete(dshelp.MultihashToDsKey(k.Hash()))
}
// AllKeysChan runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context.
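//
// A usage sketch, assuming ctx and bs already exist (fmt is only for
// illustration):
//
//	ch, err := bs.AllKeysChan(ctx)
//	if err != nil {
//		return err
//	}
//	for c := range ch {
//		fmt.Println(c)
//	}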
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// KeysOnly, because otherwise the query would return _a lot_ of data.
q := dsq.Query{KeysOnly: true}
res, err := bs.datastore.Query(q)
if err != nil {
return nil, err
}
output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer func() {
res.Close() // ensure exit (signals early exit, too)
close(output)
}()
for {
e, ok := res.NextSync()
if !ok {
return
}
if e.Error != nil {
log.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
return
}
// convert the datastore key back to a multihash to build the CID.
bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key))
if err != nil {
log.Warningf("error parsing key from binary: %s", err)
continue
}
k := cid.NewCidV1(cid.Raw, bk)
select {
case <-ctx.Done():
return
case output <- k:
}
}
}()
return output, nil
}
// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
return &gclocker{}
}
type gclocker struct {
lk sync.RWMutex
gcreq int32
}
// Unlocker represents an object which can Unlock
// something.
type Unlocker interface {
Unlock()
}
type unlocker struct {
unlock func()
}
func (u *unlocker) Unlock() {
u.unlock()
u.unlock = nil // ensure it's not called twice
}
func (bs *gclocker) GCLock() Unlocker {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return &unlocker{bs.lk.Unlock}
}
func (bs *gclocker) PinLock() Unlocker {
bs.lk.RLock()
return &unlocker{bs.lk.RUnlock}
}
func (bs *gclocker) GCRequested() bool {
return atomic.LoadInt32(&bs.gcreq) > 0
}
package blockstore
import (
"bytes"
"context"
"fmt"
"testing"
blocks "gitlab.dms3.io/dms3/public/go-block-format"
cid "gitlab.dms3.io/dms3/public/go-cid"
ds "gitlab.dms3.io/dms3/public/go-datastore"
dsq "gitlab.dms3.io/dms3/public/go-datastore/query"
ds_sync "gitlab.dms3.io/dms3/public/go-datastore/sync"
u "gitlab.dms3.io/dms3/public/go-dms3-util"
)
func TestGetWhenKeyNotPresent(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
c := cid.NewCidV0(u.Hash([]byte("stuff")))
bl, err := bs.Get(c)
if bl != nil {
t.Error("nil block expected")
}
if err == nil {
t.Error("error expected, got nil")
}
}
func TestGetWhenKeyIsNil(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
_, err := bs.Get(cid.Cid{})
if err != ErrNotFound {
t.Fail()
}
}
func TestPutThenGetBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
err := bs.Put(block)
if err != nil {
t.Fatal(err)
}
blockFromBlockstore, err := bs.Get(block.Cid())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) {
t.Fail()
}
}
func TestCidv0v1(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
err := bs.Put(block)
if err != nil {
t.Fatal(err)
}
blockFromBlockstore, err := bs.Get(cid.NewCidV1(cid.DagProtobuf, block.Cid().Hash()))
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) {
t.Fail()
}
}
func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
missingBlock := blocks.NewBlock([]byte("missingBlock"))
emptyBlock := blocks.NewBlock([]byte{})
err := bs.Put(block)
if err != nil {
t.Fatal(err)
}
blockSize, err := bs.GetSize(block.Cid())
if err != nil {
t.Fatal(err)
}
if len(block.RawData()) != blockSize {
t.Fail()
}
err = bs.Put(emptyBlock)
if err != nil {
t.Fatal(err)
}
if blockSize, err := bs.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil {
t.Fatal(err)
}
if blockSize, err := bs.GetSize(missingBlock.Cid()); blockSize != -1 || err == nil {
t.Fatal("getsize returned invalid result")
}
}
type countHasDS struct {
ds.Datastore
hasCount int
}
func (ds *countHasDS) Has(key ds.Key) (exists bool, err error) {
ds.hasCount += 1
return ds.Datastore.Has(key)
}
func TestPutUsesHas(t *testing.T) {
// Some datastores rely on the implementation detail that Put checks Has
// first, to avoid overriding existing objects' metadata. This test ensures
// that Blockstore continues to behave this way.
// Please ping https://gitlab.dms3.io/dms3/public/go-dms3-blockstore/pull/47 if this
// behavior is being removed.
ds := &countHasDS{
Datastore: ds.NewMapDatastore(),
}
bs := NewBlockstore(ds_sync.MutexWrap(ds))
bl := blocks.NewBlock([]byte("some data"))
if err := bs.Put(bl); err != nil {
t.Fatal(err)
}
if err := bs.Put(bl); err != nil {
t.Fatal(err)
}
if ds.hasCount != 2 {
t.Errorf("Blockstore did not call Has before attempting Put, this breaks compatibility")
}
}
func TestHashOnRead(t *testing.T) {
originalDebug := u.Debug
defer (func() {
u.Debug = originalDebug
})()
u.Debug = false
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
bl := blocks.NewBlock([]byte("some data"))
blBad, err := blocks.NewBlockWithCid([]byte("some other data"), bl.Cid())
if err != nil {
t.Fatal("debug is off, still got an error")
}
bl2 := blocks.NewBlock([]byte("some other data"))
bs.Put(blBad)
bs.Put(bl2)
bs.HashOnRead(true)
if _, err := bs.Get(bl.Cid()); err != ErrHashMismatch {
t.Fatalf("expected '%v' got '%v'\n", ErrHashMismatch, err)
}
if b, err := bs.Get(bl2.Cid()); err != nil || b.String() != bl2.String() {
t.Fatal("got wrong blocks")
}
}
func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []cid.Cid) {
if d == nil {
d = ds.NewMapDatastore()
}
bs := NewBlockstore(ds_sync.MutexWrap(d))
keys := make([]cid.Cid, N)
for i := 0; i < N; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := bs.Put(block)
if err != nil {
t.Fatal(err)
}
keys[i] = block.Cid()
}
return bs, keys
}
func collect(ch <-chan cid.Cid) []cid.Cid {
var keys []cid.Cid
for k := range ch {
keys = append(keys, k)
}
return keys
}
func TestAllKeysSimple(t *testing.T) {
bs, keys := newBlockStoreWithKeys(t, nil, 100)
ctx := context.Background()
ch, err := bs.AllKeysChan(ctx)
if err != nil {
t.Fatal(err)
}
keys2 := collect(ch)
// for _, k2 := range keys2 {
// t.Log("found ", k2.B58String())
// }
expectMatches(t, keys, keys2)
}
func TestAllKeysRespectsContext(t *testing.T) {
N := 100
d := &queryTestDS{ds: ds.NewMapDatastore()}
bs, _ := newBlockStoreWithKeys(t, d, N)
started := make(chan struct{}, 1)
done := make(chan struct{}, 1)
errors := make(chan error, 100)
getKeys := func(ctx context.Context) {
started <- struct{}{}
ch, err := bs.AllKeysChan(ctx) // once without cancelling
if err != nil {
errors <- err
}
_ = collect(ch)
done <- struct{}{}
errors <- nil // a nil one to signal break
}
var results dsq.Results
var resultsmu = make(chan struct{})
resultChan := make(chan dsq.Result)
d.SetFunc(func(q dsq.Query) (dsq.Results, error) {
results = dsq.ResultsWithChan(q, resultChan)
resultsmu <- struct{}{}
return results, nil
})
go getKeys(context.Background())
// make sure it's waiting.
<-started
<-resultsmu
select {
case <-done:
t.Fatal("sync is wrong")
case <-results.Process().Closing():
t.Fatal("should not be closing")
case <-results.Process().Closed():
t.Fatal("should not be closed")
default:
}
e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
resultChan <- dsq.Result{Entry: e} // let it go.
close(resultChan)
<-done // should be done now.
<-results.Process().Closed() // should be closed now
// print any errors
for err := range errors {
if err == nil {
break
}
t.Error(err)
}
}
func expectMatches(t *testing.T, expect, actual []cid.Cid) {
t.Helper()
if len(expect) != len(actual) {
t.Errorf("expect and actual differ: %d != %d", len(expect), len(actual))
}
actualSet := make(map[string]bool, len(actual))
for _, k := range actual {
actualSet[string(k.Hash())] = true
}
for _, ek := range expect {
if !actualSet[string(ek.Hash())] {
t.Error("expected key not found: ", ek)
}
}
}
type queryTestDS struct {
cb func(q dsq.Query) (dsq.Results, error)
ds ds.Datastore
}
func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f }
func (c *queryTestDS) Put(key ds.Key, value []byte) (err error) {
return c.ds.Put(key, value)
}
func (c *queryTestDS) Get(key ds.Key) (value []byte, err error) {
return c.ds.Get(key)
}
func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) {
return c.ds.Has(key)
}
func (c *queryTestDS) GetSize(key ds.Key) (size int, err error) {
return c.ds.GetSize(key)
}
func (c *queryTestDS) Delete(key ds.Key) (err error) {
return c.ds.Delete(key)
}
func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) {
if c.cb != nil {
return c.cb(q)
}
return c.ds.Query(q)
}
func (c *queryTestDS) Sync(key ds.Key) error {
return c.ds.Sync(key)
}
func (c *queryTestDS) Batch() (ds.Batch, error) {
return ds.NewBasicBatch(c), nil
}
func (c *queryTestDS) Close() error {
return nil
}
package blockstore
import (
"context"
"fmt"
"sync/atomic"
"time"
bloom "gitlab.dms3.io/dms3/public/bbloom"
blocks "gitlab.dms3.io/dms3/public/go-block-format"
cid "gitlab.dms3.io/dms3/public/go-cid"
metrics "gitlab.dms3.io/dms3/public/go-metrics-interface"
)
// bloomCached returns a Blockstore that caches Has requests using a Bloom
// filter. bloomSize is the size of the bloom filter in bits. hashCount specifies the
// number of hashing functions in the bloom filter (usually known as k).
func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
if err != nil {
return nil, err
}
bc := &bloomcache{
blockstore: bs,
bloom: bl,
hits: metrics.NewCtx(ctx, "bloom.hits_total",
"Number of cache hits in bloom cache").Counter(),
total: metrics.NewCtx(ctx, "bloom_total",
"Total number of requests to bloom cache").Counter(),
buildChan: make(chan struct{}),
}
if v, ok := bs.(Viewer); ok {
bc.viewer = v
}
go func() {
err := bc.build(ctx)
if err != nil {
select {
case <-ctx.Done():
log.Warning("Cache rebuild closed by context finishing: ", err)
default:
log.Error(err)
}
return
}
if metrics.Active() {
fill := metrics.NewCtx(ctx, "bloom_fill_ratio",
"Ratio of bloom filter fullnes, (updated once a minute)").Gauge()
t := time.NewTicker(1 * time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
fill.Set(bc.bloom.FillRatioTS())
}
}
}
}()
return bc, nil
}
type bloomcache struct {
active int32
bloom *bloom.Bloom
buildErr error
buildChan chan struct{}
blockstore Blockstore
viewer Viewer
// Statistics
hits metrics.Counter
total metrics.Counter
}
var _ Blockstore = (*bloomcache)(nil)
var _ Viewer = (*bloomcache)(nil)
func (b *bloomcache) BloomActive() bool {
return atomic.LoadInt32(&b.active) != 0
}
func (b *bloomcache) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-b.buildChan:
return b.buildErr
}
}
func (b *bloomcache) build(ctx context.Context) error {
// evt := log.EventBegin(ctx, "bloomcache.build")
// defer evt.Done()
defer close(b.buildChan)
ch, err := b.blockstore.AllKeysChan(ctx)
if err != nil {
b.buildErr = fmt.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
return b.buildErr
}
for {
select {
case key, ok := <-ch:
if !ok {
atomic.StoreInt32(&b.active, 1)
return nil
}
b.bloom.AddTS(key.Hash()) // Use binary key, the more compact the better
case <-ctx.Done():
b.buildErr = ctx.Err()
return b.buildErr
}
}
}
func (b *bloomcache) DeleteBlock(k cid.Cid) error {
if has, ok := b.hasCached(k); ok && !has {
return nil
}
return b.blockstore.DeleteBlock(k)
}
// hasCached checks the bloom filter. If ok == false the answer is inconclusive.
// If ok == true then has answers the question: is the key contained?
func (b *bloomcache) hasCached(k cid.Cid) (has bool, ok bool) {
b.total.Inc()
if !k.Defined() {
log.Error("undefined in bloom cache")
// Return cache invalid so the call is forwarded to the blockstore
// and the correct error is generated for the invalid key.
return false, false
}
if b.BloomActive() {
blr := b.bloom.HasTS(k.Hash())
if !blr { // "not contained" is the only conclusive answer the bloom filter gives
b.hits.Inc()
return false, true
}
}
return false, false
}
func (b *bloomcache) Has(k cid.Cid) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}
return b.blockstore.Has(k)
}
func (b *bloomcache) GetSize(k cid.Cid) (int, error) {
return b.blockstore.GetSize(k)
}
func (b *bloomcache) View(k cid.Cid, callback func([]byte) error) error {
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}
if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}
return b.viewer.View(k, callback)
}
func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}
return b.blockstore.Get(k)
}
func (b *bloomcache) Put(bl blocks.Block) error {
// See comment in PutMany
err := b.blockstore.Put(bl)
if err == nil {
b.bloom.AddTS(bl.Cid().Hash())
}
return err
}
func (b *bloomcache) PutMany(bs []blocks.Block) error {
// The bloom cache gives a conclusive result only when a key is not contained.
// To reduce the number of puts we would need conclusive information that a
// block is contained, so PutMany can't be improved with the bloom cache and
// we just do a passthrough.
err := b.blockstore.PutMany(bs)
if err != nil {
return err
}
for _, bl := range bs {
b.bloom.AddTS(bl.Cid().Hash())
}
return nil
}
func (b *bloomcache) HashOnRead(enabled bool) {
b.blockstore.HashOnRead(enabled)
}
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.blockstore.AllKeysChan(ctx)
}
func (b *bloomcache) GCLock() Unlocker {
return b.blockstore.(GCBlockstore).GCLock()
}
func (b *bloomcache) PinLock() Unlocker {
return b.blockstore.(GCBlockstore).PinLock()
}
func (b *bloomcache) GCRequested() bool {
return b.blockstore.(GCBlockstore).GCRequested()
}
package blockstore
import (
"context"
"fmt"
"sync"
"testing"
"time"
blocks "gitlab.dms3.io/dms3/public/go-block-format"
ds "gitlab.dms3.io/dms3/public/go-datastore"
dsq "gitlab.dms3.io/dms3/public/go-datastore/query"
syncds "gitlab.dms3.io/dms3/public/go-datastore/sync"
)
func testBloomCached(ctx context.Context, bs Blockstore) (*bloomcache, error) {
if ctx == nil {
ctx = context.Background()
}
opts := DefaultCacheOpts()
opts.HasARCCacheSize = 0
bbs, err := CachedBlockstore(ctx, bs, opts)
if err == nil {
return bbs.(*bloomcache), nil
}
return nil, err
}
func TestPutManyAddsToBloom(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
cachedbs, err := testBloomCached(ctx, bs)
if err != nil {
t.Fatal(err)
}
if err := cachedbs.Wait(ctx); err != nil {
t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded())
}
block1 := blocks.NewBlock([]byte("foo"))
block2 := blocks.NewBlock([]byte("bar"))
emptyBlock := blocks.NewBlock([]byte{})
cachedbs.PutMany([]blocks.Block{block1, emptyBlock})
has, err := cachedbs.Has(block1.Cid())
if err != nil {
t.Fatal(err)
}
blockSize, err := cachedbs.GetSize(block1.Cid())
if err != nil {
t.Fatal(err)
}
if blockSize == -1 || !has {
t.Fatal("added block is reported missing")
}
has, err = cachedbs.Has(block2.Cid())
if err != nil {
t.Fatal(err)
}
blockSize, err = cachedbs.GetSize(block2.Cid())
if err != nil && err != ErrNotFound {
t.Fatal(err)
}
if blockSize > -1 || has {
t.Fatal("not added block is reported to be in blockstore")
}
has, err = cachedbs.Has(emptyBlock.Cid())
if err != nil {
t.Fatal(err)
}
blockSize, err = cachedbs.GetSize(emptyBlock.Cid())
if err != nil {
t.Fatal(err)
}
if blockSize != 0 || !has {
t.Fatal("added block is reported missing")
}
}
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
_, err := bloomCached(context.Background(), bs, -1, 1)
if err == nil {
t.Fail()
}
}
func TestHasIsBloomCached(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
for i := 0; i < 1000; i++ {
bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
cachedbs, err := testBloomCached(ctx, bs)
if err != nil {
t.Fatal(err)
}
if err := cachedbs.Wait(ctx); err != nil {
t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded())
}
cacheFails := 0
cd.SetFunc(func() {
cacheFails++
})
for i := 0; i < 1000; i++ {
cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Cid())
}
if float64(cacheFails)/float64(1000) > float64(0.05) {
t.Fatalf("Bloom filter has cache miss rate of more than 5%%")
}
cacheFails = 0
block := blocks.NewBlock([]byte("newBlock"))
cachedbs.PutMany([]blocks.Block{block})
if cacheFails != 2 {
t.Fatalf("expected two datastore hits: %d", cacheFails)
}
cachedbs.Put(block)
if cacheFails != 3 {
t.Fatalf("expected datastore hit: %d", cacheFails)
}
if has, err := cachedbs.Has(block.Cid()); !has || err != nil {
t.Fatal("has gave wrong response")
}
bl, err := cachedbs.Get(block.Cid())
if err != nil {
t.Fatal("there shouldn't be an error")
}
if bl.String() != block.String() {
t.Fatal("block data doesn't match")
}
}
var _ ds.Batching = (*callbackDatastore)(nil)
type callbackDatastore struct {
sync.Mutex
f func()
ds ds.Datastore
}
func (c *callbackDatastore) SetFunc(f func()) {
c.Lock()
defer c.Unlock()
c.f = f
}
func (c *callbackDatastore) CallF() {
c.Lock()
defer c.Unlock()
c.f()
}
func (c *callbackDatastore) Put(key ds.Key, value []byte) (err error) {
c.CallF()
return c.ds.Put(key, value)
}
func (c *callbackDatastore) Get(key ds.Key) (value []byte, err error) {
c.CallF()
return c.ds.Get(key)
}
func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) {
c.CallF()
return c.ds.Has(key)
}
func (c *callbackDatastore) GetSize(key ds.Key) (size int, err error) {
c.CallF()
return c.ds.GetSize(key)
}
func (c *callbackDatastore) Close() error {
return nil
}
func (c *callbackDatastore) Delete(key ds.Key) (err error) {
c.CallF()
return c.ds.Delete(key)
}
func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
c.CallF()
return c.ds.Query(q)
}
func (c *callbackDatastore) Sync(key ds.Key) error {
c.CallF()
return c.ds.Sync(key)
}
func (c *callbackDatastore) Batch() (ds.Batch, error) {
return ds.NewBasicBatch(c), nil
}
package blockstore
import (
"context"
"errors"
metrics "gitlab.dms3.io/dms3/public/go-metrics-interface"
)
// CacheOpts wraps options for CachedBlockStore().
// Next to each option is its approximate memory usage per unit.
type CacheOpts struct {
HasBloomFilterSize int // 1 byte
HasBloomFilterHashes int // No size, 7 is usually best, consult bloom papers
HasARCCacheSize int // 32 bytes
}
// DefaultCacheOpts returns a CacheOpts initialized with default values.
func DefaultCacheOpts() CacheOpts {
return CacheOpts{
HasBloomFilterSize: 512 << 10,
HasBloomFilterHashes: 7,
HasARCCacheSize: 64 << 10,
}
}
// CachedBlockstore returns a blockstore wrapped in an ARCCache and
// then in a bloom filter cache, if the options indicate it.
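//
// A minimal sketch of wrapping an existing Blockstore with the default
// cache options (bs and ctx are assumed to exist):
//
//	cbs, err := CachedBlockstore(ctx, bs, DefaultCacheOpts())
//	if err != nil {
//		return err
//	}
//	// cbs now short-cuts Has/Get/GetSize for keys it has seen before.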
func CachedBlockstore(
ctx context.Context,
bs Blockstore,
opts CacheOpts) (cbs Blockstore, err error) {
cbs = bs
if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
opts.HasARCCacheSize < 0 {
return nil, errors.New("all options for cache need to be greater than zero")
}
if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 {
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
}
ctx = metrics.CtxSubScope(ctx, "bs.cache")
if opts.HasARCCacheSize > 0 {
cbs, err = newARCCachedBS(ctx, cbs, opts.HasARCCacheSize)
}
if opts.HasBloomFilterSize != 0 {
// *8 because of bytes to bits conversion
cbs, err = bloomCached(ctx, cbs, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes)
}
return cbs, err
}
package blockstore
import (
"context"
"testing"
)
func TestCachingOptsLessThanZero(t *testing.T) {
opts := DefaultCacheOpts()
opts.HasARCCacheSize = -1
if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil {
t.Error("wrong ARC setting was not detected")
}
opts = DefaultCacheOpts()
opts.HasBloomFilterSize = -1
if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil {
t.Error("negative bloom size was not detected")
}
opts = DefaultCacheOpts()
opts.HasBloomFilterHashes = -1
if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil {
t.Error("negative hashes setting was not detected")
}
}
func TestBloomHashesAtZero(t *testing.T) {
opts := DefaultCacheOpts()
opts.HasBloomFilterHashes = 0
if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil {
t.Error("zero hashes setting with positive size was not detected")
}
}
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771 h1:MHkK1uRtFbVqvAgvWxafZe54+5uBxLluGylDiKgdhwo=
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc=
github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4=
github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I=
github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-varint v0.0.5 h1:XVZwSo04Cs3j/jS0uAEPpT3JY6DzMcVLLoWOSnCxOjg=
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY=
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
gitlab.dms3.io/dms3/public/bbloom v0.0.1 h1:kgk1gfIoUl7cuxlYZ0/Mv8k01xikBH33fb2peWIQJic=
gitlab.dms3.io/dms3/public/bbloom v0.0.1/go.mod h1:n76ooRIrdCEsVmkoqml73ZuX1kVWNstRTPpoz4YyMzQ=
gitlab.dms3.io/dms3/public/go-block-format v0.0.1 h1:PQ6+E7zY6kUIHET86uJTQHTTj4Z9ZNfP7w281ZdExgk=
gitlab.dms3.io/dms3/public/go-block-format v0.0.1/go.mod h1:xlvtW/OF72rOzLa2RVWXX2Uw18qTAWTQEs/Xp7SCnuY=
gitlab.dms3.io/dms3/public/go-cid v0.0.1 h1:qs4dtkDigcLGY/58dIZaFjKLt+orrTcmTBvtqaM3570=
gitlab.dms3.io/dms3/public/go-cid v0.0.1/go.mod h1:GQw3gc4CSrFY+aX6M+OBQDlg0p5/eQJoRrayaZzkAOQ=
gitlab.dms3.io/dms3/public/go-datastore v0.0.1 h1:RjZLvnqlvWDpb5ZwvkVEWGONF7zKNPe4Q0DND5oxZec=
gitlab.dms3.io/dms3/public/go-datastore v0.0.1/go.mod h1:qSI0hpmVMo6HCp0uveHTKyQ87j1aVe2hqiTeiPCehYA=
gitlab.dms3.io/dms3/public/go-dms3-delay v0.0.1/go.mod h1:Mg+buHOoh8UruN+MMqeqBUPBKMRTmpsXAyxv5ZSt+X4=
gitlab.dms3.io/dms3/public/go-dms3-ds-help v0.0.2 h1:SYgjiHzcpFBgabZhLVdfC4LPKjcQ/Lc8xgh3c3JsoFs=
gitlab.dms3.io/dms3/public/go-dms3-ds-help v0.0.2/go.mod h1:szc3LU0qkzM55gKHJuGx2TU6tIn/4sRJt/ThZzffi9A=
gitlab.dms3.io/dms3/public/go-dms3-util v0.0.1 h1:Gd+kJl1Rc+ZEUb9CIS1ZctQnF9G1oruNFyxUC//QBUQ=
gitlab.dms3.io/dms3/public/go-dms3-util v0.0.1/go.mod h1:ymlwtzTNMq8Ug+gVtPAMxXKCKTXwXJAzXS+SUihfKgo=
gitlab.dms3.io/dms3/public/go-log v0.0.1 h1:jqz2g8pVdPW+Sy8CCo4rYfGEjktGhCBfgIb3oeY6yx8=
gitlab.dms3.io/dms3/public/go-log v0.0.1/go.mod h1:OsyF7lVYe47r03v1ZCbrmz0byeGUWB0Y219jN1DJx3s=
gitlab.dms3.io/dms3/public/go-metrics-interface v0.0.1 h1:oLJzd5zSjf5C1aEMHziNbV4RbJVuPaNzzKW4VBzKnQM=
gitlab.dms3.io/dms3/public/go-metrics-interface v0.0.1/go.mod h1:Ag1ayfnHxy0H659akn+bjAGn9k/HJrpxsVrB90DmRL8=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU=
golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
package blockstore
import (
"context"
"io"
mh "github.com/multiformats/go-multihash"
blocks "gitlab.dms3.io/dms3/public/go-block-format"
cid "gitlab.dms3.io/dms3/public/go-cid"
)
// idstore wraps a BlockStore to add support for identity hashes
type idstore struct {
bs Blockstore
viewer Viewer
}
var _ Blockstore = (*idstore)(nil)
var _ Viewer = (*idstore)(nil)
var _ io.Closer = (*idstore)(nil)
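// NewIdStore wraps a Blockstore so that blocks whose CIDs use the identity
// multihash are never written to or read from the underlying store; their
// data is extracted directly from the CID itself.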
func NewIdStore(bs Blockstore) Blockstore {
ids := &idstore{bs: bs}
if v, ok := bs.(Viewer); ok {
ids.viewer = v
}
return ids
}
func extractContents(k cid.Cid) (bool, []byte) {
// Pre-check by calling Prefix(); this is much faster than extracting the hash.
if k.Prefix().MhType != mh.IDENTITY {
return false, nil
}
dmh, err := mh.Decode(k.Hash())
if err != nil || dmh.Code != mh.ID {
return false, nil
}
return true, dmh.Digest
}
func (b *idstore) DeleteBlock(k cid.Cid) error {
isId, _ := extractContents(k)
if isId {
return nil
}
return b.bs.DeleteBlock(k)
}
func (b *idstore) Has(k cid.Cid) (bool, error) {
isId, _ := extractContents(k)
if isId {
return true, nil
}
return b.bs.Has(k)
}
func (b *idstore) View(k cid.Cid, callback func([]byte) error) error {
if b.viewer == nil {
blk, err := b.Get(k)
if err != nil {
return err
}
return callback(blk.RawData())
}
isId, bdata := extractContents(k)
if isId {
return callback(bdata)
}
return b.viewer.View(k, callback)
}
func (b *idstore) GetSize(k cid.Cid) (int, error) {
isId, bdata := extractContents(k)
if isId {
return len(bdata), nil
}
return b.bs.GetSize(k)
}
func (b *idstore) Get(k cid.Cid) (blocks.Block, error) {
isId, bdata := extractContents(k)
if isId {
return blocks.NewBlockWithCid(bdata, k)
}
return b.bs.Get(k)
}
func (b *idstore) Put(bl blocks.Block) error {
isId, _ := extractContents(bl.Cid())
if isId {
return nil
}
return b.bs.Put(bl)
}
func (b *idstore) PutMany(bs []blocks.Block) error {
toPut := make([]blocks.Block, 0, len(bs))
for _, bl := range bs {
isId, _ := extractContents(bl.Cid())
if isId {
continue
}
toPut = append(toPut, bl)
}
return b.bs.PutMany(toPut)
}
func (b *idstore) HashOnRead(enabled bool) {
b.bs.HashOnRead(enabled)
}
func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.bs.AllKeysChan(ctx)
}
func (b *idstore) Close() error {
if c, ok := b.bs.(io.Closer); ok {
return c.Close()
}
return nil
}
package blockstore
import (
"context"
"testing"
mh "github.com/multiformats/go-multihash"
blk "gitlab.dms3.io/dms3/public/go-block-format"
cid "gitlab.dms3.io/dms3/public/go-cid"
ds "gitlab.dms3.io/dms3/public/go-datastore"
)
func createTestStores() (Blockstore, *callbackDatastore) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
ids := NewIdStore(NewBlockstore(cd))
return ids, cd
}
func TestIdStore(t *testing.T) {
idhash1, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash1"))
idblock1, _ := blk.NewBlockWithCid([]byte("idhash1"), idhash1)
hash1, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash1"))
block1, _ := blk.NewBlockWithCid([]byte("hash1"), hash1)
emptyHash, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("emptyHash"))
emptyBlock, _ := blk.NewBlockWithCid([]byte{}, emptyHash)
ids, cb := createTestStores()
have, _ := ids.Has(idhash1)
if !have {
t.Fatal("Has() failed on idhash")
}
_, err := ids.Get(idhash1)
if err != nil {
t.Fatalf("Get() failed on idhash: %v", err)
}
noop := func() {}
failIfPassThough := func() {
t.Fatal("operation on identity hash passed though to datastore")
}
cb.f = failIfPassThough
err = ids.Put(idblock1)
if err != nil {
t.Fatal(err)
}
cb.f = noop
err = ids.Put(block1)
if err != nil {
t.Fatalf("Put() failed on normal block: %v", err)
}
have, _ = ids.Has(hash1)
if !have {
t.Fatal("normal block not added to datastore")
}
blockSize, _ := ids.GetSize(hash1)
if blockSize == -1 {
t.Fatal("normal block not added to datastore")
}
_, err = ids.Get(hash1)
if err != nil {
t.Fatal(err)
}
err = ids.Put(emptyBlock)
if err != nil {
t.Fatalf("Put() failed on normal block: %v", err)
}
have, _ = ids.Has(emptyHash)
if !have {
t.Fatal("normal block not added to datastore")
}
blockSize, _ = ids.GetSize(emptyHash)
if blockSize != 0 {
t.Fatal("normal block not added to datastore")
}
cb.f = failIfPassThough
err = ids.DeleteBlock(idhash1)
if err != nil {
t.Fatal(err)
}
cb.f = noop
err = ids.DeleteBlock(hash1)
if err != nil {
t.Fatal(err)
}
have, _ = ids.Has(hash1)
if have {
t.Fatal("normal block not deleted from datastore")
}
blockSize, _ = ids.GetSize(hash1)
if blockSize > -1 {
t.Fatal("normal block not deleted from datastore")
}
err = ids.DeleteBlock(emptyHash)
if err != nil {
t.Fatal(err)
}
idhash2, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash2"))
idblock2, _ := blk.NewBlockWithCid([]byte("idhash2"), idhash2)
hash2, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash2"))
block2, _ := blk.NewBlockWithCid([]byte("hash2"), hash2)
cb.f = failIfPassThough
err = ids.PutMany([]blk.Block{idblock1, idblock2})
if err != nil {
t.Fatal(err)
}
opCount := 0
cb.f = func() {
opCount++
}
err = ids.PutMany([]blk.Block{block1, block2})
if err != nil {
t.Fatal(err)
}
if opCount != 4 {
// one call to Has and Put for each Cid
t.Fatalf("expected exactly 4 operations got %d", opCount)
}
opCount = 0
err = ids.PutMany([]blk.Block{idblock1, block1})
if err != nil {
t.Fatal(err)
}
if opCount != 1 {
// just one call to Put from the normal (non-id) block
t.Fatalf("expected exactly 1 operations got %d", opCount)
}
ch, err := ids.AllKeysChan(context.TODO())
if err != nil {
t.Fatal(err)
}
cnt := 0
for c := range ch {
cnt++
if c.Prefix().MhType == mh.ID {
t.Fatalf("block with identity hash found in blockstore")
}
}
if cnt != 2 {
t.Fatalf("expected exactly two keys returned by AllKeysChan got %d", cnt)
}
}