Commit 1aafbacd authored by Jeromy

integrate CIDv0

License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
parent 5ff692f2
......@@ -5,13 +5,16 @@ package blockservice
import (
"errors"
"fmt"
blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
exchange "github.com/ipfs/go-ipfs/exchange"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)
var log = logging.Logger("blockservice")
......@@ -27,6 +30,12 @@ type BlockService struct {
Exchange exchange.Interface
}
// Object is a Block that additionally knows its own content identifier.
// It is the unit accepted by the CID-aware service methods (AddObject,
// AddObjects, DeleteObject).
type Object interface {
Cid() *cid.Cid
blocks.Block
}
// NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
if rem == nil {
......@@ -41,30 +50,41 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(b blocks.Block) (key.Key, error) {
k := b.Key()
has, err := s.Blockstore.Has(k)
func (s *BlockService) AddObject(o Object) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
c := o.Cid()
has, err := s.Blockstore.Has(key.Key(c.Hash()))
if err != nil {
return k, err
return nil, err
}
if has {
return k, nil
return c, nil
}
err = s.Blockstore.Put(b)
err = s.Blockstore.Put(o)
if err != nil {
return k, err
return nil, err
}
if err := s.Exchange.HasBlock(b); err != nil {
return "", errors.New("blockservice is closed")
if err := s.Exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
}
return k, nil
return c, nil
}
func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
var toput []blocks.Block
var toputcids []*cid.Cid
for _, b := range bs {
has, err := s.Blockstore.Has(b.Key())
c := b.Cid()
has, err := s.Blockstore.Has(key.Key(c.Hash()))
if err != nil {
return nil, err
}
......@@ -74,6 +94,7 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
}
toput = append(toput, b)
toputcids = append(toputcids, c)
}
err := s.Blockstore.PutMany(toput)
......@@ -81,26 +102,25 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]key.Key, error) {
return nil, err
}
var ks []key.Key
for _, b := range toput {
if err := s.Exchange.HasBlock(b); err != nil {
return nil, errors.New("blockservice is closed")
var ks []*cid.Cid
for _, o := range toput {
if err := s.Exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
}
ks = append(ks, b.Key())
c := o.(Object).Cid() // cast is safe, we created these
ks = append(ks, c)
}
return ks, nil
}
// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, error) {
if k == "" {
log.Debug("BlockService GetBlock: Nil Key")
return nil, ErrNotFound
}
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c)
log.Debugf("BlockService GetBlock: '%s'", k)
block, err := s.Blockstore.Get(k)
// TODO: blockstore shouldnt care about Cids, need an easier way to strip the abstraction
block, err := s.Blockstore.Get(key.Key(c.Hash()))
if err == nil {
return block, nil
}
......@@ -109,7 +129,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, e
// TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap")
blk, err := s.Exchange.GetBlock(ctx, k)
blk, err := s.Exchange.GetBlock(ctx, key.Key(c.Hash()))
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
......@@ -130,12 +150,13 @@ func (s *BlockService) GetBlock(ctx context.Context, k key.Key) (blocks.Block, e
// GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel.
// NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan blocks.Block {
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
out := make(chan blocks.Block, 0)
go func() {
defer close(out)
var misses []key.Key
for _, k := range ks {
for _, c := range ks {
k := key.Key(c.Hash())
hit, err := s.Blockstore.Get(k)
if err != nil {
misses = append(misses, k)
......@@ -171,11 +192,19 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan block
}
// DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(k key.Key) error {
return s.Blockstore.DeleteBlock(k)
// DeleteObject removes the object's block from the underlying blockstore.
// NOTE(review): this keys the delete on o.Key() (the raw multihash key),
// while the other methods here derive the key as key.Key(c.Hash()); for
// CIDv0 these presumably agree — confirm this stays correct for CIDv1.
func (s *BlockService) DeleteObject(o Object) error {
return s.Blockstore.DeleteBlock(o.Key())
}
// Close shuts down the blockservice by closing its exchange, returning
// whatever error the exchange reports.
func (s *BlockService) Close() error {
	log.Debug("blockservice is shutting down...")
	err := s.Exchange.Close()
	return err
}
// RawBlockObject adapts a plain blocks.Block to the Object interface by
// deriving a version-0 CID from the block's multihash.
type RawBlockObject struct {
blocks.Block
}

// Cid returns the embedded block's multihash wrapped as a CIDv0.
func (rob *RawBlockObject) Cid() *cid.Cid {
return cid.NewCidV0(rob.Block.Multihash())
}
......@@ -2,80 +2,98 @@ package bstest
import (
"bytes"
"fmt"
"testing"
"time"
blocks "github.com/ipfs/go-ipfs/blocks"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
key "github.com/ipfs/go-ipfs/blocks/key"
. "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
ds "gx/ipfs/QmNgqJarToRiq2GBaPJhkmW4B5BxS5B74E1rkGvv2JoaTp/go-datastore"
dssync "gx/ipfs/QmNgqJarToRiq2GBaPJhkmW4B5BxS5B74E1rkGvv2JoaTp/go-datastore/sync"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)
// newObject wraps raw data in a testObject whose block is built with
// blocks.NewBlock over that data.
func newObject(data []byte) *testObject {
return &testObject{
Block: blocks.NewBlock(data),
}
}
// testObject is a minimal Object implementation for these tests: a plain
// block whose CID is the CIDv0 form of its multihash.
type testObject struct {
blocks.Block
}

// Cid returns the block's multihash as a version-0 CID.
func (o *testObject) Cid() *cid.Cid {
return cid.NewCidV0(o.Block.Multihash())
}
func TestBlocks(t *testing.T) {
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
bs := New(bstore, offline.Exchange(bstore))
defer bs.Close()
_, err := bs.GetBlock(context.Background(), key.Key(""))
if err != ErrNotFound {
t.Error("Empty String Key should error", err)
}
b := blocks.NewBlock([]byte("beep boop"))
o := newObject([]byte("beep boop"))
h := u.Hash([]byte("beep boop"))
if !bytes.Equal(b.Multihash(), h) {
if !bytes.Equal(o.Multihash(), h) {
t.Error("Block Multihash and data multihash not equal")
}
if b.Key() != key.Key(h) {
if o.Key() != key.Key(h) {
t.Error("Block key and data multihash key not equal")
}
k, err := bs.AddBlock(b)
k, err := bs.AddObject(o)
if err != nil {
t.Error("failed to add block to BlockService", err)
return
}
if k != b.Key() {
if !k.Equals(o.Cid()) {
t.Error("returned key is not equal to block key", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
b2, err := bs.GetBlock(ctx, b.Key())
b2, err := bs.GetBlock(ctx, o.Cid())
if err != nil {
t.Error("failed to retrieve block from BlockService", err)
return
}
if b.Key() != b2.Key() {
if o.Key() != b2.Key() {
t.Error("Block keys not equal.")
}
if !bytes.Equal(b.Data(), b2.Data()) {
if !bytes.Equal(o.RawData(), b2.RawData()) {
t.Error("Block data is not equal.")
}
}
// makeObjects builds n test objects with distinct payloads of the form
// "object <i>" for i in [0, n).
func makeObjects(n int) []*testObject {
	var objs []*testObject
	for i := 0; i < n; i++ {
		payload := fmt.Sprintf("object %d", i)
		objs = append(objs, newObject([]byte(payload)))
	}
	return objs
}
func TestGetBlocksSequential(t *testing.T) {
var servs = Mocks(4)
for _, s := range servs {
defer s.Close()
}
bg := blocksutil.NewBlockGenerator()
blks := bg.Blocks(50)
objs := makeObjects(50)
var keys []key.Key
for _, blk := range blks {
keys = append(keys, blk.Key())
servs[0].AddBlock(blk)
var cids []*cid.Cid
for _, o := range objs {
cids = append(cids, o.Cid())
servs[0].AddObject(o)
}
t.Log("one instance at a time, get blocks concurrently")
......@@ -83,7 +101,7 @@ func TestGetBlocksSequential(t *testing.T) {
for i := 1; i < len(servs); i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*50)
defer cancel()
out := servs[i].GetBlocks(ctx, keys)
out := servs[i].GetBlocks(ctx, cids)
gotten := make(map[key.Key]blocks.Block)
for blk := range out {
if _, ok := gotten[blk.Key()]; ok {
......@@ -91,8 +109,8 @@ func TestGetBlocksSequential(t *testing.T) {
}
gotten[blk.Key()] = blk
}
if len(gotten) != len(blks) {
t.Fatalf("Didnt get enough blocks back: %d/%d", len(gotten), len(blks))
if len(gotten) != len(objs) {
t.Fatalf("Didnt get enough blocks back: %d/%d", len(gotten), len(objs))
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment