diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..4b86d7197f9a6f1a988c503defc8451392cd5e55 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1,8 @@ +blank_issues_enabled: false +contact_links: + - name: Getting Help on IPFS + url: https://ipfs.io/help + about: All information about how and where to get help on IPFS. + - name: IPFS Official Forum + url: https://discuss.ipfs.io + about: Please post general questions, support requests, and discussions here. diff --git a/.github/ISSUE_TEMPLATE/open_an_issue.md b/.github/ISSUE_TEMPLATE/open_an_issue.md new file mode 100644 index 0000000000000000000000000000000000000000..4fcbd00aca0d513c2729d8df4afb5a01fdbe7d02 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/open_an_issue.md @@ -0,0 +1,19 @@ +--- +name: Open an issue +about: Only for actionable issues relevant to this repository. +title: '' +labels: need/triage +assignees: '' + +--- + diff --git a/.github/config.yml b/.github/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..ed26646a0f7cdda6cf10ede2b0b98cac89cf67b0 --- /dev/null +++ b/.github/config.yml @@ -0,0 +1,68 @@ +# Configuration for welcome - https://github.com/behaviorbot/welcome + +# Configuration for new-issue-welcome - https://github.com/behaviorbot/new-issue-welcome +# Comment to be posted to on first time issues +newIssueWelcomeComment: > + Thank you for submitting your first issue to this repository! A maintainer + will be here shortly to triage and review. + + In the meantime, please double-check that you have provided all the + necessary information to make this process easy! Any information that can + help save additional round trips is useful! We currently aim to give + initial feedback within **two business days**. If this does not happen, feel + free to leave a comment. + + Please keep an eye on how this issue will be labeled, as labels give an + overview of priorities, assignments and additional actions requested by the + maintainers: + + - "Priority" labels will show how urgent this is for the team. + - "Status" labels will show if this is ready to be worked on, blocked, or in progress. + - "Need" labels will indicate if additional input or analysis is required. + + Finally, remember to use https://discuss.ipfs.io if you just need general + support. + +# Configuration for new-pr-welcome - https://github.com/behaviorbot/new-pr-welcome +# Comment to be posted to on PRs from first time contributors in your repository +newPRWelcomeComment: > + Thank you for submitting this PR! + + A maintainer will be here shortly to review it. + + We are super grateful, but we are also overloaded! Help us by making sure + that: + + * The context for this PR is clear, with relevant discussion, decisions + and stakeholders linked/mentioned. + + * Your contribution itself is clear (code comments, self-review for the + rest) and in its best form. Follow the [code contribution + guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md#code-contribution-guidelines) + if they apply. + + Getting other community members to do a review would be great help too on + complex PRs (you can ask in the chats/forums). If you are unsure about + something, just leave us a comment. + + Next steps: + + * A maintainer will triage and assign priority to this PR, commenting on + any missing things and potentially assigning a reviewer for high + priority items. 
+ + * The PR gets reviews, discussed and approvals as needed. + + * The PR is merged by maintainers when it has been approved and comments addressed. + + We currently aim to provide initial feedback/triaging within **two business + days**. Please keep an eye on any labelling actions, as these will indicate + priorities and status of your contribution. + + We are very grateful for your contribution! + + +# Configuration for first-pr-merge - https://github.com/behaviorbot/first-pr-merge +# Comment to be posted to on pull requests merged by a first time user +# Currently disabled +#firstPRMergeComment: "" diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..dfab88961c90fd41f90c5ab42b03a552ca342c2d --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,35 @@ +stages: + - build + - test + +variables: + BUILD_DIR: "/tmp/$CI_CONCURRENT_PROJECT_ID" + +before_script: + - mkdir -p $BUILD_DIR/src + - cd $BUILD_DIR/src + - if [ -d $CI_PROJECT_DIR ] + - then + - echo "soft link $CI_PROJECT_DIR exists" + - else + - echo "creating soft link $CI_PROJECT_DIR" + - ln -s $CI_PROJECT_DIR + - fi + - cd $CI_PROJECT_DIR + +build: + stage: build + tags: + - testing + script: + - echo $CI_JOB_STAGE + - go build + +test: + stage: test + tags: + - testing + script: + - echo $CI_JOB_STAGE + - go test -cover + coverage: '/coverage: \d+.\d+% of statements/' diff --git a/.gx/lastpubver b/.gx/lastpubver new file mode 100644 index 0000000000000000000000000000000000000000..0670e6452b077b0963e3f3fea072a95182c25ef7 --- /dev/null +++ b/.gx/lastpubver @@ -0,0 +1 @@ +0.1.8: QmXjKkjMDTtXAiLBwstVexofB8LeruZmE2eBd85GwGFFLA diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000000000000000000000000000000000000..a156d3eb5eb29547d593ebf789debd98e2135cf9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,30 @@ +os: + - linux + +language: go + +go: + - 1.13.x + +env: + global: + - GOTFLAGS="-race" + matrix: + - BUILD_DEPTYPE=gomod + + +# disable travis install +install: + - true + +script: + - bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh) + + +cache: + directories: + - $GOPATH/pkg/mod + - $HOME/.cache/go-build + +notifications: + email: false diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..51d20a894eae654802fc0d44566c74d7193c2b16 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Protocol Labs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/README.md b/README.md index d70a06b602fcc3a2f979cfc47a02828dabbb6fc6..f2cec940380b489c9e9fc54cf2c2506f514ec2c6 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,47 @@ -# go-dms3-blockstore +# go-ipfs-blockstore -dms3 blockstore \ No newline at end of file +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) +[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) +[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) +[![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-blockstore?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-blockstore) +[![Build Status](https://travis-ci.com/ipfs/go-ipfs-blockstore.svg?branch=master)](https://travis-ci.com/ipfs/go-ipfs-blockstore) + +> go-ipfs-blockstore implements a thin wrapper over a datastore, giving a clean interface for Getting and Putting block objects. + +## Lead Maintainer + +[Steven Allen](https://github.com/Stebalien) + + +## Table of Contents + +- [Install](#install) +- [Usage](#usage) +- [Contribute](#contribute) +- [License](#license) + +## Install + +`go-ipfs-blockstore` works like a regular Go module: + +``` +> go get github.com/ipfs/go-ipfs-blockstore +``` + +## Usage + +``` +import "github.com/ipfs/go-ipfs-blockstore" +``` + +Check the [GoDoc documentation](https://godoc.org/github.com/ipfs/go-ipfs-blockstore) + +## Contribute + +PRs accepted. + +Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. + +## License + +MIT © Protocol Labs, Inc. diff --git a/arc_cache.go b/arc_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..e25dd50965ff6e48d3f458440d265c58d962a244 --- /dev/null +++ b/arc_cache.go @@ -0,0 +1,225 @@ +package blockstore + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + blocks "gitlab.dms3.io/dms3/public/go-block-format" + cid "gitlab.dms3.io/dms3/public/go-cid" + metrics "gitlab.dms3.io/dms3/public/go-metrics-interface" +) + +type cacheHave bool +type cacheSize int + +// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) that +// does not store the actual blocks, just metadata about them: existence and +// size. This provides block access-time improvements, allowing +// to short-cut many searches without querying the underlying datastore. +type arccache struct { + cache *lru.TwoQueueCache + blockstore Blockstore + viewer Viewer + + hits metrics.Counter + total metrics.Counter +} + +var _ Blockstore = (*arccache)(nil) +var _ Viewer = (*arccache)(nil) + +func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) { + cache, err := lru.New2Q(lruSize) + if err != nil { + return nil, err + } + c := &arccache{cache: cache, blockstore: bs} + c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() + c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() + if v, ok := bs.(Viewer); ok { + c.viewer = v + } + return c, nil +} + +func (b *arccache) DeleteBlock(k cid.Cid) error { + if has, _, ok := b.queryCache(k); ok && !has { + return nil + } + + b.cache.Remove(k) // Invalidate cache before deleting. 
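+	// If the delete below succeeds, cacheHave records a definite miss so later
+	// Has/Get/GetSize calls can short-circuit without touching the datastore.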
+ err := b.blockstore.DeleteBlock(k) + if err == nil { + b.cacheHave(k, false) + } + return err +} + +func (b *arccache) Has(k cid.Cid) (bool, error) { + if has, _, ok := b.queryCache(k); ok { + return has, nil + } + has, err := b.blockstore.Has(k) + if err != nil { + return false, err + } + b.cacheHave(k, has) + return has, nil +} + +func (b *arccache) GetSize(k cid.Cid) (int, error) { + if has, blockSize, ok := b.queryCache(k); ok { + if !has { + // don't have it, return + return -1, ErrNotFound + } + if blockSize >= 0 { + // have it and we know the size + return blockSize, nil + } + // we have it but don't know the size, ask the datastore. + } + blockSize, err := b.blockstore.GetSize(k) + if err == ErrNotFound { + b.cacheHave(k, false) + } else if err == nil { + b.cacheSize(k, blockSize) + } + return blockSize, err +} + +func (b *arccache) View(k cid.Cid, callback func([]byte) error) error { + // shortcircuit and fall back to Get if the underlying store + // doesn't support Viewer. + if b.viewer == nil { + blk, err := b.Get(k) + if err != nil { + return err + } + return callback(blk.RawData()) + } + + if !k.Defined() { + log.Error("undefined cid in arc cache") + return ErrNotFound + } + + if has, _, ok := b.queryCache(k); ok && !has { + // short circuit if the cache deterministically tells us the item + // doesn't exist. + return ErrNotFound + } + + return b.viewer.View(k, callback) +} + +func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { + if !k.Defined() { + log.Error("undefined cid in arc cache") + return nil, ErrNotFound + } + + if has, _, ok := b.queryCache(k); ok && !has { + return nil, ErrNotFound + } + + bl, err := b.blockstore.Get(k) + if bl == nil && err == ErrNotFound { + b.cacheHave(k, false) + } else if bl != nil { + b.cacheSize(k, len(bl.RawData())) + } + return bl, err +} + +func (b *arccache) Put(bl blocks.Block) error { + if has, _, ok := b.queryCache(bl.Cid()); ok && has { + return nil + } + + err := b.blockstore.Put(bl) + if err == nil { + b.cacheSize(bl.Cid(), len(bl.RawData())) + } + return err +} + +func (b *arccache) PutMany(bs []blocks.Block) error { + var good []blocks.Block + for _, block := range bs { + // call put on block if result is inconclusive or we are sure that + // the block isn't in storage + if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) { + good = append(good, block) + } + } + err := b.blockstore.PutMany(good) + if err != nil { + return err + } + for _, block := range good { + b.cacheSize(block.Cid(), len(block.RawData())) + } + return nil +} + +func (b *arccache) HashOnRead(enabled bool) { + b.blockstore.HashOnRead(enabled) +} + +func (b *arccache) cacheHave(c cid.Cid, have bool) { + b.cache.Add(string(c.Hash()), cacheHave(have)) +} + +func (b *arccache) cacheSize(c cid.Cid, blockSize int) { + b.cache.Add(string(c.Hash()), cacheSize(blockSize)) +} + +// queryCache checks if the CID is in the cache. If so, it returns: +// +// * exists (bool): whether the CID is known to exist or not. +// * size (int): the size if cached, or -1 if not cached. +// * ok (bool): whether present in the cache. +// +// When ok is false, the answer in inconclusive and the caller must ignore the +// other two return values. Querying the underying store is necessary. +// +// When ok is true, exists carries the correct answer, and size carries the +// size, if known, or -1 if not. 
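+//
+// Entries are keyed by the block's multihash (string(k.Hash())) rather than
+// the full CID, so different CID versions or codecs carrying the same
+// multihash share a single cache entry.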
+func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) { + b.total.Inc() + if !k.Defined() { + log.Error("undefined cid in arccache") + // Return cache invalid so the call to blockstore happens + // in case of invalid key and correct error is created. + return false, -1, false + } + + h, ok := b.cache.Get(string(k.Hash())) + if ok { + b.hits.Inc() + switch h := h.(type) { + case cacheHave: + return bool(h), -1, true + case cacheSize: + return true, int(h), true + } + } + return false, -1, false +} + +func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return b.blockstore.AllKeysChan(ctx) +} + +func (b *arccache) GCLock() Unlocker { + return b.blockstore.(GCBlockstore).GCLock() +} + +func (b *arccache) PinLock() Unlocker { + return b.blockstore.(GCBlockstore).PinLock() +} + +func (b *arccache) GCRequested() bool { + return b.blockstore.(GCBlockstore).GCRequested() +} diff --git a/arc_cache_test.go b/arc_cache_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cdf0f46a64703cb9b65148b1f6eaeaf191da0d34 --- /dev/null +++ b/arc_cache_test.go @@ -0,0 +1,261 @@ +package blockstore + +import ( + "context" + "testing" + + blocks "gitlab.dms3.io/dms3/public/go-block-format" + cid "gitlab.dms3.io/dms3/public/go-cid" + ds "gitlab.dms3.io/dms3/public/go-datastore" + syncds "gitlab.dms3.io/dms3/public/go-datastore/sync" +) + +var exampleBlock = blocks.NewBlock([]byte("foo")) + +func testArcCached(ctx context.Context, bs Blockstore) (*arccache, error) { + if ctx == nil { + ctx = context.TODO() + } + opts := DefaultCacheOpts() + opts.HasBloomFilterSize = 0 + opts.HasBloomFilterHashes = 0 + bbs, err := CachedBlockstore(ctx, bs, opts) + if err == nil { + return bbs.(*arccache), nil + } + return nil, err +} + +func createStores(t *testing.T) (*arccache, Blockstore, *callbackDatastore) { + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + arc, err := testArcCached(context.TODO(), bs) + if err != nil { + t.Fatal(err) + } + return arc, bs, cd +} + +func trap(message string, cd *callbackDatastore, t *testing.T) { + cd.SetFunc(func() { + t.Fatal(message) + }) +} +func untrap(cd *callbackDatastore) { + cd.SetFunc(func() {}) +} + +func TestRemoveCacheEntryOnDelete(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Put(exampleBlock) + + cd.Lock() + writeHitTheDatastore := false + cd.Unlock() + + cd.SetFunc(func() { + writeHitTheDatastore = true + }) + + arc.DeleteBlock(exampleBlock.Cid()) + arc.Put(exampleBlock) + if !writeHitTheDatastore { + t.Fail() + } +} + +func TestElideDuplicateWrite(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Put(exampleBlock) + trap("write hit datastore", cd, t) + arc.Put(exampleBlock) +} + +func TestHasRequestTriggersCache(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Has(exampleBlock.Cid()) + trap("has hit datastore", cd, t) + if has, err := arc.Has(exampleBlock.Cid()); has || err != nil { + t.Fatal("has was true but there is no such block") + } + + untrap(cd) + err := arc.Put(exampleBlock) + if err != nil { + t.Fatal(err) + } + + trap("has hit datastore", cd, t) + + if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil { + t.Fatal("has returned invalid result") + } +} + +func TestGetFillsCache(t *testing.T) { + arc, _, cd := createStores(t) + + if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err == nil { + t.Fatal("block was found or there was no error") + } + + trap("has hit datastore", cd, t) + + if has, err := 
arc.Has(exampleBlock.Cid()); has || err != nil { + t.Fatal("has was true but there is no such block") + } + if _, err := arc.GetSize(exampleBlock.Cid()); err != ErrNotFound { + t.Fatal("getsize was true but there is no such block") + } + + untrap(cd) + + if err := arc.Put(exampleBlock); err != nil { + t.Fatal(err) + } + + trap("has hit datastore", cd, t) + + if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil { + t.Fatal("has returned invalid result") + } + if blockSize, err := arc.GetSize(exampleBlock.Cid()); blockSize == -1 || err != nil { + t.Fatal("getsize returned invalid result", blockSize, err) + } +} + +func TestGetAndDeleteFalseShortCircuit(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Has(exampleBlock.Cid()) + arc.GetSize(exampleBlock.Cid()) + + trap("get hit datastore", cd, t) + + if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err != ErrNotFound { + t.Fatal("get returned invalid result") + } + + if arc.DeleteBlock(exampleBlock.Cid()) != nil { + t.Fatal("expected deletes to be idempotent") + } +} + +func TestArcCreationFailure(t *testing.T) { + if arc, err := newARCCachedBS(context.TODO(), nil, -1); arc != nil || err == nil { + t.Fatal("expected error and no cache") + } +} + +func TestInvalidKey(t *testing.T) { + arc, _, _ := createStores(t) + + bl, err := arc.Get(cid.Cid{}) + + if bl != nil { + t.Fatal("blocks should be nil") + } + if err == nil { + t.Fatal("expected error") + } +} + +func TestHasAfterSucessfulGetIsCached(t *testing.T) { + arc, bs, cd := createStores(t) + + bs.Put(exampleBlock) + + arc.Get(exampleBlock.Cid()) + + trap("has hit datastore", cd, t) + arc.Has(exampleBlock.Cid()) +} + +func TestGetSizeAfterSucessfulGetIsCached(t *testing.T) { + arc, bs, cd := createStores(t) + + bs.Put(exampleBlock) + + arc.Get(exampleBlock.Cid()) + + trap("has hit datastore", cd, t) + arc.GetSize(exampleBlock.Cid()) +} + +func TestGetSizeAfterSucessfulHas(t *testing.T) { + arc, bs, _ := createStores(t) + + bs.Put(exampleBlock) + has, err := arc.Has(exampleBlock.Cid()) + if err != nil { + t.Fatal(err) + } + if !has { + t.Fatal("expected to have block") + } + + if size, err := arc.GetSize(exampleBlock.Cid()); err != nil { + t.Fatal(err) + } else if size != len(exampleBlock.RawData()) { + t.Fatalf("expected size %d, got %d", len(exampleBlock.RawData()), size) + } +} + +func TestGetSizeMissingZeroSizeBlock(t *testing.T) { + arc, bs, cd := createStores(t) + emptyBlock := blocks.NewBlock([]byte{}) + missingBlock := blocks.NewBlock([]byte("missingBlock")) + + bs.Put(emptyBlock) + + arc.Get(emptyBlock.Cid()) + + trap("has hit datastore", cd, t) + if blockSize, err := arc.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil { + t.Fatal("getsize returned invalid result") + } + untrap(cd) + + arc.Get(missingBlock.Cid()) + + trap("has hit datastore", cd, t) + if _, err := arc.GetSize(missingBlock.Cid()); err != ErrNotFound { + t.Fatal("getsize returned invalid result") + } +} + +func TestDifferentKeyObjectsWork(t *testing.T) { + arc, bs, cd := createStores(t) + + bs.Put(exampleBlock) + + arc.Get(exampleBlock.Cid()) + + trap("has hit datastore", cd, t) + cidstr := exampleBlock.Cid().String() + + ncid, err := cid.Decode(cidstr) + if err != nil { + t.Fatal(err) + } + + arc.Has(ncid) +} + +func TestPutManyCaches(t *testing.T) { + arc, _, cd := createStores(t) + arc.PutMany([]blocks.Block{exampleBlock}) + + trap("has hit datastore", cd, t) + arc.Has(exampleBlock.Cid()) + arc.GetSize(exampleBlock.Cid()) + untrap(cd) + arc.DeleteBlock(exampleBlock.Cid()) + + 
arc.Put(exampleBlock) + trap("PunMany has hit datastore", cd, t) + arc.PutMany([]blocks.Block{exampleBlock}) +} diff --git a/blockstore.go b/blockstore.go new file mode 100644 index 0000000000000000000000000000000000000000..ad353edf88069914b3cf25c76971efa16850664a --- /dev/null +++ b/blockstore.go @@ -0,0 +1,296 @@ +// Package blockstore implements a thin wrapper over a datastore, giving a +// clean interface for Getting and Putting block objects. +package blockstore + +import ( + "context" + "errors" + "sync" + "sync/atomic" + + blocks "gitlab.dms3.io/dms3/public/go-block-format" + cid "gitlab.dms3.io/dms3/public/go-cid" + ds "gitlab.dms3.io/dms3/public/go-datastore" + dsns "gitlab.dms3.io/dms3/public/go-datastore/namespace" + dsq "gitlab.dms3.io/dms3/public/go-datastore/query" + dshelp "gitlab.dms3.io/dms3/public/go-dms3-ds-help" + logging "gitlab.dms3.io/dms3/public/go-log" + uatomic "go.uber.org/atomic" +) + +var log = logging.Logger("blockstore") + +// BlockPrefix namespaces blockstore datastores +var BlockPrefix = ds.NewKey("blocks") + +// ErrHashMismatch is an error returned when the hash of a block +// is different than expected. +var ErrHashMismatch = errors.New("block in storage has different hash than requested") + +// ErrNotFound is an error returned when a block is not found. +var ErrNotFound = errors.New("blockstore: block not found") + +// Blockstore wraps a Datastore block-centered methods and provides a layer +// of abstraction which allows to add different caching strategies. +type Blockstore interface { + DeleteBlock(cid.Cid) error + Has(cid.Cid) (bool, error) + Get(cid.Cid) (blocks.Block, error) + + // GetSize returns the CIDs mapped BlockSize + GetSize(cid.Cid) (int, error) + + // Put puts a given block to the underlying datastore + Put(blocks.Block) error + + // PutMany puts a slice of blocks at the same time using batching + // capabilities of the underlying datastore whenever possible. + PutMany([]blocks.Block) error + + // AllKeysChan returns a channel from which + // the CIDs in the Blockstore can be read. It should respect + // the given context, closing the channel if it becomes Done. + AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) + + // HashOnRead specifies if every read block should be + // rehashed to make sure it matches its CID. + HashOnRead(enabled bool) +} + +// Viewer can be implemented by blockstores that offer zero-copy access to +// values. +// +// Callers of View must not mutate or retain the byte slice, as it could be +// an mmapped memory region, or a pooled byte buffer. +// +// View is especially suitable for deserialising in place. +// +// The callback will only be called iff the query operation is successful (and +// the block is found); otherwise, the error will be propagated. Errors returned +// by the callback will be propagated as well. +type Viewer interface { + View(cid cid.Cid, callback func([]byte) error) error +} + +// GCLocker abstract functionality to lock a blockstore when performing +// garbage-collection operations. +type GCLocker interface { + // GCLock locks the blockstore for garbage collection. No operations + // that expect to finish with a pin should ocurr simultaneously. + // Reading during GC is safe, and requires no lock. + GCLock() Unlocker + + // PinLock locks the blockstore for sequences of puts expected to finish + // with a pin (before GC). Multiple put->pin sequences can write through + // at the same time, but no GC should happen simulatenously. + // Reading during Pinning is safe, and requires no lock. 
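+	// In the default GCLocker returned by NewGCLocker this takes a read lock,
+	// so many put->pin sequences can hold it concurrently while GCLock waits
+	// for the write lock.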
+ PinLock() Unlocker + + // GcRequested returns true if GCLock has been called and is waiting to + // take the lock + GCRequested() bool +} + +// GCBlockstore is a blockstore that can safely run garbage-collection +// operations. +type GCBlockstore interface { + Blockstore + GCLocker +} + +// NewGCBlockstore returns a default implementation of GCBlockstore +// using the given Blockstore and GCLocker. +func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore { + return gcBlockstore{bs, gcl} +} + +type gcBlockstore struct { + Blockstore + GCLocker +} + +// NewBlockstore returns a default Blockstore implementation +// using the provided datastore.Batching backend. +func NewBlockstore(d ds.Batching) Blockstore { + var dsb ds.Batching + dd := dsns.Wrap(d, BlockPrefix) + dsb = dd + return &blockstore{ + datastore: dsb, + rehash: uatomic.NewBool(false), + } +} + +type blockstore struct { + datastore ds.Batching + + rehash *uatomic.Bool +} + +func (bs *blockstore) HashOnRead(enabled bool) { + bs.rehash.Store(enabled) +} + +func (bs *blockstore) Get(k cid.Cid) (blocks.Block, error) { + if !k.Defined() { + log.Error("undefined cid in blockstore") + return nil, ErrNotFound + } + bdata, err := bs.datastore.Get(dshelp.MultihashToDsKey(k.Hash())) + if err == ds.ErrNotFound { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + if bs.rehash.Load() { + rbcid, err := k.Prefix().Sum(bdata) + if err != nil { + return nil, err + } + + if !rbcid.Equals(k) { + return nil, ErrHashMismatch + } + + return blocks.NewBlockWithCid(bdata, rbcid) + } + return blocks.NewBlockWithCid(bdata, k) +} + +func (bs *blockstore) Put(block blocks.Block) error { + k := dshelp.MultihashToDsKey(block.Cid().Hash()) + + // Has is cheaper than Put, so see if we already have it + exists, err := bs.datastore.Has(k) + if err == nil && exists { + return nil // already stored. + } + return bs.datastore.Put(k, block.RawData()) +} + +func (bs *blockstore) PutMany(blocks []blocks.Block) error { + t, err := bs.datastore.Batch() + if err != nil { + return err + } + for _, b := range blocks { + k := dshelp.MultihashToDsKey(b.Cid().Hash()) + exists, err := bs.datastore.Has(k) + if err == nil && exists { + continue + } + + err = t.Put(k, b.RawData()) + if err != nil { + return err + } + } + return t.Commit() +} + +func (bs *blockstore) Has(k cid.Cid) (bool, error) { + return bs.datastore.Has(dshelp.MultihashToDsKey(k.Hash())) +} + +func (bs *blockstore) GetSize(k cid.Cid) (int, error) { + size, err := bs.datastore.GetSize(dshelp.MultihashToDsKey(k.Hash())) + if err == ds.ErrNotFound { + return -1, ErrNotFound + } + return size, err +} + +func (bs *blockstore) DeleteBlock(k cid.Cid) error { + return bs.datastore.Delete(dshelp.MultihashToDsKey(k.Hash())) +} + +// AllKeysChan runs a query for keys from the blockstore. +// this is very simplistic, in the future, take dsq.Query as a param? +// +// AllKeysChan respects context. +func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + + // KeysOnly, because that would be _a lot_ of data. 
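+	// The stored keys are raw multihashes (see Put), so each one is re-wrapped
+	// below as a CIDv1 with the Raw codec before being sent on the channel.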
+ q := dsq.Query{KeysOnly: true} + res, err := bs.datastore.Query(q) + if err != nil { + return nil, err + } + + output := make(chan cid.Cid, dsq.KeysOnlyBufSize) + go func() { + defer func() { + res.Close() // ensure exit (signals early exit, too) + close(output) + }() + + for { + e, ok := res.NextSync() + if !ok { + return + } + if e.Error != nil { + log.Errorf("blockstore.AllKeysChan got err: %s", e.Error) + return + } + + // need to convert to key.Key using key.KeyFromDsKey. + bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key)) + if err != nil { + log.Warningf("error parsing key from binary: %s", err) + continue + } + k := cid.NewCidV1(cid.Raw, bk) + select { + case <-ctx.Done(): + return + case output <- k: + } + } + }() + + return output, nil +} + +// NewGCLocker returns a default implementation of +// GCLocker using standard [RW] mutexes. +func NewGCLocker() GCLocker { + return &gclocker{} +} + +type gclocker struct { + lk sync.RWMutex + gcreq int32 +} + +// Unlocker represents an object which can Unlock +// something. +type Unlocker interface { + Unlock() +} + +type unlocker struct { + unlock func() +} + +func (u *unlocker) Unlock() { + u.unlock() + u.unlock = nil // ensure its not called twice +} + +func (bs *gclocker) GCLock() Unlocker { + atomic.AddInt32(&bs.gcreq, 1) + bs.lk.Lock() + atomic.AddInt32(&bs.gcreq, -1) + return &unlocker{bs.lk.Unlock} +} + +func (bs *gclocker) PinLock() Unlocker { + bs.lk.RLock() + return &unlocker{bs.lk.RUnlock} +} + +func (bs *gclocker) GCRequested() bool { + return atomic.LoadInt32(&bs.gcreq) > 0 +} diff --git a/blockstore_test.go b/blockstore_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8fef6ba9cf649a6d5a6c85e75863d2a3eb5a1894 --- /dev/null +++ b/blockstore_test.go @@ -0,0 +1,332 @@ +package blockstore + +import ( + "bytes" + "context" + "fmt" + "testing" + + blocks "gitlab.dms3.io/dms3/public/go-block-format" + cid "gitlab.dms3.io/dms3/public/go-cid" + ds "gitlab.dms3.io/dms3/public/go-datastore" + dsq "gitlab.dms3.io/dms3/public/go-datastore/query" + ds_sync "gitlab.dms3.io/dms3/public/go-datastore/sync" + u "gitlab.dms3.io/dms3/public/go-dms3-util" +) + +func TestGetWhenKeyNotPresent(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + c := cid.NewCidV0(u.Hash([]byte("stuff"))) + bl, err := bs.Get(c) + + if bl != nil { + t.Error("nil block expected") + } + if err == nil { + t.Error("error expected, got nil") + } +} + +func TestGetWhenKeyIsNil(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + _, err := bs.Get(cid.Cid{}) + if err != ErrNotFound { + t.Fail() + } +} + +func TestPutThenGetBlock(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + block := blocks.NewBlock([]byte("some data")) + + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + + blockFromBlockstore, err := bs.Get(block.Cid()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) { + t.Fail() + } +} + +func TestCidv0v1(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + block := blocks.NewBlock([]byte("some data")) + + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + + blockFromBlockstore, err := bs.Get(cid.NewCidV1(cid.DagProtobuf, block.Cid().Hash())) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) { + t.Fail() + } +} + +func TestPutThenGetSizeBlock(t *testing.T) { + bs := 
NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + block := blocks.NewBlock([]byte("some data")) + missingBlock := blocks.NewBlock([]byte("missingBlock")) + emptyBlock := blocks.NewBlock([]byte{}) + + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + + blockSize, err := bs.GetSize(block.Cid()) + if err != nil { + t.Fatal(err) + } + if len(block.RawData()) != blockSize { + t.Fail() + } + + err = bs.Put(emptyBlock) + if err != nil { + t.Fatal(err) + } + + if blockSize, err := bs.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil { + t.Fatal(err) + } + + if blockSize, err := bs.GetSize(missingBlock.Cid()); blockSize != -1 || err == nil { + t.Fatal("getsize returned invalid result") + } +} + +type countHasDS struct { + ds.Datastore + hasCount int +} + +func (ds *countHasDS) Has(key ds.Key) (exists bool, err error) { + ds.hasCount += 1 + return ds.Datastore.Has(key) +} + +func TestPutUsesHas(t *testing.T) { + // Some datastores rely on the implementation detail that Put checks Has + // first, to avoid overriding existing objects' metadata. This test ensures + // that Blockstore continues to behave this way. + // Please ping https://gitlab.dms3.io/dms3/public/go-dms3-blockstore/pull/47 if this + // behavior is being removed. + ds := &countHasDS{ + Datastore: ds.NewMapDatastore(), + } + bs := NewBlockstore(ds_sync.MutexWrap(ds)) + bl := blocks.NewBlock([]byte("some data")) + if err := bs.Put(bl); err != nil { + t.Fatal(err) + } + if err := bs.Put(bl); err != nil { + t.Fatal(err) + } + if ds.hasCount != 2 { + t.Errorf("Blockstore did not call Has before attempting Put, this breaks compatibility") + } +} + +func TestHashOnRead(t *testing.T) { + orginalDebug := u.Debug + defer (func() { + u.Debug = orginalDebug + })() + u.Debug = false + + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + bl := blocks.NewBlock([]byte("some data")) + blBad, err := blocks.NewBlockWithCid([]byte("some other data"), bl.Cid()) + if err != nil { + t.Fatal("debug is off, still got an error") + } + bl2 := blocks.NewBlock([]byte("some other data")) + bs.Put(blBad) + bs.Put(bl2) + bs.HashOnRead(true) + + if _, err := bs.Get(bl.Cid()); err != ErrHashMismatch { + t.Fatalf("expected '%v' got '%v'\n", ErrHashMismatch, err) + } + + if b, err := bs.Get(bl2.Cid()); err != nil || b.String() != bl2.String() { + t.Fatal("got wrong blocks") + } +} + +func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []cid.Cid) { + if d == nil { + d = ds.NewMapDatastore() + } + bs := NewBlockstore(ds_sync.MutexWrap(d)) + + keys := make([]cid.Cid, N) + for i := 0; i < N; i++ { + block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + keys[i] = block.Cid() + } + return bs, keys +} + +func collect(ch <-chan cid.Cid) []cid.Cid { + var keys []cid.Cid + for k := range ch { + keys = append(keys, k) + } + return keys +} + +func TestAllKeysSimple(t *testing.T) { + bs, keys := newBlockStoreWithKeys(t, nil, 100) + + ctx := context.Background() + ch, err := bs.AllKeysChan(ctx) + if err != nil { + t.Fatal(err) + } + keys2 := collect(ch) + + // for _, k2 := range keys2 { + // t.Log("found ", k2.B58String()) + // } + + expectMatches(t, keys, keys2) +} + +func TestAllKeysRespectsContext(t *testing.T) { + N := 100 + + d := &queryTestDS{ds: ds.NewMapDatastore()} + bs, _ := newBlockStoreWithKeys(t, d, N) + + started := make(chan struct{}, 1) + done := make(chan struct{}, 1) + errors := make(chan error, 100) + + getKeys := func(ctx 
context.Context) { + started <- struct{}{} + ch, err := bs.AllKeysChan(ctx) // once without cancelling + if err != nil { + errors <- err + } + _ = collect(ch) + done <- struct{}{} + errors <- nil // a nil one to signal break + } + + var results dsq.Results + var resultsmu = make(chan struct{}) + resultChan := make(chan dsq.Result) + d.SetFunc(func(q dsq.Query) (dsq.Results, error) { + results = dsq.ResultsWithChan(q, resultChan) + resultsmu <- struct{}{} + return results, nil + }) + + go getKeys(context.Background()) + + // make sure it's waiting. + <-started + <-resultsmu + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closing(): + t.Fatal("should not be closing") + case <-results.Process().Closed(): + t.Fatal("should not be closed") + default: + } + + e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()} + resultChan <- dsq.Result{Entry: e} // let it go. + close(resultChan) + <-done // should be done now. + <-results.Process().Closed() // should be closed now + + // print any errors + for err := range errors { + if err == nil { + break + } + t.Error(err) + } + +} + +func expectMatches(t *testing.T, expect, actual []cid.Cid) { + t.Helper() + + if len(expect) != len(actual) { + t.Errorf("expect and actual differ: %d != %d", len(expect), len(actual)) + } + + actualSet := make(map[string]bool, len(actual)) + for _, k := range actual { + actualSet[string(k.Hash())] = true + } + + for _, ek := range expect { + if !actualSet[string(ek.Hash())] { + t.Error("expected key not found: ", ek) + } + } +} + +type queryTestDS struct { + cb func(q dsq.Query) (dsq.Results, error) + ds ds.Datastore +} + +func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f } + +func (c *queryTestDS) Put(key ds.Key, value []byte) (err error) { + return c.ds.Put(key, value) +} + +func (c *queryTestDS) Get(key ds.Key) (value []byte, err error) { + return c.ds.Get(key) +} + +func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) { + return c.ds.Has(key) +} + +func (c *queryTestDS) GetSize(key ds.Key) (size int, err error) { + return c.ds.GetSize(key) +} + +func (c *queryTestDS) Delete(key ds.Key) (err error) { + return c.ds.Delete(key) +} + +func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) { + if c.cb != nil { + return c.cb(q) + } + return c.ds.Query(q) +} + +func (c *queryTestDS) Sync(key ds.Key) error { + return c.ds.Sync(key) +} + +func (c *queryTestDS) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(c), nil +} +func (c *queryTestDS) Close() error { + return nil +} diff --git a/bloom_cache.go b/bloom_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..1702e34bb6e16080570ea90e1aa22f9de4cecaac --- /dev/null +++ b/bloom_cache.go @@ -0,0 +1,226 @@ +package blockstore + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + bloom "gitlab.dms3.io/dms3/public/bbloom" + blocks "gitlab.dms3.io/dms3/public/go-block-format" + cid "gitlab.dms3.io/dms3/public/go-cid" + metrics "gitlab.dms3.io/dms3/public/go-metrics-interface" +) + +// bloomCached returns a Blockstore that caches Has requests using a Bloom +// filter. bloomSize is size of bloom filter in bytes. hashCount specifies the +// number of hashing functions in the bloom filter (usually known as k). 
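+//
+// The filter is populated asynchronously from the wrapped blockstore's
+// AllKeysChan; until that build finishes the cache stays inactive and every
+// query falls through to the underlying blockstore.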
+func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (*bloomcache, error) { + bl, err := bloom.New(float64(bloomSize), float64(hashCount)) + if err != nil { + return nil, err + } + bc := &bloomcache{ + blockstore: bs, + bloom: bl, + hits: metrics.NewCtx(ctx, "bloom.hits_total", + "Number of cache hits in bloom cache").Counter(), + total: metrics.NewCtx(ctx, "bloom_total", + "Total number of requests to bloom cache").Counter(), + buildChan: make(chan struct{}), + } + if v, ok := bs.(Viewer); ok { + bc.viewer = v + } + go func() { + err := bc.build(ctx) + if err != nil { + select { + case <-ctx.Done(): + log.Warning("Cache rebuild closed by context finishing: ", err) + default: + log.Error(err) + } + return + } + if metrics.Active() { + fill := metrics.NewCtx(ctx, "bloom_fill_ratio", + "Ratio of bloom filter fullnes, (updated once a minute)").Gauge() + + t := time.NewTicker(1 * time.Minute) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + fill.Set(bc.bloom.FillRatioTS()) + } + } + } + }() + return bc, nil +} + +type bloomcache struct { + active int32 + + bloom *bloom.Bloom + buildErr error + + buildChan chan struct{} + blockstore Blockstore + viewer Viewer + + // Statistics + hits metrics.Counter + total metrics.Counter +} + +var _ Blockstore = (*bloomcache)(nil) +var _ Viewer = (*bloomcache)(nil) + +func (b *bloomcache) BloomActive() bool { + return atomic.LoadInt32(&b.active) != 0 +} + +func (b *bloomcache) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-b.buildChan: + return b.buildErr + } +} + +func (b *bloomcache) build(ctx context.Context) error { + // evt := log.EventBegin(ctx, "bloomcache.build") + // defer evt.Done() + defer close(b.buildChan) + + ch, err := b.blockstore.AllKeysChan(ctx) + if err != nil { + b.buildErr = fmt.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err) + return b.buildErr + } + for { + select { + case key, ok := <-ch: + if !ok { + atomic.StoreInt32(&b.active, 1) + return nil + } + b.bloom.AddTS(key.Hash()) // Use binary key, the more compact the better + case <-ctx.Done(): + b.buildErr = ctx.Err() + return b.buildErr + } + } +} + +func (b *bloomcache) DeleteBlock(k cid.Cid) error { + if has, ok := b.hasCached(k); ok && !has { + return nil + } + + return b.blockstore.DeleteBlock(k) +} + +// if ok == false has is inconclusive +// if ok == true then has respons to question: is it contained +func (b *bloomcache) hasCached(k cid.Cid) (has bool, ok bool) { + b.total.Inc() + if !k.Defined() { + log.Error("undefined in bloom cache") + // Return cache invalid so call to blockstore + // in case of invalid key is forwarded deeper + return false, false + } + if b.BloomActive() { + blr := b.bloom.HasTS(k.Hash()) + if !blr { // not contained in bloom is only conclusive answer bloom gives + b.hits.Inc() + return false, true + } + } + return false, false +} + +func (b *bloomcache) Has(k cid.Cid) (bool, error) { + if has, ok := b.hasCached(k); ok { + return has, nil + } + + return b.blockstore.Has(k) +} + +func (b *bloomcache) GetSize(k cid.Cid) (int, error) { + return b.blockstore.GetSize(k) +} + +func (b *bloomcache) View(k cid.Cid, callback func([]byte) error) error { + if b.viewer == nil { + blk, err := b.Get(k) + if err != nil { + return err + } + return callback(blk.RawData()) + } + + if has, ok := b.hasCached(k); ok && !has { + return ErrNotFound + } + return b.viewer.View(k, callback) +} + +func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) { + 
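+	// The bloom filter can only prove absence: a conclusive negative from
+	// hasCached avoids hitting the datastore, anything else falls through.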
if has, ok := b.hasCached(k); ok && !has { + return nil, ErrNotFound + } + + return b.blockstore.Get(k) +} + +func (b *bloomcache) Put(bl blocks.Block) error { + // See comment in PutMany + err := b.blockstore.Put(bl) + if err == nil { + b.bloom.AddTS(bl.Cid().Hash()) + } + return err +} + +func (b *bloomcache) PutMany(bs []blocks.Block) error { + // bloom cache gives only conclusive resulty if key is not contained + // to reduce number of puts we need conclusive information if block is contained + // this means that PutMany can't be improved with bloom cache so we just + // just do a passthrough. + err := b.blockstore.PutMany(bs) + if err != nil { + return err + } + for _, bl := range bs { + b.bloom.AddTS(bl.Cid().Hash()) + } + return nil +} + +func (b *bloomcache) HashOnRead(enabled bool) { + b.blockstore.HashOnRead(enabled) +} + +func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return b.blockstore.AllKeysChan(ctx) +} + +func (b *bloomcache) GCLock() Unlocker { + return b.blockstore.(GCBlockstore).GCLock() +} + +func (b *bloomcache) PinLock() Unlocker { + return b.blockstore.(GCBlockstore).PinLock() +} + +func (b *bloomcache) GCRequested() bool { + return b.blockstore.(GCBlockstore).GCRequested() +} diff --git a/bloom_cache_test.go b/bloom_cache_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a64dc402cb496f67f91c0755758d6bf1afb0ae8e --- /dev/null +++ b/bloom_cache_test.go @@ -0,0 +1,212 @@ +package blockstore + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + blocks "gitlab.dms3.io/dms3/public/go-block-format" + ds "gitlab.dms3.io/dms3/public/go-datastore" + dsq "gitlab.dms3.io/dms3/public/go-datastore/query" + syncds "gitlab.dms3.io/dms3/public/go-datastore/sync" +) + +func testBloomCached(ctx context.Context, bs Blockstore) (*bloomcache, error) { + if ctx == nil { + ctx = context.Background() + } + opts := DefaultCacheOpts() + opts.HasARCCacheSize = 0 + bbs, err := CachedBlockstore(ctx, bs, opts) + if err == nil { + return bbs.(*bloomcache), nil + } + return nil, err +} + +func TestPutManyAddsToBloom(t *testing.T) { + bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + cachedbs, err := testBloomCached(ctx, bs) + if err != nil { + t.Fatal(err) + } + + if err := cachedbs.Wait(ctx); err != nil { + t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded()) + } + + block1 := blocks.NewBlock([]byte("foo")) + block2 := blocks.NewBlock([]byte("bar")) + emptyBlock := blocks.NewBlock([]byte{}) + + cachedbs.PutMany([]blocks.Block{block1, emptyBlock}) + has, err := cachedbs.Has(block1.Cid()) + if err != nil { + t.Fatal(err) + } + blockSize, err := cachedbs.GetSize(block1.Cid()) + if err != nil { + t.Fatal(err) + } + if blockSize == -1 || !has { + t.Fatal("added block is reported missing") + } + + has, err = cachedbs.Has(block2.Cid()) + if err != nil { + t.Fatal(err) + } + blockSize, err = cachedbs.GetSize(block2.Cid()) + if err != nil && err != ErrNotFound { + t.Fatal(err) + } + if blockSize > -1 || has { + t.Fatal("not added block is reported to be in blockstore") + } + + has, err = cachedbs.Has(emptyBlock.Cid()) + if err != nil { + t.Fatal(err) + } + blockSize, err = cachedbs.GetSize(emptyBlock.Cid()) + if err != nil { + t.Fatal(err) + } + if blockSize != 0 || !has { + t.Fatal("added block is reported missing") + } +} + +func TestReturnsErrorWhenSizeNegative(t *testing.T) { + bs := 
NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) + _, err := bloomCached(context.Background(), bs, -1, 1) + if err == nil { + t.Fail() + } +} +func TestHasIsBloomCached(t *testing.T) { + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + + for i := 0; i < 1000; i++ { + bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i)))) + } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + cachedbs, err := testBloomCached(ctx, bs) + if err != nil { + t.Fatal(err) + } + + if err := cachedbs.Wait(ctx); err != nil { + t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded()) + } + + cacheFails := 0 + cd.SetFunc(func() { + cacheFails++ + }) + + for i := 0; i < 1000; i++ { + cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Cid()) + } + + if float64(cacheFails)/float64(1000) > float64(0.05) { + t.Fatalf("Bloom filter has cache miss rate of more than 5%%") + } + + cacheFails = 0 + block := blocks.NewBlock([]byte("newBlock")) + + cachedbs.PutMany([]blocks.Block{block}) + if cacheFails != 2 { + t.Fatalf("expected two datastore hits: %d", cacheFails) + } + cachedbs.Put(block) + if cacheFails != 3 { + t.Fatalf("expected datastore hit: %d", cacheFails) + } + + if has, err := cachedbs.Has(block.Cid()); !has || err != nil { + t.Fatal("has gave wrong response") + } + + bl, err := cachedbs.Get(block.Cid()) + if bl.String() != block.String() { + t.Fatal("block data doesn't match") + } + + if err != nil { + t.Fatal("there should't be an error") + } +} + +var _ ds.Batching = (*callbackDatastore)(nil) + +type callbackDatastore struct { + sync.Mutex + f func() + ds ds.Datastore +} + +func (c *callbackDatastore) SetFunc(f func()) { + c.Lock() + defer c.Unlock() + c.f = f +} + +func (c *callbackDatastore) CallF() { + c.Lock() + defer c.Unlock() + c.f() +} + +func (c *callbackDatastore) Put(key ds.Key, value []byte) (err error) { + c.CallF() + return c.ds.Put(key, value) +} + +func (c *callbackDatastore) Get(key ds.Key) (value []byte, err error) { + c.CallF() + return c.ds.Get(key) +} + +func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) { + c.CallF() + return c.ds.Has(key) +} + +func (c *callbackDatastore) GetSize(key ds.Key) (size int, err error) { + c.CallF() + return c.ds.GetSize(key) +} + +func (c *callbackDatastore) Close() error { + return nil +} + +func (c *callbackDatastore) Delete(key ds.Key) (err error) { + c.CallF() + return c.ds.Delete(key) +} + +func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) { + c.CallF() + return c.ds.Query(q) +} + +func (c *callbackDatastore) Sync(key ds.Key) error { + c.CallF() + return c.ds.Sync(key) +} + +func (c *callbackDatastore) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(c), nil +} diff --git a/caching.go b/caching.go new file mode 100644 index 0000000000000000000000000000000000000000..86f050377e6edb52a46a68480828e3218e6445f6 --- /dev/null +++ b/caching.go @@ -0,0 +1,55 @@ +package blockstore + +import ( + "context" + "errors" + + metrics "gitlab.dms3.io/dms3/public/go-metrics-interface" +) + +// CacheOpts wraps options for CachedBlockStore(). +// Next to each option is it aproximate memory usage per unit +type CacheOpts struct { + HasBloomFilterSize int // 1 byte + HasBloomFilterHashes int // No size, 7 is usually best, consult bloom papers + HasARCCacheSize int // 32 bytes +} + +// DefaultCacheOpts returns a CacheOpts initialized with default values. 
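+//
+// The defaults below amount to a 512KiB bloom filter using 7 hash functions
+// and a 65,536-entry ARC cache.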
+func DefaultCacheOpts() CacheOpts { + return CacheOpts{ + HasBloomFilterSize: 512 << 10, + HasBloomFilterHashes: 7, + HasARCCacheSize: 64 << 10, + } +} + +// CachedBlockstore returns a blockstore wrapped in an ARCCache and +// then in a bloom filter cache, if the options indicate it. +func CachedBlockstore( + ctx context.Context, + bs Blockstore, + opts CacheOpts) (cbs Blockstore, err error) { + cbs = bs + + if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 || + opts.HasARCCacheSize < 0 { + return nil, errors.New("all options for cache need to be greater than zero") + } + + if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 { + return nil, errors.New("bloom filter hash count can't be 0 when there is size set") + } + + ctx = metrics.CtxSubScope(ctx, "bs.cache") + + if opts.HasARCCacheSize > 0 { + cbs, err = newARCCachedBS(ctx, cbs, opts.HasARCCacheSize) + } + if opts.HasBloomFilterSize != 0 { + // *8 because of bytes to bits conversion + cbs, err = bloomCached(ctx, cbs, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes) + } + + return cbs, err +} diff --git a/caching_test.go b/caching_test.go new file mode 100644 index 0000000000000000000000000000000000000000..16066ad18c924bb7b0543d6af986e4d84eac1e8f --- /dev/null +++ b/caching_test.go @@ -0,0 +1,38 @@ +package blockstore + +import ( + "context" + "testing" +) + +func TestCachingOptsLessThanZero(t *testing.T) { + opts := DefaultCacheOpts() + opts.HasARCCacheSize = -1 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("wrong ARC setting was not detected") + } + + opts = DefaultCacheOpts() + opts.HasBloomFilterSize = -1 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("negative bloom size was not detected") + } + + opts = DefaultCacheOpts() + opts.HasBloomFilterHashes = -1 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("negative hashes setting was not detected") + } +} + +func TestBloomHashesAtZero(t *testing.T) { + opts := DefaultCacheOpts() + opts.HasBloomFilterHashes = 0 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("zero hashes setting with positive size was not detected") + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..fc56085d7c2094271c9a0209b5e91c6e88fae51d --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module gitlab.dms3.io/dms3/public/go-dms3-blockstore + +require ( + github.com/hashicorp/golang-lru v0.5.4 + github.com/multiformats/go-multihash v0.0.14 + gitlab.dms3.io/dms3/public/bbloom v0.0.1 + gitlab.dms3.io/dms3/public/go-block-format v0.0.1 + gitlab.dms3.io/dms3/public/go-cid v0.0.1 + gitlab.dms3.io/dms3/public/go-datastore v0.0.1 + gitlab.dms3.io/dms3/public/go-dms3-ds-help v0.0.2 + gitlab.dms3.io/dms3/public/go-dms3-util v0.0.1 + gitlab.dms3.io/dms3/public/go-log v0.0.1 + gitlab.dms3.io/dms3/public/go-metrics-interface v0.0.1 + go.uber.org/atomic v1.7.0 +) + +go 1.15 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..1a6bd03365baf97f4fc12cee4cb653dc294b9130 --- /dev/null +++ b/go.sum @@ -0,0 +1,116 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= +github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= +github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= +github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771 h1:MHkK1uRtFbVqvAgvWxafZe54+5uBxLluGylDiKgdhwo= +github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ= +github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= +github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc= +github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= +github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= +github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4= +github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM= +github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= +github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= +github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc= +github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= +github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I= +github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= +github.com/multiformats/go-varint v0.0.5 h1:XVZwSo04Cs3j/jS0uAEPpT3JY6DzMcVLLoWOSnCxOjg= +github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= +github.com/multiformats/go-varint v0.0.6 
h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2W/KhfNY= +github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gitlab.dms3.io/dms3/public/bbloom v0.0.1 h1:kgk1gfIoUl7cuxlYZ0/Mv8k01xikBH33fb2peWIQJic= +gitlab.dms3.io/dms3/public/bbloom v0.0.1/go.mod h1:n76ooRIrdCEsVmkoqml73ZuX1kVWNstRTPpoz4YyMzQ= +gitlab.dms3.io/dms3/public/go-block-format v0.0.1 h1:PQ6+E7zY6kUIHET86uJTQHTTj4Z9ZNfP7w281ZdExgk= +gitlab.dms3.io/dms3/public/go-block-format v0.0.1/go.mod h1:xlvtW/OF72rOzLa2RVWXX2Uw18qTAWTQEs/Xp7SCnuY= +gitlab.dms3.io/dms3/public/go-cid v0.0.1 h1:qs4dtkDigcLGY/58dIZaFjKLt+orrTcmTBvtqaM3570= +gitlab.dms3.io/dms3/public/go-cid v0.0.1/go.mod h1:GQw3gc4CSrFY+aX6M+OBQDlg0p5/eQJoRrayaZzkAOQ= +gitlab.dms3.io/dms3/public/go-datastore v0.0.1 h1:RjZLvnqlvWDpb5ZwvkVEWGONF7zKNPe4Q0DND5oxZec= +gitlab.dms3.io/dms3/public/go-datastore v0.0.1/go.mod h1:qSI0hpmVMo6HCp0uveHTKyQ87j1aVe2hqiTeiPCehYA= +gitlab.dms3.io/dms3/public/go-dms3-delay v0.0.1/go.mod h1:Mg+buHOoh8UruN+MMqeqBUPBKMRTmpsXAyxv5ZSt+X4= +gitlab.dms3.io/dms3/public/go-dms3-ds-help v0.0.2 h1:SYgjiHzcpFBgabZhLVdfC4LPKjcQ/Lc8xgh3c3JsoFs= +gitlab.dms3.io/dms3/public/go-dms3-ds-help v0.0.2/go.mod h1:szc3LU0qkzM55gKHJuGx2TU6tIn/4sRJt/ThZzffi9A= +gitlab.dms3.io/dms3/public/go-dms3-util v0.0.1 h1:Gd+kJl1Rc+ZEUb9CIS1ZctQnF9G1oruNFyxUC//QBUQ= +gitlab.dms3.io/dms3/public/go-dms3-util v0.0.1/go.mod h1:ymlwtzTNMq8Ug+gVtPAMxXKCKTXwXJAzXS+SUihfKgo= +gitlab.dms3.io/dms3/public/go-log v0.0.1 h1:jqz2g8pVdPW+Sy8CCo4rYfGEjktGhCBfgIb3oeY6yx8= +gitlab.dms3.io/dms3/public/go-log v0.0.1/go.mod h1:OsyF7lVYe47r03v1ZCbrmz0byeGUWB0Y219jN1DJx3s= +gitlab.dms3.io/dms3/public/go-metrics-interface v0.0.1 h1:oLJzd5zSjf5C1aEMHziNbV4RbJVuPaNzzKW4VBzKnQM= +gitlab.dms3.io/dms3/public/go-metrics-interface v0.0.1/go.mod h1:Ag1ayfnHxy0H659akn+bjAGn9k/HJrpxsVrB90DmRL8= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= 
+go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU= +golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git 
a/idstore.go b/idstore.go
new file mode 100644
index 0000000000000000000000000000000000000000..083c32ba6ed06d7ad422bead0bd7074424e9a563
--- /dev/null
+++ b/idstore.go
@@ -0,0 +1,123 @@
+package blockstore
+
+import (
+	"context"
+	"io"
+
+	mh "github.com/multiformats/go-multihash"
+	blocks "gitlab.dms3.io/dms3/public/go-block-format"
+	cid "gitlab.dms3.io/dms3/public/go-cid"
+)
+
+// idstore wraps a Blockstore to add support for identity hashes
+type idstore struct {
+	bs     Blockstore
+	viewer Viewer
+}
+
+var _ Blockstore = (*idstore)(nil)
+var _ Viewer = (*idstore)(nil)
+var _ io.Closer = (*idstore)(nil)
+
+func NewIdStore(bs Blockstore) Blockstore {
+	ids := &idstore{bs: bs}
+	if v, ok := bs.(Viewer); ok {
+		ids.viewer = v
+	}
+	return ids
+}
+
+func extractContents(k cid.Cid) (bool, []byte) {
+	// Pre-check by calling Prefix(); this is much faster than extracting the hash.
+	if k.Prefix().MhType != mh.IDENTITY {
+		return false, nil
+	}
+
+	dmh, err := mh.Decode(k.Hash())
+	if err != nil || dmh.Code != mh.ID {
+		return false, nil
+	}
+	return true, dmh.Digest
+}
+
+func (b *idstore) DeleteBlock(k cid.Cid) error {
+	isId, _ := extractContents(k)
+	if isId {
+		return nil
+	}
+	return b.bs.DeleteBlock(k)
+}
+
+func (b *idstore) Has(k cid.Cid) (bool, error) {
+	isId, _ := extractContents(k)
+	if isId {
+		return true, nil
+	}
+	return b.bs.Has(k)
+}
+
+func (b *idstore) View(k cid.Cid, callback func([]byte) error) error {
+	if b.viewer == nil {
+		blk, err := b.Get(k)
+		if err != nil {
+			return err
+		}
+		return callback(blk.RawData())
+	}
+	isId, bdata := extractContents(k)
+	if isId {
+		return callback(bdata)
+	}
+	return b.viewer.View(k, callback)
+}
+
+func (b *idstore) GetSize(k cid.Cid) (int, error) {
+	isId, bdata := extractContents(k)
+	if isId {
+		return len(bdata), nil
+	}
+	return b.bs.GetSize(k)
+}
+
+func (b *idstore) Get(k cid.Cid) (blocks.Block, error) {
+	isId, bdata := extractContents(k)
+	if isId {
+		return blocks.NewBlockWithCid(bdata, k)
+	}
+	return b.bs.Get(k)
+}
+
+func (b *idstore) Put(bl blocks.Block) error {
+	isId, _ := extractContents(bl.Cid())
+	if isId {
+		return nil
+	}
+	return b.bs.Put(bl)
+}
+
+func (b *idstore) PutMany(bs []blocks.Block) error {
+	toPut := make([]blocks.Block, 0, len(bs))
+	for _, bl := range bs {
+		isId, _ := extractContents(bl.Cid())
+		if isId {
+			continue
+		}
+		toPut = append(toPut, bl)
+	}
+	return b.bs.PutMany(toPut)
+}
+
+func (b *idstore) HashOnRead(enabled bool) {
+	b.bs.HashOnRead(enabled)
+}
+
+func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
+	return b.bs.AllKeysChan(ctx)
+}
+
+func (b *idstore) Close() error {
+	if c, ok := b.bs.(io.Closer); ok {
+		return c.Close()
+	}
+	return nil
+}
diff --git a/idstore_test.go b/idstore_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..b965ef818fb5178be5542ab05a1c021e5d8715ae
--- /dev/null
+++ b/idstore_test.go
@@ -0,0 +1,159 @@
+package blockstore
+
+import (
+	"context"
+	"testing"
+
+	mh "github.com/multiformats/go-multihash"
+	blk "gitlab.dms3.io/dms3/public/go-block-format"
+	cid "gitlab.dms3.io/dms3/public/go-cid"
+	ds "gitlab.dms3.io/dms3/public/go-datastore"
+)
+
+func createTestStores() (Blockstore, *callbackDatastore) {
+	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
+	ids := NewIdStore(NewBlockstore(cd))
+	return ids, cd
+}
+
+func TestIdStore(t *testing.T) {
+	idhash1, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash1"))
+	idblock1, _ := blk.NewBlockWithCid([]byte("idhash1"), idhash1)
+	hash1, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash1"))
+	block1, _ := blk.NewBlockWithCid([]byte("hash1"), hash1)
+	emptyHash, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("emptyHash"))
+	emptyBlock, _ := blk.NewBlockWithCid([]byte{}, emptyHash)
+
+	ids, cb := createTestStores()
+
+	have, _ := ids.Has(idhash1)
+	if !have {
+		t.Fatal("Has() failed on idhash")
+	}
+
+	_, err := ids.Get(idhash1)
+	if err != nil {
+		t.Fatalf("Get() failed on idhash: %v", err)
+	}
+
+	noop := func() {}
+	failIfPassThrough := func() {
+		t.Fatal("operation on identity hash passed through to datastore")
+	}
+
+	cb.f = failIfPassThrough
+	err = ids.Put(idblock1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cb.f = noop
+	err = ids.Put(block1)
+	if err != nil {
+		t.Fatalf("Put() failed on normal block: %v", err)
+	}
+
+	have, _ = ids.Has(hash1)
+	if !have {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	blockSize, _ := ids.GetSize(hash1)
+	if blockSize == -1 {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	_, err = ids.Get(hash1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = ids.Put(emptyBlock)
+	if err != nil {
+		t.Fatalf("Put() failed on normal block: %v", err)
+	}
+
+	have, _ = ids.Has(emptyHash)
+	if !have {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	blockSize, _ = ids.GetSize(emptyHash)
+	if blockSize != 0 {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	cb.f = failIfPassThrough
+	err = ids.DeleteBlock(idhash1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cb.f = noop
+	err = ids.DeleteBlock(hash1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	have, _ = ids.Has(hash1)
+	if have {
+		t.Fatal("normal block not deleted from datastore")
+	}
+
+	blockSize, _ = ids.GetSize(hash1)
+	if blockSize > -1 {
+		t.Fatal("normal block not deleted from datastore")
+	}
+
+	err = ids.DeleteBlock(emptyHash)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	idhash2, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash2"))
+	idblock2, _ := blk.NewBlockWithCid([]byte("idhash2"), idhash2)
+	hash2, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash2"))
+	block2, _ := blk.NewBlockWithCid([]byte("hash2"), hash2)
+
+	cb.f = failIfPassThrough
+	err = ids.PutMany([]blk.Block{idblock1, idblock2})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	opCount := 0
+	cb.f = func() {
+		opCount++
+	}
+
+	err = ids.PutMany([]blk.Block{block1, block2})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if opCount != 4 {
+		// one call to Has and Put for each Cid
+		t.Fatalf("expected exactly 4 operations got %d", opCount)
+	}
+
+	opCount = 0
+	err = ids.PutMany([]blk.Block{idblock1, block1})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if opCount != 1 {
+		// just the Has call for the normal (non-id) block: it already exists, so Put is skipped
+		t.Fatalf("expected exactly 1 operation got %d", opCount)
+	}
+
+	ch, err := ids.AllKeysChan(context.TODO())
+	cnt := 0
+	for c := range ch {
+		cnt++
+		if c.Prefix().MhType == mh.ID {
+			t.Fatalf("block with identity hash found in blockstore")
+		}
+	}
+	if cnt != 2 {
+		t.Fatalf("expected exactly two keys returned by AllKeysChan got %d", cnt)
+	}
+}
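
Usage note: a minimal sketch (not part of the patch itself) of how the idstore wrapper added above is intended to be used. Identity-hashed CIDs are answered directly from the CID bytes and never reach the wrapped datastore; only regular blocks are forwarded. The module import path for this package is an assumption for illustration, since it is not visible in this patch; every other identifier (NewIdStore, NewBlockstore, NewMapDatastore, NewPrefixV1, NewBlockWithCid, IDENTITY) appears in the files added here or their imports.

package main

import (
	"fmt"

	mh "github.com/multiformats/go-multihash"
	blk "gitlab.dms3.io/dms3/public/go-block-format"
	cid "gitlab.dms3.io/dms3/public/go-cid"
	ds "gitlab.dms3.io/dms3/public/go-datastore"
	bstore "gitlab.dms3.io/dms3/public/go-dms3-blockstore" // assumed module path; replace with this repository's real path
)

func main() {
	// Wrap an in-memory blockstore so identity-hashed blocks never touch the datastore.
	bs := bstore.NewIdStore(bstore.NewBlockstore(ds.NewMapDatastore()))

	// An identity-multihash CID embeds the payload in the CID itself.
	c, _ := cid.NewPrefixV1(cid.Raw, mh.IDENTITY).Sum([]byte("hello"))
	b, _ := blk.NewBlockWithCid([]byte("hello"), c)

	_ = bs.Put(b)       // no-op: nothing is written to the datastore
	got, _ := bs.Get(c) // served straight from the CID, no datastore read
	fmt.Println(string(got.RawData()))
}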