diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..4b86d7197f9a6f1a988c503defc8451392cd5e55 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1,8 @@ +blank_issues_enabled: false +contact_links: + - name: Getting Help on IPFS + url: https://ipfs.io/help + about: All information about how and where to get help on IPFS. + - name: IPFS Official Forum + url: https://discuss.ipfs.io + about: Please post general questions, support requests, and discussions here. diff --git a/.github/ISSUE_TEMPLATE/open_an_issue.md b/.github/ISSUE_TEMPLATE/open_an_issue.md new file mode 100644 index 0000000000000000000000000000000000000000..4fcbd00aca0d513c2729d8df4afb5a01fdbe7d02 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/open_an_issue.md @@ -0,0 +1,19 @@ +--- +name: Open an issue +about: Only for actionable issues relevant to this repository. +title: '' +labels: need/triage +assignees: '' + +--- + diff --git a/.github/config.yml b/.github/config.yml new file mode 100644 index 0000000000000000000000000000000000000000..ed26646a0f7cdda6cf10ede2b0b98cac89cf67b0 --- /dev/null +++ b/.github/config.yml @@ -0,0 +1,68 @@ +# Configuration for welcome - https://github.com/behaviorbot/welcome + +# Configuration for new-issue-welcome - https://github.com/behaviorbot/new-issue-welcome +# Comment to be posted to on first time issues +newIssueWelcomeComment: > + Thank you for submitting your first issue to this repository! A maintainer + will be here shortly to triage and review. + + In the meantime, please double-check that you have provided all the + necessary information to make this process easy! Any information that can + help save additional round trips is useful! We currently aim to give + initial feedback within **two business days**. If this does not happen, feel + free to leave a comment. + + Please keep an eye on how this issue will be labeled, as labels give an + overview of priorities, assignments and additional actions requested by the + maintainers: + + - "Priority" labels will show how urgent this is for the team. + - "Status" labels will show if this is ready to be worked on, blocked, or in progress. + - "Need" labels will indicate if additional input or analysis is required. + + Finally, remember to use https://discuss.ipfs.io if you just need general + support. + +# Configuration for new-pr-welcome - https://github.com/behaviorbot/new-pr-welcome +# Comment to be posted to on PRs from first time contributors in your repository +newPRWelcomeComment: > + Thank you for submitting this PR! + + A maintainer will be here shortly to review it. + + We are super grateful, but we are also overloaded! Help us by making sure + that: + + * The context for this PR is clear, with relevant discussion, decisions + and stakeholders linked/mentioned. + + * Your contribution itself is clear (code comments, self-review for the + rest) and in its best form. Follow the [code contribution + guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md#code-contribution-guidelines) + if they apply. + + Getting other community members to do a review would be great help too on + complex PRs (you can ask in the chats/forums). If you are unsure about + something, just leave us a comment. + + Next steps: + + * A maintainer will triage and assign priority to this PR, commenting on + any missing things and potentially assigning a reviewer for high + priority items. 
+ + * The PR gets reviews, discussed and approvals as needed. + + * The PR is merged by maintainers when it has been approved and comments addressed. + + We currently aim to provide initial feedback/triaging within **two business + days**. Please keep an eye on any labelling actions, as these will indicate + priorities and status of your contribution. + + We are very grateful for your contribution! + + +# Configuration for first-pr-merge - https://github.com/behaviorbot/first-pr-merge +# Comment to be posted to on pull requests merged by a first time user +# Currently disabled +#firstPRMergeComment: "" diff --git a/.gx/lastpubver b/.gx/lastpubver new file mode 100644 index 0000000000000000000000000000000000000000..0670e6452b077b0963e3f3fea072a95182c25ef7 --- /dev/null +++ b/.gx/lastpubver @@ -0,0 +1 @@ +0.1.8: QmXjKkjMDTtXAiLBwstVexofB8LeruZmE2eBd85GwGFFLA diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000000000000000000000000000000000000..a156d3eb5eb29547d593ebf789debd98e2135cf9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,30 @@ +os: + - linux + +language: go + +go: + - 1.13.x + +env: + global: + - GOTFLAGS="-race" + matrix: + - BUILD_DEPTYPE=gomod + + +# disable travis install +install: + - true + +script: + - bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh) + + +cache: + directories: + - $GOPATH/pkg/mod + - $HOME/.cache/go-build + +notifications: + email: false diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..51d20a894eae654802fc0d44566c74d7193c2b16 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Protocol Labs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
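The README added below only shows the import line; as a point of reference, here is a minimal usage sketch of the blockstore this patch introduces (assuming the in-memory datastore from go-datastore, wrapped for concurrency as the tests in this patch do):

```
package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
)

func main() {
	// Any ds.Batching implementation works; MutexWrap makes the map
	// datastore safe for concurrent use.
	bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))

	blk := blocks.NewBlock([]byte("some data"))
	if err := bs.Put(blk); err != nil {
		panic(err)
	}

	got, err := bs.Get(blk.Cid())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(got.RawData())) // prints "some data"
}
```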
diff --git a/README.md b/README.md index d70a06b602fcc3a2f979cfc47a02828dabbb6fc6..f2cec940380b489c9e9fc54cf2c2506f514ec2c6 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,47 @@ -# go-dms3-blockstore +# go-ipfs-blockstore -dms3 blockstore \ No newline at end of file +[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) +[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) +[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) +[![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-blockstore?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-blockstore) +[![Build Status](https://travis-ci.com/ipfs/go-ipfs-blockstore.svg?branch=master)](https://travis-ci.com/ipfs/go-ipfs-blockstore) + +> go-ipfs-blockstore implements a thin wrapper over a datastore, giving a clean interface for Getting and Putting block objects. + +## Lead Maintainer + +[Steven Allen](https://github.com/Stebalien) + + +## Table of Contents + +- [Install](#install) +- [Usage](#usage) +- [Contribute](#contribute) +- [License](#license) + +## Install + +`go-ipfs-blockstore` works like a regular Go module: + +``` +> go get github.com/ipfs/go-ipfs-blockstore +``` + +## Usage + +``` +import "github.com/ipfs/go-ipfs-blockstore" +``` + +Check the [GoDoc documentation](https://godoc.org/github.com/ipfs/go-ipfs-blockstore) + +## Contribute + +PRs accepted. + +Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. + +## License + +MIT © Protocol Labs, Inc. diff --git a/arc_cache.go b/arc_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..1e497abf978a2f2774463c7d3463ac82eb59400d --- /dev/null +++ b/arc_cache.go @@ -0,0 +1,225 @@ +package blockstore + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + metrics "github.com/ipfs/go-metrics-interface" +) + +type cacheHave bool +type cacheSize int + +// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) that +// does not store the actual blocks, just metadata about them: existence and +// size. This provides block access-time improvements, allowing +// to short-cut many searches without querying the underlying datastore. +type arccache struct { + cache *lru.TwoQueueCache + blockstore Blockstore + viewer Viewer + + hits metrics.Counter + total metrics.Counter +} + +var _ Blockstore = (*arccache)(nil) +var _ Viewer = (*arccache)(nil) + +func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache, error) { + cache, err := lru.New2Q(lruSize) + if err != nil { + return nil, err + } + c := &arccache{cache: cache, blockstore: bs} + c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter() + c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter() + if v, ok := bs.(Viewer); ok { + c.viewer = v + } + return c, nil +} + +func (b *arccache) DeleteBlock(k cid.Cid) error { + if has, _, ok := b.queryCache(k); ok && !has { + return nil + } + + b.cache.Remove(k) // Invalidate cache before deleting. 
+ err := b.blockstore.DeleteBlock(k) + if err == nil { + b.cacheHave(k, false) + } + return err +} + +func (b *arccache) Has(k cid.Cid) (bool, error) { + if has, _, ok := b.queryCache(k); ok { + return has, nil + } + has, err := b.blockstore.Has(k) + if err != nil { + return false, err + } + b.cacheHave(k, has) + return has, nil +} + +func (b *arccache) GetSize(k cid.Cid) (int, error) { + if has, blockSize, ok := b.queryCache(k); ok { + if !has { + // don't have it, return + return -1, ErrNotFound + } + if blockSize >= 0 { + // have it and we know the size + return blockSize, nil + } + // we have it but don't know the size, ask the datastore. + } + blockSize, err := b.blockstore.GetSize(k) + if err == ErrNotFound { + b.cacheHave(k, false) + } else if err == nil { + b.cacheSize(k, blockSize) + } + return blockSize, err +} + +func (b *arccache) View(k cid.Cid, callback func([]byte) error) error { + // shortcircuit and fall back to Get if the underlying store + // doesn't support Viewer. + if b.viewer == nil { + blk, err := b.Get(k) + if err != nil { + return err + } + return callback(blk.RawData()) + } + + if !k.Defined() { + log.Error("undefined cid in arc cache") + return ErrNotFound + } + + if has, _, ok := b.queryCache(k); ok && !has { + // short circuit if the cache deterministically tells us the item + // doesn't exist. + return ErrNotFound + } + + return b.viewer.View(k, callback) +} + +func (b *arccache) Get(k cid.Cid) (blocks.Block, error) { + if !k.Defined() { + log.Error("undefined cid in arc cache") + return nil, ErrNotFound + } + + if has, _, ok := b.queryCache(k); ok && !has { + return nil, ErrNotFound + } + + bl, err := b.blockstore.Get(k) + if bl == nil && err == ErrNotFound { + b.cacheHave(k, false) + } else if bl != nil { + b.cacheSize(k, len(bl.RawData())) + } + return bl, err +} + +func (b *arccache) Put(bl blocks.Block) error { + if has, _, ok := b.queryCache(bl.Cid()); ok && has { + return nil + } + + err := b.blockstore.Put(bl) + if err == nil { + b.cacheSize(bl.Cid(), len(bl.RawData())) + } + return err +} + +func (b *arccache) PutMany(bs []blocks.Block) error { + var good []blocks.Block + for _, block := range bs { + // call put on block if result is inconclusive or we are sure that + // the block isn't in storage + if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) { + good = append(good, block) + } + } + err := b.blockstore.PutMany(good) + if err != nil { + return err + } + for _, block := range good { + b.cacheSize(block.Cid(), len(block.RawData())) + } + return nil +} + +func (b *arccache) HashOnRead(enabled bool) { + b.blockstore.HashOnRead(enabled) +} + +func (b *arccache) cacheHave(c cid.Cid, have bool) { + b.cache.Add(string(c.Hash()), cacheHave(have)) +} + +func (b *arccache) cacheSize(c cid.Cid, blockSize int) { + b.cache.Add(string(c.Hash()), cacheSize(blockSize)) +} + +// queryCache checks if the CID is in the cache. If so, it returns: +// +// * exists (bool): whether the CID is known to exist or not. +// * size (int): the size if cached, or -1 if not cached. +// * ok (bool): whether present in the cache. +// +// When ok is false, the answer in inconclusive and the caller must ignore the +// other two return values. Querying the underying store is necessary. +// +// When ok is true, exists carries the correct answer, and size carries the +// size, if known, or -1 if not. 
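// Illustrative caller pattern (a sketch of how the methods above consume it):
//
//	if exists, size, ok := b.queryCache(k); ok {
//		// conclusive: act on exists/size without touching the datastore
//	} else {
//		// inconclusive: fall through to b.blockstore
//	}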
+func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) { + b.total.Inc() + if !k.Defined() { + log.Error("undefined cid in arccache") + // Return cache invalid so the call to blockstore happens + // in case of invalid key and correct error is created. + return false, -1, false + } + + h, ok := b.cache.Get(string(k.Hash())) + if ok { + b.hits.Inc() + switch h := h.(type) { + case cacheHave: + return bool(h), -1, true + case cacheSize: + return true, int(h), true + } + } + return false, -1, false +} + +func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return b.blockstore.AllKeysChan(ctx) +} + +func (b *arccache) GCLock() Unlocker { + return b.blockstore.(GCBlockstore).GCLock() +} + +func (b *arccache) PinLock() Unlocker { + return b.blockstore.(GCBlockstore).PinLock() +} + +func (b *arccache) GCRequested() bool { + return b.blockstore.(GCBlockstore).GCRequested() +} diff --git a/arc_cache_test.go b/arc_cache_test.go new file mode 100644 index 0000000000000000000000000000000000000000..dcd9c6e309d15bb2d6b98d8b8deca19072ca6627 --- /dev/null +++ b/arc_cache_test.go @@ -0,0 +1,261 @@ +package blockstore + +import ( + "context" + "testing" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + syncds "github.com/ipfs/go-datastore/sync" +) + +var exampleBlock = blocks.NewBlock([]byte("foo")) + +func testArcCached(ctx context.Context, bs Blockstore) (*arccache, error) { + if ctx == nil { + ctx = context.TODO() + } + opts := DefaultCacheOpts() + opts.HasBloomFilterSize = 0 + opts.HasBloomFilterHashes = 0 + bbs, err := CachedBlockstore(ctx, bs, opts) + if err == nil { + return bbs.(*arccache), nil + } + return nil, err +} + +func createStores(t *testing.T) (*arccache, Blockstore, *callbackDatastore) { + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + arc, err := testArcCached(context.TODO(), bs) + if err != nil { + t.Fatal(err) + } + return arc, bs, cd +} + +func trap(message string, cd *callbackDatastore, t *testing.T) { + cd.SetFunc(func() { + t.Fatal(message) + }) +} +func untrap(cd *callbackDatastore) { + cd.SetFunc(func() {}) +} + +func TestRemoveCacheEntryOnDelete(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Put(exampleBlock) + + cd.Lock() + writeHitTheDatastore := false + cd.Unlock() + + cd.SetFunc(func() { + writeHitTheDatastore = true + }) + + arc.DeleteBlock(exampleBlock.Cid()) + arc.Put(exampleBlock) + if !writeHitTheDatastore { + t.Fail() + } +} + +func TestElideDuplicateWrite(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Put(exampleBlock) + trap("write hit datastore", cd, t) + arc.Put(exampleBlock) +} + +func TestHasRequestTriggersCache(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Has(exampleBlock.Cid()) + trap("has hit datastore", cd, t) + if has, err := arc.Has(exampleBlock.Cid()); has || err != nil { + t.Fatal("has was true but there is no such block") + } + + untrap(cd) + err := arc.Put(exampleBlock) + if err != nil { + t.Fatal(err) + } + + trap("has hit datastore", cd, t) + + if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil { + t.Fatal("has returned invalid result") + } +} + +func TestGetFillsCache(t *testing.T) { + arc, _, cd := createStores(t) + + if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err == nil { + t.Fatal("block was found or there was no error") + } + + trap("has hit datastore", cd, t) + + if has, err := arc.Has(exampleBlock.Cid()); has || err != 
nil { + t.Fatal("has was true but there is no such block") + } + if _, err := arc.GetSize(exampleBlock.Cid()); err != ErrNotFound { + t.Fatal("getsize was true but there is no such block") + } + + untrap(cd) + + if err := arc.Put(exampleBlock); err != nil { + t.Fatal(err) + } + + trap("has hit datastore", cd, t) + + if has, err := arc.Has(exampleBlock.Cid()); !has || err != nil { + t.Fatal("has returned invalid result") + } + if blockSize, err := arc.GetSize(exampleBlock.Cid()); blockSize == -1 || err != nil { + t.Fatal("getsize returned invalid result", blockSize, err) + } +} + +func TestGetAndDeleteFalseShortCircuit(t *testing.T) { + arc, _, cd := createStores(t) + + arc.Has(exampleBlock.Cid()) + arc.GetSize(exampleBlock.Cid()) + + trap("get hit datastore", cd, t) + + if bl, err := arc.Get(exampleBlock.Cid()); bl != nil || err != ErrNotFound { + t.Fatal("get returned invalid result") + } + + if arc.DeleteBlock(exampleBlock.Cid()) != nil { + t.Fatal("expected deletes to be idempotent") + } +} + +func TestArcCreationFailure(t *testing.T) { + if arc, err := newARCCachedBS(context.TODO(), nil, -1); arc != nil || err == nil { + t.Fatal("expected error and no cache") + } +} + +func TestInvalidKey(t *testing.T) { + arc, _, _ := createStores(t) + + bl, err := arc.Get(cid.Cid{}) + + if bl != nil { + t.Fatal("blocks should be nil") + } + if err == nil { + t.Fatal("expected error") + } +} + +func TestHasAfterSucessfulGetIsCached(t *testing.T) { + arc, bs, cd := createStores(t) + + bs.Put(exampleBlock) + + arc.Get(exampleBlock.Cid()) + + trap("has hit datastore", cd, t) + arc.Has(exampleBlock.Cid()) +} + +func TestGetSizeAfterSucessfulGetIsCached(t *testing.T) { + arc, bs, cd := createStores(t) + + bs.Put(exampleBlock) + + arc.Get(exampleBlock.Cid()) + + trap("has hit datastore", cd, t) + arc.GetSize(exampleBlock.Cid()) +} + +func TestGetSizeAfterSucessfulHas(t *testing.T) { + arc, bs, _ := createStores(t) + + bs.Put(exampleBlock) + has, err := arc.Has(exampleBlock.Cid()) + if err != nil { + t.Fatal(err) + } + if !has { + t.Fatal("expected to have block") + } + + if size, err := arc.GetSize(exampleBlock.Cid()); err != nil { + t.Fatal(err) + } else if size != len(exampleBlock.RawData()) { + t.Fatalf("expected size %d, got %d", len(exampleBlock.RawData()), size) + } +} + +func TestGetSizeMissingZeroSizeBlock(t *testing.T) { + arc, bs, cd := createStores(t) + emptyBlock := blocks.NewBlock([]byte{}) + missingBlock := blocks.NewBlock([]byte("missingBlock")) + + bs.Put(emptyBlock) + + arc.Get(emptyBlock.Cid()) + + trap("has hit datastore", cd, t) + if blockSize, err := arc.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil { + t.Fatal("getsize returned invalid result") + } + untrap(cd) + + arc.Get(missingBlock.Cid()) + + trap("has hit datastore", cd, t) + if _, err := arc.GetSize(missingBlock.Cid()); err != ErrNotFound { + t.Fatal("getsize returned invalid result") + } +} + +func TestDifferentKeyObjectsWork(t *testing.T) { + arc, bs, cd := createStores(t) + + bs.Put(exampleBlock) + + arc.Get(exampleBlock.Cid()) + + trap("has hit datastore", cd, t) + cidstr := exampleBlock.Cid().String() + + ncid, err := cid.Decode(cidstr) + if err != nil { + t.Fatal(err) + } + + arc.Has(ncid) +} + +func TestPutManyCaches(t *testing.T) { + arc, _, cd := createStores(t) + arc.PutMany([]blocks.Block{exampleBlock}) + + trap("has hit datastore", cd, t) + arc.Has(exampleBlock.Cid()) + arc.GetSize(exampleBlock.Cid()) + untrap(cd) + arc.DeleteBlock(exampleBlock.Cid()) + + arc.Put(exampleBlock) + trap("PunMany has hit 
datastore", cd, t) + arc.PutMany([]blocks.Block{exampleBlock}) +} diff --git a/blockstore.go b/blockstore.go new file mode 100644 index 0000000000000000000000000000000000000000..6625a3411335cfb9940a2edf337238fdf1ff096c --- /dev/null +++ b/blockstore.go @@ -0,0 +1,296 @@ +// Package blockstore implements a thin wrapper over a datastore, giving a +// clean interface for Getting and Putting block objects. +package blockstore + +import ( + "context" + "errors" + "sync" + "sync/atomic" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dsns "github.com/ipfs/go-datastore/namespace" + dsq "github.com/ipfs/go-datastore/query" + dshelp "github.com/ipfs/go-ipfs-ds-help" + logging "github.com/ipfs/go-log" + uatomic "go.uber.org/atomic" +) + +var log = logging.Logger("blockstore") + +// BlockPrefix namespaces blockstore datastores +var BlockPrefix = ds.NewKey("blocks") + +// ErrHashMismatch is an error returned when the hash of a block +// is different than expected. +var ErrHashMismatch = errors.New("block in storage has different hash than requested") + +// ErrNotFound is an error returned when a block is not found. +var ErrNotFound = errors.New("blockstore: block not found") + +// Blockstore wraps a Datastore block-centered methods and provides a layer +// of abstraction which allows to add different caching strategies. +type Blockstore interface { + DeleteBlock(cid.Cid) error + Has(cid.Cid) (bool, error) + Get(cid.Cid) (blocks.Block, error) + + // GetSize returns the CIDs mapped BlockSize + GetSize(cid.Cid) (int, error) + + // Put puts a given block to the underlying datastore + Put(blocks.Block) error + + // PutMany puts a slice of blocks at the same time using batching + // capabilities of the underlying datastore whenever possible. + PutMany([]blocks.Block) error + + // AllKeysChan returns a channel from which + // the CIDs in the Blockstore can be read. It should respect + // the given context, closing the channel if it becomes Done. + AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) + + // HashOnRead specifies if every read block should be + // rehashed to make sure it matches its CID. + HashOnRead(enabled bool) +} + +// Viewer can be implemented by blockstores that offer zero-copy access to +// values. +// +// Callers of View must not mutate or retain the byte slice, as it could be +// an mmapped memory region, or a pooled byte buffer. +// +// View is especially suitable for deserialising in place. +// +// The callback will only be called iff the query operation is successful (and +// the block is found); otherwise, the error will be propagated. Errors returned +// by the callback will be propagated as well. +type Viewer interface { + View(cid cid.Cid, callback func([]byte) error) error +} + +// GCLocker abstract functionality to lock a blockstore when performing +// garbage-collection operations. +type GCLocker interface { + // GCLock locks the blockstore for garbage collection. No operations + // that expect to finish with a pin should ocurr simultaneously. + // Reading during GC is safe, and requires no lock. + GCLock() Unlocker + + // PinLock locks the blockstore for sequences of puts expected to finish + // with a pin (before GC). Multiple put->pin sequences can write through + // at the same time, but no GC should happen simulatenously. + // Reading during Pinning is safe, and requires no lock. 
+ PinLock() Unlocker + + // GcRequested returns true if GCLock has been called and is waiting to + // take the lock + GCRequested() bool +} + +// GCBlockstore is a blockstore that can safely run garbage-collection +// operations. +type GCBlockstore interface { + Blockstore + GCLocker +} + +// NewGCBlockstore returns a default implementation of GCBlockstore +// using the given Blockstore and GCLocker. +func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore { + return gcBlockstore{bs, gcl} +} + +type gcBlockstore struct { + Blockstore + GCLocker +} + +// NewBlockstore returns a default Blockstore implementation +// using the provided datastore.Batching backend. +func NewBlockstore(d ds.Batching) Blockstore { + var dsb ds.Batching + dd := dsns.Wrap(d, BlockPrefix) + dsb = dd + return &blockstore{ + datastore: dsb, + rehash: uatomic.NewBool(false), + } +} + +type blockstore struct { + datastore ds.Batching + + rehash *uatomic.Bool +} + +func (bs *blockstore) HashOnRead(enabled bool) { + bs.rehash.Store(enabled) +} + +func (bs *blockstore) Get(k cid.Cid) (blocks.Block, error) { + if !k.Defined() { + log.Error("undefined cid in blockstore") + return nil, ErrNotFound + } + bdata, err := bs.datastore.Get(dshelp.MultihashToDsKey(k.Hash())) + if err == ds.ErrNotFound { + return nil, ErrNotFound + } + if err != nil { + return nil, err + } + if bs.rehash.Load() { + rbcid, err := k.Prefix().Sum(bdata) + if err != nil { + return nil, err + } + + if !rbcid.Equals(k) { + return nil, ErrHashMismatch + } + + return blocks.NewBlockWithCid(bdata, rbcid) + } + return blocks.NewBlockWithCid(bdata, k) +} + +func (bs *blockstore) Put(block blocks.Block) error { + k := dshelp.MultihashToDsKey(block.Cid().Hash()) + + // Has is cheaper than Put, so see if we already have it + exists, err := bs.datastore.Has(k) + if err == nil && exists { + return nil // already stored. + } + return bs.datastore.Put(k, block.RawData()) +} + +func (bs *blockstore) PutMany(blocks []blocks.Block) error { + t, err := bs.datastore.Batch() + if err != nil { + return err + } + for _, b := range blocks { + k := dshelp.MultihashToDsKey(b.Cid().Hash()) + exists, err := bs.datastore.Has(k) + if err == nil && exists { + continue + } + + err = t.Put(k, b.RawData()) + if err != nil { + return err + } + } + return t.Commit() +} + +func (bs *blockstore) Has(k cid.Cid) (bool, error) { + return bs.datastore.Has(dshelp.MultihashToDsKey(k.Hash())) +} + +func (bs *blockstore) GetSize(k cid.Cid) (int, error) { + size, err := bs.datastore.GetSize(dshelp.MultihashToDsKey(k.Hash())) + if err == ds.ErrNotFound { + return -1, ErrNotFound + } + return size, err +} + +func (bs *blockstore) DeleteBlock(k cid.Cid) error { + return bs.datastore.Delete(dshelp.MultihashToDsKey(k.Hash())) +} + +// AllKeysChan runs a query for keys from the blockstore. +// this is very simplistic, in the future, take dsq.Query as a param? +// +// AllKeysChan respects context. +func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + + // KeysOnly, because that would be _a lot_ of data. 
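	// Callers drain the returned channel and cancel ctx to stop early, for
	// example (sketch):
	//
	//	ch, _ := bs.AllKeysChan(ctx)
	//	for c := range ch {
	//		// use c
	//	}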
+ q := dsq.Query{KeysOnly: true} + res, err := bs.datastore.Query(q) + if err != nil { + return nil, err + } + + output := make(chan cid.Cid, dsq.KeysOnlyBufSize) + go func() { + defer func() { + res.Close() // ensure exit (signals early exit, too) + close(output) + }() + + for { + e, ok := res.NextSync() + if !ok { + return + } + if e.Error != nil { + log.Errorf("blockstore.AllKeysChan got err: %s", e.Error) + return + } + + // need to convert to key.Key using key.KeyFromDsKey. + bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key)) + if err != nil { + log.Warningf("error parsing key from binary: %s", err) + continue + } + k := cid.NewCidV1(cid.Raw, bk) + select { + case <-ctx.Done(): + return + case output <- k: + } + } + }() + + return output, nil +} + +// NewGCLocker returns a default implementation of +// GCLocker using standard [RW] mutexes. +func NewGCLocker() GCLocker { + return &gclocker{} +} + +type gclocker struct { + lk sync.RWMutex + gcreq int32 +} + +// Unlocker represents an object which can Unlock +// something. +type Unlocker interface { + Unlock() +} + +type unlocker struct { + unlock func() +} + +func (u *unlocker) Unlock() { + u.unlock() + u.unlock = nil // ensure its not called twice +} + +func (bs *gclocker) GCLock() Unlocker { + atomic.AddInt32(&bs.gcreq, 1) + bs.lk.Lock() + atomic.AddInt32(&bs.gcreq, -1) + return &unlocker{bs.lk.Unlock} +} + +func (bs *gclocker) PinLock() Unlocker { + bs.lk.RLock() + return &unlocker{bs.lk.RUnlock} +} + +func (bs *gclocker) GCRequested() bool { + return atomic.LoadInt32(&bs.gcreq) > 0 +} diff --git a/blockstore_test.go b/blockstore_test.go new file mode 100644 index 0000000000000000000000000000000000000000..28f98e14a3dff246fe7755a111fa43ba75fe8df1 --- /dev/null +++ b/blockstore_test.go @@ -0,0 +1,332 @@ +package blockstore + +import ( + "bytes" + "context" + "fmt" + "testing" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + ds_sync "github.com/ipfs/go-datastore/sync" + u "github.com/ipfs/go-ipfs-util" +) + +func TestGetWhenKeyNotPresent(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + c := cid.NewCidV0(u.Hash([]byte("stuff"))) + bl, err := bs.Get(c) + + if bl != nil { + t.Error("nil block expected") + } + if err == nil { + t.Error("error expected, got nil") + } +} + +func TestGetWhenKeyIsNil(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + _, err := bs.Get(cid.Cid{}) + if err != ErrNotFound { + t.Fail() + } +} + +func TestPutThenGetBlock(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + block := blocks.NewBlock([]byte("some data")) + + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + + blockFromBlockstore, err := bs.Get(block.Cid()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) { + t.Fail() + } +} + +func TestCidv0v1(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + block := blocks.NewBlock([]byte("some data")) + + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + + blockFromBlockstore, err := bs.Get(cid.NewCidV1(cid.DagProtobuf, block.Cid().Hash())) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(block.RawData(), blockFromBlockstore.RawData()) { + t.Fail() + } +} + +func TestPutThenGetSizeBlock(t *testing.T) { + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + block := blocks.NewBlock([]byte("some data")) + 
missingBlock := blocks.NewBlock([]byte("missingBlock")) + emptyBlock := blocks.NewBlock([]byte{}) + + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + + blockSize, err := bs.GetSize(block.Cid()) + if err != nil { + t.Fatal(err) + } + if len(block.RawData()) != blockSize { + t.Fail() + } + + err = bs.Put(emptyBlock) + if err != nil { + t.Fatal(err) + } + + if blockSize, err := bs.GetSize(emptyBlock.Cid()); blockSize != 0 || err != nil { + t.Fatal(err) + } + + if blockSize, err := bs.GetSize(missingBlock.Cid()); blockSize != -1 || err == nil { + t.Fatal("getsize returned invalid result") + } +} + +type countHasDS struct { + ds.Datastore + hasCount int +} + +func (ds *countHasDS) Has(key ds.Key) (exists bool, err error) { + ds.hasCount += 1 + return ds.Datastore.Has(key) +} + +func TestPutUsesHas(t *testing.T) { + // Some datastores rely on the implementation detail that Put checks Has + // first, to avoid overriding existing objects' metadata. This test ensures + // that Blockstore continues to behave this way. + // Please ping https://github.com/ipfs/go-ipfs-blockstore/pull/47 if this + // behavior is being removed. + ds := &countHasDS{ + Datastore: ds.NewMapDatastore(), + } + bs := NewBlockstore(ds_sync.MutexWrap(ds)) + bl := blocks.NewBlock([]byte("some data")) + if err := bs.Put(bl); err != nil { + t.Fatal(err) + } + if err := bs.Put(bl); err != nil { + t.Fatal(err) + } + if ds.hasCount != 2 { + t.Errorf("Blockstore did not call Has before attempting Put, this breaks compatibility") + } +} + +func TestHashOnRead(t *testing.T) { + orginalDebug := u.Debug + defer (func() { + u.Debug = orginalDebug + })() + u.Debug = false + + bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + bl := blocks.NewBlock([]byte("some data")) + blBad, err := blocks.NewBlockWithCid([]byte("some other data"), bl.Cid()) + if err != nil { + t.Fatal("debug is off, still got an error") + } + bl2 := blocks.NewBlock([]byte("some other data")) + bs.Put(blBad) + bs.Put(bl2) + bs.HashOnRead(true) + + if _, err := bs.Get(bl.Cid()); err != ErrHashMismatch { + t.Fatalf("expected '%v' got '%v'\n", ErrHashMismatch, err) + } + + if b, err := bs.Get(bl2.Cid()); err != nil || b.String() != bl2.String() { + t.Fatal("got wrong blocks") + } +} + +func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []cid.Cid) { + if d == nil { + d = ds.NewMapDatastore() + } + bs := NewBlockstore(ds_sync.MutexWrap(d)) + + keys := make([]cid.Cid, N) + for i := 0; i < N; i++ { + block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i))) + err := bs.Put(block) + if err != nil { + t.Fatal(err) + } + keys[i] = block.Cid() + } + return bs, keys +} + +func collect(ch <-chan cid.Cid) []cid.Cid { + var keys []cid.Cid + for k := range ch { + keys = append(keys, k) + } + return keys +} + +func TestAllKeysSimple(t *testing.T) { + bs, keys := newBlockStoreWithKeys(t, nil, 100) + + ctx := context.Background() + ch, err := bs.AllKeysChan(ctx) + if err != nil { + t.Fatal(err) + } + keys2 := collect(ch) + + // for _, k2 := range keys2 { + // t.Log("found ", k2.B58String()) + // } + + expectMatches(t, keys, keys2) +} + +func TestAllKeysRespectsContext(t *testing.T) { + N := 100 + + d := &queryTestDS{ds: ds.NewMapDatastore()} + bs, _ := newBlockStoreWithKeys(t, d, N) + + started := make(chan struct{}, 1) + done := make(chan struct{}, 1) + errors := make(chan error, 100) + + getKeys := func(ctx context.Context) { + started <- struct{}{} + ch, err := bs.AllKeysChan(ctx) // once without cancelling + if err != nil { + 
errors <- err + } + _ = collect(ch) + done <- struct{}{} + errors <- nil // a nil one to signal break + } + + var results dsq.Results + var resultsmu = make(chan struct{}) + resultChan := make(chan dsq.Result) + d.SetFunc(func(q dsq.Query) (dsq.Results, error) { + results = dsq.ResultsWithChan(q, resultChan) + resultsmu <- struct{}{} + return results, nil + }) + + go getKeys(context.Background()) + + // make sure it's waiting. + <-started + <-resultsmu + select { + case <-done: + t.Fatal("sync is wrong") + case <-results.Process().Closing(): + t.Fatal("should not be closing") + case <-results.Process().Closed(): + t.Fatal("should not be closed") + default: + } + + e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()} + resultChan <- dsq.Result{Entry: e} // let it go. + close(resultChan) + <-done // should be done now. + <-results.Process().Closed() // should be closed now + + // print any errors + for err := range errors { + if err == nil { + break + } + t.Error(err) + } + +} + +func expectMatches(t *testing.T, expect, actual []cid.Cid) { + t.Helper() + + if len(expect) != len(actual) { + t.Errorf("expect and actual differ: %d != %d", len(expect), len(actual)) + } + + actualSet := make(map[string]bool, len(actual)) + for _, k := range actual { + actualSet[string(k.Hash())] = true + } + + for _, ek := range expect { + if !actualSet[string(ek.Hash())] { + t.Error("expected key not found: ", ek) + } + } +} + +type queryTestDS struct { + cb func(q dsq.Query) (dsq.Results, error) + ds ds.Datastore +} + +func (c *queryTestDS) SetFunc(f func(dsq.Query) (dsq.Results, error)) { c.cb = f } + +func (c *queryTestDS) Put(key ds.Key, value []byte) (err error) { + return c.ds.Put(key, value) +} + +func (c *queryTestDS) Get(key ds.Key) (value []byte, err error) { + return c.ds.Get(key) +} + +func (c *queryTestDS) Has(key ds.Key) (exists bool, err error) { + return c.ds.Has(key) +} + +func (c *queryTestDS) GetSize(key ds.Key) (size int, err error) { + return c.ds.GetSize(key) +} + +func (c *queryTestDS) Delete(key ds.Key) (err error) { + return c.ds.Delete(key) +} + +func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) { + if c.cb != nil { + return c.cb(q) + } + return c.ds.Query(q) +} + +func (c *queryTestDS) Sync(key ds.Key) error { + return c.ds.Sync(key) +} + +func (c *queryTestDS) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(c), nil +} +func (c *queryTestDS) Close() error { + return nil +} diff --git a/bloom_cache.go b/bloom_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..da302c97d1fd23a2a7e5118c7aad0e3dfa5ff850 --- /dev/null +++ b/bloom_cache.go @@ -0,0 +1,226 @@ +package blockstore + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + bloom "github.com/ipfs/bbloom" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + metrics "github.com/ipfs/go-metrics-interface" +) + +// bloomCached returns a Blockstore that caches Has requests using a Bloom +// filter. bloomSize is size of bloom filter in bytes. hashCount specifies the +// number of hashing functions in the bloom filter (usually known as k). 
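// CachedBlockstore in caching.go is the usual entry point; it derives the
// arguments from CacheOpts, for example (sketch):
//
//	bloomCached(ctx, bs, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes)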
+func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (*bloomcache, error) { + bl, err := bloom.New(float64(bloomSize), float64(hashCount)) + if err != nil { + return nil, err + } + bc := &bloomcache{ + blockstore: bs, + bloom: bl, + hits: metrics.NewCtx(ctx, "bloom.hits_total", + "Number of cache hits in bloom cache").Counter(), + total: metrics.NewCtx(ctx, "bloom_total", + "Total number of requests to bloom cache").Counter(), + buildChan: make(chan struct{}), + } + if v, ok := bs.(Viewer); ok { + bc.viewer = v + } + go func() { + err := bc.build(ctx) + if err != nil { + select { + case <-ctx.Done(): + log.Warning("Cache rebuild closed by context finishing: ", err) + default: + log.Error(err) + } + return + } + if metrics.Active() { + fill := metrics.NewCtx(ctx, "bloom_fill_ratio", + "Ratio of bloom filter fullnes, (updated once a minute)").Gauge() + + t := time.NewTicker(1 * time.Minute) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + fill.Set(bc.bloom.FillRatioTS()) + } + } + } + }() + return bc, nil +} + +type bloomcache struct { + active int32 + + bloom *bloom.Bloom + buildErr error + + buildChan chan struct{} + blockstore Blockstore + viewer Viewer + + // Statistics + hits metrics.Counter + total metrics.Counter +} + +var _ Blockstore = (*bloomcache)(nil) +var _ Viewer = (*bloomcache)(nil) + +func (b *bloomcache) BloomActive() bool { + return atomic.LoadInt32(&b.active) != 0 +} + +func (b *bloomcache) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-b.buildChan: + return b.buildErr + } +} + +func (b *bloomcache) build(ctx context.Context) error { + evt := log.EventBegin(ctx, "bloomcache.build") + defer evt.Done() + defer close(b.buildChan) + + ch, err := b.blockstore.AllKeysChan(ctx) + if err != nil { + b.buildErr = fmt.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err) + return b.buildErr + } + for { + select { + case key, ok := <-ch: + if !ok { + atomic.StoreInt32(&b.active, 1) + return nil + } + b.bloom.AddTS(key.Hash()) // Use binary key, the more compact the better + case <-ctx.Done(): + b.buildErr = ctx.Err() + return b.buildErr + } + } +} + +func (b *bloomcache) DeleteBlock(k cid.Cid) error { + if has, ok := b.hasCached(k); ok && !has { + return nil + } + + return b.blockstore.DeleteBlock(k) +} + +// if ok == false has is inconclusive +// if ok == true then has respons to question: is it contained +func (b *bloomcache) hasCached(k cid.Cid) (has bool, ok bool) { + b.total.Inc() + if !k.Defined() { + log.Error("undefined in bloom cache") + // Return cache invalid so call to blockstore + // in case of invalid key is forwarded deeper + return false, false + } + if b.BloomActive() { + blr := b.bloom.HasTS(k.Hash()) + if !blr { // not contained in bloom is only conclusive answer bloom gives + b.hits.Inc() + return false, true + } + } + return false, false +} + +func (b *bloomcache) Has(k cid.Cid) (bool, error) { + if has, ok := b.hasCached(k); ok { + return has, nil + } + + return b.blockstore.Has(k) +} + +func (b *bloomcache) GetSize(k cid.Cid) (int, error) { + return b.blockstore.GetSize(k) +} + +func (b *bloomcache) View(k cid.Cid, callback func([]byte) error) error { + if b.viewer == nil { + blk, err := b.Get(k) + if err != nil { + return err + } + return callback(blk.RawData()) + } + + if has, ok := b.hasCached(k); ok && !has { + return ErrNotFound + } + return b.viewer.View(k, callback) +} + +func (b *bloomcache) Get(k cid.Cid) (blocks.Block, error) { + if 
has, ok := b.hasCached(k); ok && !has { + return nil, ErrNotFound + } + + return b.blockstore.Get(k) +} + +func (b *bloomcache) Put(bl blocks.Block) error { + // See comment in PutMany + err := b.blockstore.Put(bl) + if err == nil { + b.bloom.AddTS(bl.Cid().Hash()) + } + return err +} + +func (b *bloomcache) PutMany(bs []blocks.Block) error { + // bloom cache gives only conclusive resulty if key is not contained + // to reduce number of puts we need conclusive information if block is contained + // this means that PutMany can't be improved with bloom cache so we just + // just do a passthrough. + err := b.blockstore.PutMany(bs) + if err != nil { + return err + } + for _, bl := range bs { + b.bloom.AddTS(bl.Cid().Hash()) + } + return nil +} + +func (b *bloomcache) HashOnRead(enabled bool) { + b.blockstore.HashOnRead(enabled) +} + +func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return b.blockstore.AllKeysChan(ctx) +} + +func (b *bloomcache) GCLock() Unlocker { + return b.blockstore.(GCBlockstore).GCLock() +} + +func (b *bloomcache) PinLock() Unlocker { + return b.blockstore.(GCBlockstore).PinLock() +} + +func (b *bloomcache) GCRequested() bool { + return b.blockstore.(GCBlockstore).GCRequested() +} diff --git a/bloom_cache_test.go b/bloom_cache_test.go new file mode 100644 index 0000000000000000000000000000000000000000..3b290a0c2c737ed99e68eb98b6657dac8f6d38a0 --- /dev/null +++ b/bloom_cache_test.go @@ -0,0 +1,212 @@ +package blockstore + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + blocks "github.com/ipfs/go-block-format" + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" + syncds "github.com/ipfs/go-datastore/sync" +) + +func testBloomCached(ctx context.Context, bs Blockstore) (*bloomcache, error) { + if ctx == nil { + ctx = context.Background() + } + opts := DefaultCacheOpts() + opts.HasARCCacheSize = 0 + bbs, err := CachedBlockstore(ctx, bs, opts) + if err == nil { + return bbs.(*bloomcache), nil + } + return nil, err +} + +func TestPutManyAddsToBloom(t *testing.T) { + bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + cachedbs, err := testBloomCached(ctx, bs) + if err != nil { + t.Fatal(err) + } + + if err := cachedbs.Wait(ctx); err != nil { + t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded()) + } + + block1 := blocks.NewBlock([]byte("foo")) + block2 := blocks.NewBlock([]byte("bar")) + emptyBlock := blocks.NewBlock([]byte{}) + + cachedbs.PutMany([]blocks.Block{block1, emptyBlock}) + has, err := cachedbs.Has(block1.Cid()) + if err != nil { + t.Fatal(err) + } + blockSize, err := cachedbs.GetSize(block1.Cid()) + if err != nil { + t.Fatal(err) + } + if blockSize == -1 || !has { + t.Fatal("added block is reported missing") + } + + has, err = cachedbs.Has(block2.Cid()) + if err != nil { + t.Fatal(err) + } + blockSize, err = cachedbs.GetSize(block2.Cid()) + if err != nil && err != ErrNotFound { + t.Fatal(err) + } + if blockSize > -1 || has { + t.Fatal("not added block is reported to be in blockstore") + } + + has, err = cachedbs.Has(emptyBlock.Cid()) + if err != nil { + t.Fatal(err) + } + blockSize, err = cachedbs.GetSize(emptyBlock.Cid()) + if err != nil { + t.Fatal(err) + } + if blockSize != 0 || !has { + t.Fatal("added block is reported missing") + } +} + +func TestReturnsErrorWhenSizeNegative(t *testing.T) { + bs := 
NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) + _, err := bloomCached(context.Background(), bs, -1, 1) + if err == nil { + t.Fail() + } +} +func TestHasIsBloomCached(t *testing.T) { + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + + for i := 0; i < 1000; i++ { + bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i)))) + } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + cachedbs, err := testBloomCached(ctx, bs) + if err != nil { + t.Fatal(err) + } + + if err := cachedbs.Wait(ctx); err != nil { + t.Fatalf("Failed while waiting for the filter to build: %d", cachedbs.bloom.ElementsAdded()) + } + + cacheFails := 0 + cd.SetFunc(func() { + cacheFails++ + }) + + for i := 0; i < 1000; i++ { + cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Cid()) + } + + if float64(cacheFails)/float64(1000) > float64(0.05) { + t.Fatalf("Bloom filter has cache miss rate of more than 5%%") + } + + cacheFails = 0 + block := blocks.NewBlock([]byte("newBlock")) + + cachedbs.PutMany([]blocks.Block{block}) + if cacheFails != 2 { + t.Fatalf("expected two datastore hits: %d", cacheFails) + } + cachedbs.Put(block) + if cacheFails != 3 { + t.Fatalf("expected datastore hit: %d", cacheFails) + } + + if has, err := cachedbs.Has(block.Cid()); !has || err != nil { + t.Fatal("has gave wrong response") + } + + bl, err := cachedbs.Get(block.Cid()) + if bl.String() != block.String() { + t.Fatal("block data doesn't match") + } + + if err != nil { + t.Fatal("there should't be an error") + } +} + +var _ ds.Batching = (*callbackDatastore)(nil) + +type callbackDatastore struct { + sync.Mutex + f func() + ds ds.Datastore +} + +func (c *callbackDatastore) SetFunc(f func()) { + c.Lock() + defer c.Unlock() + c.f = f +} + +func (c *callbackDatastore) CallF() { + c.Lock() + defer c.Unlock() + c.f() +} + +func (c *callbackDatastore) Put(key ds.Key, value []byte) (err error) { + c.CallF() + return c.ds.Put(key, value) +} + +func (c *callbackDatastore) Get(key ds.Key) (value []byte, err error) { + c.CallF() + return c.ds.Get(key) +} + +func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) { + c.CallF() + return c.ds.Has(key) +} + +func (c *callbackDatastore) GetSize(key ds.Key) (size int, err error) { + c.CallF() + return c.ds.GetSize(key) +} + +func (c *callbackDatastore) Close() error { + return nil +} + +func (c *callbackDatastore) Delete(key ds.Key) (err error) { + c.CallF() + return c.ds.Delete(key) +} + +func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) { + c.CallF() + return c.ds.Query(q) +} + +func (c *callbackDatastore) Sync(key ds.Key) error { + c.CallF() + return c.ds.Sync(key) +} + +func (c *callbackDatastore) Batch() (ds.Batch, error) { + return ds.NewBasicBatch(c), nil +} diff --git a/caching.go b/caching.go new file mode 100644 index 0000000000000000000000000000000000000000..798b84ce2bb552e4aef0d520f659fb7c870120ed --- /dev/null +++ b/caching.go @@ -0,0 +1,55 @@ +package blockstore + +import ( + "context" + "errors" + + metrics "github.com/ipfs/go-metrics-interface" +) + +// CacheOpts wraps options for CachedBlockStore(). +// Next to each option is it aproximate memory usage per unit +type CacheOpts struct { + HasBloomFilterSize int // 1 byte + HasBloomFilterHashes int // No size, 7 is usually best, consult bloom papers + HasARCCacheSize int // 32 bytes +} + +// DefaultCacheOpts returns a CacheOpts initialized with default values. 
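// A typical construction layers both caches over a plain blockstore, as the
// tests in this package do, for example (sketch):
//
//	opts := DefaultCacheOpts()
//	cbs, err := CachedBlockstore(ctx, NewBlockstore(d), opts)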
+func DefaultCacheOpts() CacheOpts { + return CacheOpts{ + HasBloomFilterSize: 512 << 10, + HasBloomFilterHashes: 7, + HasARCCacheSize: 64 << 10, + } +} + +// CachedBlockstore returns a blockstore wrapped in an ARCCache and +// then in a bloom filter cache, if the options indicate it. +func CachedBlockstore( + ctx context.Context, + bs Blockstore, + opts CacheOpts) (cbs Blockstore, err error) { + cbs = bs + + if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 || + opts.HasARCCacheSize < 0 { + return nil, errors.New("all options for cache need to be greater than zero") + } + + if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 { + return nil, errors.New("bloom filter hash count can't be 0 when there is size set") + } + + ctx = metrics.CtxSubScope(ctx, "bs.cache") + + if opts.HasARCCacheSize > 0 { + cbs, err = newARCCachedBS(ctx, cbs, opts.HasARCCacheSize) + } + if opts.HasBloomFilterSize != 0 { + // *8 because of bytes to bits conversion + cbs, err = bloomCached(ctx, cbs, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes) + } + + return cbs, err +} diff --git a/caching_test.go b/caching_test.go new file mode 100644 index 0000000000000000000000000000000000000000..16066ad18c924bb7b0543d6af986e4d84eac1e8f --- /dev/null +++ b/caching_test.go @@ -0,0 +1,38 @@ +package blockstore + +import ( + "context" + "testing" +) + +func TestCachingOptsLessThanZero(t *testing.T) { + opts := DefaultCacheOpts() + opts.HasARCCacheSize = -1 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("wrong ARC setting was not detected") + } + + opts = DefaultCacheOpts() + opts.HasBloomFilterSize = -1 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("negative bloom size was not detected") + } + + opts = DefaultCacheOpts() + opts.HasBloomFilterHashes = -1 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("negative hashes setting was not detected") + } +} + +func TestBloomHashesAtZero(t *testing.T) { + opts := DefaultCacheOpts() + opts.HasBloomFilterHashes = 0 + + if _, err := CachedBlockstore(context.TODO(), nil, opts); err == nil { + t.Error("zero hashes setting with positive size was not detected") + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000000000000000000000000000000000000..64675b4a07c0fe90a21ec02d56c11542e55f17bd --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module github.com/ipfs/go-ipfs-blockstore + +require ( + github.com/hashicorp/golang-lru v0.5.4 + github.com/ipfs/bbloom v0.0.4 + github.com/ipfs/go-block-format v0.0.3 + github.com/ipfs/go-cid v0.0.7 + github.com/ipfs/go-datastore v0.4.2 + github.com/ipfs/go-ipfs-ds-help v1.0.0 + github.com/ipfs/go-ipfs-util v0.0.2 + github.com/ipfs/go-log v0.0.1 + github.com/ipfs/go-metrics-interface v0.0.1 + github.com/multiformats/go-multihash v0.0.14 + go.uber.org/atomic v1.6.0 +) + +go 1.13 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..cc30010d99afdf4cafbf87659466a3f1df7b47c5 --- /dev/null +++ b/go.sum @@ -0,0 +1,105 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf 
v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= +github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= +github.com/ipfs/go-block-format v0.0.3 h1:r8t66QstRp/pd/or4dpnbVfXT5Gt7lOqRvC+/dDTpMc= +github.com/ipfs/go-block-format v0.0.3/go.mod h1:4LmD4ZUw0mhO+JSKdpWwrzATiEfM7WWgQ8H5l6P8MVk= +github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU= +github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= +github.com/ipfs/go-cid v0.0.7 h1:ysQJVJA3fNDF1qigJbsSQOdjhVLsOEoPdh0+R97k3jY= +github.com/ipfs/go-cid v0.0.7/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= +github.com/ipfs/go-datastore v0.4.1 h1:W4ZfzyhNi3xmuU5dQhjfuRn/wFuqEE1KnOmmQiOevEY= +github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= +github.com/ipfs/go-datastore v0.4.2 h1:h8/n7WPzhp239kkLws+epN3Ic7YtcBPgcaXfEfdVDWM= +github.com/ipfs/go-datastore v0.4.2/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= +github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= +github.com/ipfs/go-ipfs-ds-help v1.0.0 h1:bEQ8hMGs80h0sR8O4tfDgV6B01aaF9qeTrujrTLYV3g= +github.com/ipfs/go-ipfs-ds-help v1.0.0/go.mod h1:ujAbkeIgkKAWtxxNkoZHWLCyk5JpPoKnGyCcsoF6ueE= +github.com/ipfs/go-ipfs-util v0.0.2 h1:59Sswnk1MFaiq+VcaknX7aYEyGyGDAA73ilhEK2POp8= +github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdrtMecczcsQ= +github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc= +github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= +github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg= +github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= +github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw= +github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.1 h1:G1f5SKeVxmagw/IyvzvtZE4Gybcc4Tr1tf7I8z0XgOg= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-isatty v0.0.5 h1:tHXDdz1cpzGaovsTB+TVB8q90WEokoVmfMqoVcrLUgw= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 
h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= +github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771 h1:MHkK1uRtFbVqvAgvWxafZe54+5uBxLluGylDiKgdhwo= +github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ= +github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= +github.com/mr-tron/base58 v1.1.3 h1:v+sk57XuaCKGXpWtVBX8YJzO7hMGx4Aajh4TQbdEFdc= +github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI= +github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= +github.com/multiformats/go-base36 v0.1.0 h1:JR6TyF7JjGd3m6FbLU2cOxhC0Li8z8dLNGQ89tUg4F4= +github.com/multiformats/go-base36 v0.1.0/go.mod h1:kFGE83c6s80PklsHO9sRn2NCoffoRdUUOENyW/Vv6sM= +github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA= +github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= +github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk= +github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= +github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc= +github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= +github.com/multiformats/go-multihash v0.0.14 h1:QoBceQYQQtNUuf6s7wHxnE2c8bhbMqhfGzNI032se/I= +github.com/multiformats/go-multihash v0.0.14/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= +github.com/multiformats/go-varint v0.0.5 h1:XVZwSo04Cs3j/jS0uAEPpT3JY6DzMcVLLoWOSnCxOjg= +github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= +github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= +github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo= +github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU= +golang.org/x/crypto 
+golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo=
+golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 h1:DH4skfRX4EBpamg7iV4ZlCpblAHI6s6TDM39bFZumv8=
+golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/idstore.go b/idstore.go
new file mode 100644
index 0000000000000000000000000000000000000000..1166e5bda11c34e7730858a674f187ddf4a3197e
--- /dev/null
+++ b/idstore.go
@@ -0,0 +1,123 @@
+package blockstore
+
+import (
+	"context"
+	"io"
+
+	blocks "github.com/ipfs/go-block-format"
+	cid "github.com/ipfs/go-cid"
+	mh "github.com/multiformats/go-multihash"
+)
+
+// idstore wraps a Blockstore to add support for identity hashes.
+type idstore struct {
+	bs     Blockstore
+	viewer Viewer
+}
+
+var _ Blockstore = (*idstore)(nil)
+var _ Viewer = (*idstore)(nil)
+var _ io.Closer = (*idstore)(nil)
+
+// NewIdStore wraps bs in a Blockstore that serves identity-multihash CIDs
+// directly from the CID itself and skips writes and deletes of such blocks.
+func NewIdStore(bs Blockstore) Blockstore {
+	ids := &idstore{bs: bs}
+	if v, ok := bs.(Viewer); ok {
+		ids.viewer = v
+	}
+	return ids
+}
+
+func extractContents(k cid.Cid) (bool, []byte) {
+	// Pre-check by calling Prefix(); this is much faster than extracting the hash.
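+	// An identity multihash carries the block's bytes verbatim in its digest, so
+	// a CID built with it fully embeds the block. When the check below passes we
+	// decode the hash and serve its digest as the block contents, never touching
+	// the wrapped blockstore.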
+	if k.Prefix().MhType != mh.IDENTITY {
+		return false, nil
+	}
+
+	dmh, err := mh.Decode(k.Hash())
+	if err != nil || dmh.Code != mh.ID {
+		return false, nil
+	}
+	return true, dmh.Digest
+}
+
+func (b *idstore) DeleteBlock(k cid.Cid) error {
+	isId, _ := extractContents(k)
+	if isId {
+		return nil
+	}
+	return b.bs.DeleteBlock(k)
+}
+
+func (b *idstore) Has(k cid.Cid) (bool, error) {
+	isId, _ := extractContents(k)
+	if isId {
+		return true, nil
+	}
+	return b.bs.Has(k)
+}
+
+func (b *idstore) View(k cid.Cid, callback func([]byte) error) error {
+	if b.viewer == nil {
+		blk, err := b.Get(k)
+		if err != nil {
+			return err
+		}
+		return callback(blk.RawData())
+	}
+	isId, bdata := extractContents(k)
+	if isId {
+		return callback(bdata)
+	}
+	return b.viewer.View(k, callback)
+}
+
+func (b *idstore) GetSize(k cid.Cid) (int, error) {
+	isId, bdata := extractContents(k)
+	if isId {
+		return len(bdata), nil
+	}
+	return b.bs.GetSize(k)
+}
+
+func (b *idstore) Get(k cid.Cid) (blocks.Block, error) {
+	isId, bdata := extractContents(k)
+	if isId {
+		return blocks.NewBlockWithCid(bdata, k)
+	}
+	return b.bs.Get(k)
+}
+
+func (b *idstore) Put(bl blocks.Block) error {
+	isId, _ := extractContents(bl.Cid())
+	if isId {
+		return nil
+	}
+	return b.bs.Put(bl)
+}
+
+func (b *idstore) PutMany(bs []blocks.Block) error {
+	toPut := make([]blocks.Block, 0, len(bs))
+	for _, bl := range bs {
+		isId, _ := extractContents(bl.Cid())
+		if isId {
+			continue
+		}
+		toPut = append(toPut, bl)
+	}
+	return b.bs.PutMany(toPut)
+}
+
+func (b *idstore) HashOnRead(enabled bool) {
+	b.bs.HashOnRead(enabled)
+}
+
+func (b *idstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
+	return b.bs.AllKeysChan(ctx)
+}
+
+func (b *idstore) Close() error {
+	if c, ok := b.bs.(io.Closer); ok {
+		return c.Close()
+	}
+	return nil
+}
diff --git a/idstore_test.go b/idstore_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..321d5ec7799c0d111be99c168345f7ed610c3154
--- /dev/null
+++ b/idstore_test.go
@@ -0,0 +1,159 @@
+package blockstore
+
+import (
+	"context"
+	"testing"
+
+	blk "github.com/ipfs/go-block-format"
+	cid "github.com/ipfs/go-cid"
+	ds "github.com/ipfs/go-datastore"
+	mh "github.com/multiformats/go-multihash"
+)
+
+func createTestStores() (Blockstore, *callbackDatastore) {
+	cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
+	ids := NewIdStore(NewBlockstore(cd))
+	return ids, cd
+}
+
+func TestIdStore(t *testing.T) {
+	idhash1, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash1"))
+	idblock1, _ := blk.NewBlockWithCid([]byte("idhash1"), idhash1)
+	hash1, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash1"))
+	block1, _ := blk.NewBlockWithCid([]byte("hash1"), hash1)
+	emptyHash, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("emptyHash"))
+	emptyBlock, _ := blk.NewBlockWithCid([]byte{}, emptyHash)
+
+	ids, cb := createTestStores()
+
+	have, _ := ids.Has(idhash1)
+	if !have {
+		t.Fatal("Has() failed on idhash")
+	}
+
+	_, err := ids.Get(idhash1)
+	if err != nil {
+		t.Fatalf("Get() failed on idhash: %v", err)
+	}
+
+	noop := func() {}
+	failIfPassThrough := func() {
+		t.Fatal("operation on identity hash passed through to datastore")
+	}
+
+	cb.f = failIfPassThrough
+	err = ids.Put(idblock1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cb.f = noop
+	err = ids.Put(block1)
+	if err != nil {
+		t.Fatalf("Put() failed on normal block: %v", err)
+	}
+
+	have, _ = ids.Has(hash1)
+	if !have {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	blockSize, _ := ids.GetSize(hash1)
+	if blockSize == -1 {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	_, err = ids.Get(hash1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = ids.Put(emptyBlock)
+	if err != nil {
+		t.Fatalf("Put() failed on normal block: %v", err)
+	}
+
+	have, _ = ids.Has(emptyHash)
+	if !have {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	blockSize, _ = ids.GetSize(emptyHash)
+	if blockSize != 0 {
+		t.Fatal("normal block not added to datastore")
+	}
+
+	cb.f = failIfPassThrough
+	err = ids.DeleteBlock(idhash1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	cb.f = noop
+	err = ids.DeleteBlock(hash1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	have, _ = ids.Has(hash1)
+	if have {
+		t.Fatal("normal block not deleted from datastore")
+	}
+
+	blockSize, _ = ids.GetSize(hash1)
+	if blockSize > -1 {
+		t.Fatal("normal block not deleted from datastore")
+	}
+
+	err = ids.DeleteBlock(emptyHash)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	idhash2, _ := cid.NewPrefixV1(cid.Raw, mh.ID).Sum([]byte("idhash2"))
+	idblock2, _ := blk.NewBlockWithCid([]byte("idhash2"), idhash2)
+	hash2, _ := cid.NewPrefixV1(cid.Raw, mh.SHA2_256).Sum([]byte("hash2"))
+	block2, _ := blk.NewBlockWithCid([]byte("hash2"), hash2)
+
+	cb.f = failIfPassThrough
+	err = ids.PutMany([]blk.Block{idblock1, idblock2})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	opCount := 0
+	cb.f = func() {
+		opCount++
+	}
+
+	err = ids.PutMany([]blk.Block{block1, block2})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if opCount != 4 {
+		// one call to Has and one to Put for each CID
+		t.Fatalf("expected exactly 4 operations, got %d", opCount)
+	}
+
+	opCount = 0
+	err = ids.PutMany([]blk.Block{idblock1, block1})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if opCount != 1 {
+		// just one call to Put, for the normal (non-id) block
+		t.Fatalf("expected exactly 1 operation, got %d", opCount)
+	}
+
+	ch, err := ids.AllKeysChan(context.TODO())
+	if err != nil {
+		t.Fatal(err)
+	}
+	cnt := 0
+	for c := range ch {
+		cnt++
+		if c.Prefix().MhType == mh.ID {
+			t.Fatalf("block with identity hash found in blockstore")
+		}
+	}
+	if cnt != 2 {
+		t.Fatalf("expected exactly two keys returned by AllKeysChan, got %d", cnt)
+	}
+}
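
Below is a minimal usage sketch of the identity-aware blockstore introduced in idstore.go. It is illustrative only: it assumes this module is imported as github.com/ipfs/go-ipfs-blockstore and uses a MutexWrap'd MapDatastore from github.com/ipfs/go-datastore, mirroring the calls already exercised by the tests above.

package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore" // assumed import path for this module
	mh "github.com/multiformats/go-multihash"
)

func main() {
	// Wrap a plain datastore-backed blockstore with the identity-aware layer.
	bs := blockstore.NewIdStore(blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())))

	// A CID whose multihash is the identity function embeds the data itself.
	data := []byte("hello identity")
	c, err := cid.NewPrefixV1(cid.Raw, mh.IDENTITY).Sum(data)
	if err != nil {
		panic(err)
	}

	// Put is a no-op for identity CIDs: nothing is written to the datastore.
	b, _ := blocks.NewBlockWithCid(data, c)
	_ = bs.Put(b)

	// Get decodes the bytes straight out of the CID, so it succeeds even
	// though the underlying datastore has never seen this block.
	got, err := bs.Get(c)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", got.RawData())
}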