Commit 8eeb693a authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #3827 from ipfs/golint-1

Make Golint happy in the blocks submodule.
parents ff997c1f 3b6216b2
// package blocks contains the lowest level of IPFS data structures,
// the raw block with a checksum.
// Package blocks contains the lowest level of IPFS data structures.
// A block is raw data accompanied by a CID. The CID contains the multihash
// corresponding to the block.
package blocks
import (
......@@ -11,8 +12,11 @@ import (
mh "gx/ipfs/QmbZ6Cee2uHjG7hf19qLHppgKDRtaG4CVtMzdmK9VCVqLu/go-multihash"
)
var ErrWrongHash = errors.New("data did not match given hash!")
// ErrWrongHash is returned when the Cid of a block is not the expected
// according to the contents. It is currently used only when debugging.
var ErrWrongHash = errors.New("data did not match given hash")
// Block provides abstraction for blocks implementations.
type Block interface {
RawData() []byte
Cid() *cid.Cid
......@@ -20,7 +24,8 @@ type Block interface {
Loggable() map[string]interface{}
}
// Block is a singular block of data in ipfs
// A BasicBlock is a singular block of data in ipfs. It implements the Block
// interface.
type BasicBlock struct {
cid *cid.Cid
data []byte
......@@ -32,9 +37,9 @@ func NewBlock(data []byte) *BasicBlock {
return &BasicBlock{data: data, cid: cid.NewCidV0(u.Hash(data))}
}
// NewBlockWithHash creates a new block when the hash of the data
// NewBlockWithCid creates a new block when the hash of the data
// is already known, this is used to save time in situations where
// we are able to be confident that the data is correct
// we are able to be confident that the data is correct.
func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
if u.Debug {
chkc, err := c.Prefix().Sum(data)
......@@ -49,22 +54,27 @@ func NewBlockWithCid(data []byte, c *cid.Cid) (*BasicBlock, error) {
return &BasicBlock{data: data, cid: c}, nil
}
// Multihash returns the hash contained in the block CID.
func (b *BasicBlock) Multihash() mh.Multihash {
return b.cid.Hash()
}
// RawData returns the block raw contents as a byte slice.
func (b *BasicBlock) RawData() []byte {
return b.data
}
// Cid returns the content identifier of the block.
func (b *BasicBlock) Cid() *cid.Cid {
return b.cid
}
// String provides a human-readable representation of the block CID.
func (b *BasicBlock) String() string {
return fmt.Sprintf("[Block %s]", b.Cid())
}
// Loggable returns a go-log loggable item.
func (b *BasicBlock) Loggable() map[string]interface{} {
return map[string]interface{}{
"block": b.Cid().String(),
......
......@@ -11,6 +11,9 @@ import (
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
)
// arccache wraps a BlockStore with an Adaptive Replacement Cache (ARC) for
// block Cids. This provides block access-time improvements, allowing
// to short-cut many searches without query-ing the underlying datastore.
type arccache struct {
arc *lru.ARCCache
blockstore Blockstore
......@@ -128,6 +131,10 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
return nil
}
func (b *arccache) HashOnRead(enabled bool) {
b.blockstore.HashOnRead(enabled)
}
func (b *arccache) addCache(c *cid.Cid, has bool) {
b.arc.Add(c.KeyString(), has)
}
......
......@@ -13,25 +13,24 @@ import (
var exampleBlock = blocks.NewBlock([]byte("foo"))
func testArcCached(bs Blockstore, ctx context.Context) (*arccache, error) {
func testArcCached(ctx context.Context, bs Blockstore) (*arccache, error) {
if ctx == nil {
ctx = context.TODO()
}
opts := DefaultCacheOpts()
opts.HasBloomFilterSize = 0
opts.HasBloomFilterHashes = 0
bbs, err := CachedBlockstore(bs, ctx, opts)
bbs, err := CachedBlockstore(ctx, bs, opts)
if err == nil {
return bbs.(*arccache), nil
} else {
return nil, err
}
return nil, err
}
func createStores(t *testing.T) (*arccache, *blockstore, *callbackDatastore) {
func createStores(t *testing.T) (*arccache, Blockstore, *callbackDatastore) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
arc, err := testArcCached(bs, nil)
arc, err := testArcCached(nil, bs)
if err != nil {
t.Fatal(err)
}
......
// package blockstore implements a thin wrapper over a datastore, giving a
// Package blockstore implements a thin wrapper over a datastore, giving a
// clean interface for Getting and Putting block objects.
package blockstore
......@@ -23,22 +23,36 @@ var log = logging.Logger("blockstore")
// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks")
var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
// ErrValueTypeMismatch is an error returned when the item retrieved from
// the datatstore is not a block.
var ErrValueTypeMismatch = errors.New("the retrieved value is not a Block")
// ErrHashMismatch is an error returned when the hash of a block
// is different than expected.
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
// ErrNotFound is an error returned when a block is not found.
var ErrNotFound = errors.New("blockstore: block not found")
// Blockstore wraps a Datastore
// Blockstore wraps a Datastore block-centered methods and provides a layer
// of abstraction which allows to add different caching strategies.
type Blockstore interface {
DeleteBlock(*cid.Cid) error
Has(*cid.Cid) (bool, error)
Get(*cid.Cid) (blocks.Block, error)
Put(blocks.Block) error
PutMany([]blocks.Block) error
// AllKeysChan returns a channel from which
// the CIDs in the Blockstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
HashOnRead(enabled bool)
}
// GCLocker abstract functionality to lock a blockstore when performing
// garbage-collection operations.
type GCLocker interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should ocurr simultaneously.
......@@ -56,11 +70,15 @@ type GCLocker interface {
GCRequested() bool
}
// GCBlockstore is a blockstore that can safely run garbage-collection
// operations.
type GCBlockstore interface {
Blockstore
GCLocker
}
// NewGCBlockstore returns a default implementation of GCBlockstore
// using the given Blockstore and GCLocker.
func NewGCBlockstore(bs Blockstore, gcl GCLocker) GCBlockstore {
return gcBlockstore{bs, gcl}
}
......@@ -70,7 +88,9 @@ type gcBlockstore struct {
GCLocker
}
func NewBlockstore(d ds.Batching) *blockstore {
// NewBlockstore returns a default Blockstore implementation
// using the provided datastore.Batching backend.
func NewBlockstore(d ds.Batching) Blockstore {
var dsb ds.Batching
dd := dsns.Wrap(d, BlockPrefix)
dsb = dd
......@@ -108,7 +128,7 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
}
bdata, ok := maybeData.([]byte)
if !ok {
return nil, ValueTypeMismatch
return nil, ErrValueTypeMismatch
}
if bs.rehash {
......@@ -122,9 +142,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
}
return blocks.NewBlockWithCid(bdata, rbcid)
} else {
return blocks.NewBlockWithCid(bdata, k)
}
return blocks.NewBlockWithCid(bdata, k)
}
func (bs *blockstore) Put(block blocks.Block) error {
......@@ -162,8 +181,8 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
return bs.datastore.Has(dshelp.CidToDsKey(k))
}
func (s *blockstore) DeleteBlock(k *cid.Cid) error {
err := s.datastore.Delete(dshelp.CidToDsKey(k))
func (bs *blockstore) DeleteBlock(k *cid.Cid) error {
err := bs.datastore.Delete(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
return ErrNotFound
}
......@@ -173,7 +192,7 @@ func (s *blockstore) DeleteBlock(k *cid.Cid) error {
// AllKeysChan runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context
// AllKeysChan respects context.
func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
// KeysOnly, because that would be _a lot_ of data.
......@@ -220,7 +239,9 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
return output, nil
}
func NewGCLocker() *gclocker {
// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
return &gclocker{}
}
......@@ -230,6 +251,8 @@ type gclocker struct {
gcreqlk sync.Mutex
}
// Unlocker represents an object which can Unlock
// something.
type Unlocker interface {
Unlock()
}
......
......@@ -186,7 +186,7 @@ func TestAllKeysRespectsContext(t *testing.T) {
}
func TestValueTypeMismatch(t *testing.T) {
func TestErrValueTypeMismatch(t *testing.T) {
block := blocks.NewBlock([]byte("some data"))
datastore := ds.NewMapDatastore()
......@@ -196,7 +196,7 @@ func TestValueTypeMismatch(t *testing.T) {
blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
_, err := blockstore.Get(block.Cid())
if err != ValueTypeMismatch {
if err != ErrValueTypeMismatch {
t.Fatal(err)
}
}
......
......@@ -12,9 +12,10 @@ import (
bloom "gx/ipfs/QmeiMCBkYHxkDkDfnDadzz4YxY5ruL5Pj499essE4vRsGM/bbloom"
)
// bloomCached returns Blockstore that caches Has requests using Bloom filter
// Size is size of bloom filter in bytes
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (*bloomcache, error) {
// bloomCached returns a Blockstore that caches Has requests using a Bloom
// filter. bloomSize is size of bloom filter in bytes. hashCount specifies the
// number of hashing functions in the bloom filter (usually known as k).
func bloomCached(ctx context.Context, bs Blockstore, bloomSize, hashCount int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
if err != nil {
return nil, err
......@@ -165,6 +166,10 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
return nil
}
func (b *bloomcache) HashOnRead(enabled bool) {
b.blockstore.HashOnRead(enabled)
}
func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
return b.blockstore.AllKeysChan(ctx)
}
......
......@@ -14,18 +14,17 @@ import (
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
)
func testBloomCached(bs Blockstore, ctx context.Context) (*bloomcache, error) {
func testBloomCached(ctx context.Context, bs Blockstore) (*bloomcache, error) {
if ctx == nil {
ctx = context.TODO()
ctx = context.Background()
}
opts := DefaultCacheOpts()
opts.HasARCCacheSize = 0
bbs, err := CachedBlockstore(bs, ctx, opts)
bbs, err := CachedBlockstore(ctx, bs, opts)
if err == nil {
return bbs.(*bloomcache), nil
} else {
return nil, err
}
return nil, err
}
func TestPutManyAddsToBloom(t *testing.T) {
......@@ -34,7 +33,7 @@ func TestPutManyAddsToBloom(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
cachedbs, err := testBloomCached(bs, ctx)
cachedbs, err := testBloomCached(ctx, bs)
select {
case <-cachedbs.rebuildChan:
......@@ -65,7 +64,7 @@ func TestPutManyAddsToBloom(t *testing.T) {
func TestReturnsErrorWhenSizeNegative(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
_, err := bloomCached(bs, context.TODO(), -1, 1)
_, err := bloomCached(context.Background(), bs, -1, 1)
if err == nil {
t.Fail()
}
......@@ -80,7 +79,7 @@ func TestHasIsBloomCached(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
cachedbs, err := testBloomCached(bs, ctx)
cachedbs, err := testBloomCached(ctx, bs)
if err != nil {
t.Fatal(err)
}
......
......@@ -7,6 +7,7 @@ import (
"gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
)
// CacheOpts wraps options for CachedBlockStore().
// Next to each option is it aproximate memory usage per unit
type CacheOpts struct {
HasBloomFilterSize int // 1 byte
......@@ -14,6 +15,7 @@ type CacheOpts struct {
HasARCCacheSize int // 32 bytes
}
// DefaultCacheOpts returns a CacheOpts initialized with default values.
func DefaultCacheOpts() CacheOpts {
return CacheOpts{
HasBloomFilterSize: 512 << 10,
......@@ -22,8 +24,12 @@ func DefaultCacheOpts() CacheOpts {
}
}
func CachedBlockstore(bs Blockstore,
ctx context.Context, opts CacheOpts) (cbs Blockstore, err error) {
// CachedBlockstore returns a blockstore wrapped in an ARCCache and
// then in a bloom filter cache, if the options indicate it.
func CachedBlockstore(
ctx context.Context,
bs Blockstore,
opts CacheOpts) (cbs Blockstore, err error) {
cbs = bs
if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
......@@ -42,7 +48,7 @@ func CachedBlockstore(bs Blockstore,
}
if opts.HasBloomFilterSize != 0 {
// *8 because of bytes to bits conversion
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes)
cbs, err = bloomCached(ctx, cbs, opts.HasBloomFilterSize*8, opts.HasBloomFilterHashes)
}
return cbs, err
......
package blockstore_util
// Package blockstoreutil provides utility functions for Blockstores.
package blockstoreutil
import (
"fmt"
"io"
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/pin"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
bs "github.com/ipfs/go-ipfs/blocks/blockstore"
"github.com/ipfs/go-ipfs/pin"
)
// RemovedBlock is used to respresent the result of removing a block.
......@@ -21,12 +23,17 @@ type RemovedBlock struct {
Error string `json:",omitempty"`
}
// RmBlocksOpts is used to wrap options for RmBlocks().
type RmBlocksOpts struct {
Prefix string
Quiet bool
Force bool
}
// RmBlocks removes the blocks provided in the cids slice.
// It returns a channel where objects of type RemovedBlock are placed, when
// not using the Quiet option. Block removal is asynchronous and will
// skip any pinned blocks.
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []*cid.Cid, opts RmBlocksOpts) (<-chan interface{}, error) {
// make the channel large enough to hold any result to avoid
// blocking while holding the GCLock
......@@ -53,6 +60,11 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, cids []*cid.Cid, opts RmB
return out, nil
}
// FilterPinned takes a slice of Cids and returns it with the pinned Cids
// removed. If a Cid is pinned, it will place RemovedBlock objects in the given
// out channel, with an error which indicates that the Cid is pinned.
// This function is used in RmBlocks to filter out any blocks which are not
// to be removed (because they are pinned).
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid {
stillOkay := make([]*cid.Cid, 0, len(cids))
res, err := pins.CheckIfPinned(cids...)
......@@ -73,6 +85,9 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c
return stillOkay
}
// ProcRmOutput takes the channel returned by RmBlocks and writes
// to stdout/stderr according to the RemovedBlock objects received in
// that channel.
func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error {
someFailed := false
for res := range in {
......
// Package blocksutil provides utility functions for working
// with Blocks.
package blocksutil
import "github.com/ipfs/go-ipfs/blocks"
// NewBlockGenerator returns an object capable of
// producing blocks.
func NewBlockGenerator() BlockGenerator {
return BlockGenerator{}
}
// BlockGenerator generates BasicBlocks on demand.
// For each instace of BlockGenerator,
// each new block is different from the previous,
// although two different instances will produce the same.
type BlockGenerator struct {
seq int
}
// Next generates a new BasicBlock.
func (bg *BlockGenerator) Next() *blocks.BasicBlock {
bg.seq++
return blocks.NewBlock([]byte(string(bg.seq)))
}
// Blocks generates as many BasicBlocks as specified by n.
func (bg *BlockGenerator) Blocks(n int) []*blocks.BasicBlock {
blocks := make([]*blocks.BasicBlock, 0)
for i := 0; i < n; i++ {
......
// package bloom implements a simple bloom filter.
// Package bloom implements a simple bloom filter.
package bloom
import (
"encoding/binary"
"errors"
// Non crypto hash, because speed
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mtchavez/jenkins"
"gx/ipfs/QmeWQMDa5dSdP4n8WDeoY5z8L2EKVqF4ZvK4VEHsLqXsGu/hamming"
"hash"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/mtchavez/jenkins"
)
// A Filter represents a bloom filter.
type Filter interface {
Add([]byte)
Find([]byte) bool
......@@ -17,6 +19,8 @@ type Filter interface {
HammingDistance(Filter) (int, error)
}
// NewFilter creates a new bloom Filter with the given
// size. k (the number of hash functions), is hardcoded to 3.
func NewFilter(size int) Filter {
return &filter{
hash: jenkins.New(),
......@@ -31,6 +35,8 @@ type filter struct {
k int
}
// BasicFilter calls NewFilter with a bloom filter size of
// 2048 bytes.
func BasicFilter() Filter {
return NewFilter(2048)
}
......@@ -84,11 +90,11 @@ func (f *filter) Merge(o Filter) (Filter, error) {
}
if len(casfil.filter) != len(f.filter) {
return nil, errors.New("filter lengths must match!")
return nil, errors.New("filter lengths must match")
}
if casfil.k != f.k {
return nil, errors.New("filter k-values must match!")
return nil, errors.New("filter k-values must match")
}
nfilt := new(filter)
......@@ -110,7 +116,7 @@ func (f *filter) HammingDistance(o Filter) (int, error) {
}
if len(f.filter) != len(casfil.filter) {
return 0, errors.New("filter lengths must match!")
return 0, errors.New("filter lengths must match")
}
acc := 0
......
// package set contains various different types of 'BlockSet's
// Package set defines the BlockSet interface which provides
// abstraction for sets of Cids.
// It provides a default implementation using cid.Set.
package set
import (
"github.com/ipfs/go-ipfs/blocks/bloom"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
cid "gx/ipfs/QmV5gPoRsjN1Gid3LMdNZTyfCtP2DsvqEbMAmz82RmmiGk/go-cid"
"github.com/ipfs/go-ipfs/blocks/bloom"
)
var log = logging.Logger("blockset")
// BlockSet represents a mutable set of keyed blocks
// BlockSet represents a mutable set of blocks CIDs.
type BlockSet interface {
AddBlock(*cid.Cid)
RemoveBlock(*cid.Cid)
HasKey(*cid.Cid) bool
// GetBloomFilter creates and returns a bloom filter to which
// all the CIDs in the set have been added.
GetBloomFilter() bloom.Filter
GetKeys() []*cid.Cid
}
// SimpleSetFromKeys returns a default implementation of BlockSet
// using cid.Set. The given keys are added to the set.
func SimpleSetFromKeys(keys []*cid.Cid) BlockSet {
sbs := &simpleBlockSet{blocks: cid.NewSet()}
for _, k := range keys {
......@@ -27,6 +33,8 @@ func SimpleSetFromKeys(keys []*cid.Cid) BlockSet {
return sbs
}
// NewSimpleBlockSet returns a new empty default implementation
// of BlockSet using cid.Set.
func NewSimpleBlockSet() BlockSet {
return &simpleBlockSet{blocks: cid.NewSet()}
}
......
......@@ -184,7 +184,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
opts.HasBloomFilterSize = 0
}
cbs, err := bstore.CachedBlockstore(bs, ctx, opts)
cbs, err := bstore.CachedBlockstore(ctx, bs, opts)
if err != nil {
return err
}
......
......@@ -94,8 +94,9 @@ func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))
bstore, err := blockstore.CachedBlockstore(blockstore.NewBlockstore(
ds_sync.MutexWrap(dstore)), ctx, blockstore.DefaultCacheOpts())
bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)),
blockstore.DefaultCacheOpts())
if err != nil {
panic(err.Error()) // FIXME perhaps change signature and return error.
}
......
......@@ -199,4 +199,9 @@ func (f *Filestore) PutMany(bs []blocks.Block) error {
return nil
}
// HashOnRead calls blockstore.HashOnRead.
func (f *Filestore) HashOnRead(enabled bool) {
f.bs.HashOnRead(enabled)
}
var _ blockstore.Blockstore = (*Filestore)(nil)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment