Commit 8fcf235b authored by Steven Allen's avatar Steven Allen

merkledag: switch to new dag interface

Also:

* Update the blockstore/blockservice methods to match.
* Construct a new temporary offline dag instead of having a
  GetOfflineLinkService method.

License: MIT
Signed-off-by: default avatarSteven Allen <steven@stebalien.com>
parent 851ef0de
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"github.com/ipfs/go-ipfs/blocks/blockstore" "github.com/ipfs/go-ipfs/blocks/blockstore"
exchange "github.com/ipfs/go-ipfs/exchange" exchange "github.com/ipfs/go-ipfs/exchange"
...@@ -21,10 +22,27 @@ var log = logging.Logger("blockservice") ...@@ -21,10 +22,27 @@ var log = logging.Logger("blockservice")
var ErrNotFound = errors.New("blockservice: key not found") var ErrNotFound = errors.New("blockservice: key not found")
type BlockGetter interface {
// GetBlock gets the requested block.
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
// GetBlocks does a batch request for the given cids, returning blocks as
// they are found, in no particular order.
//
// It may not be able to find all requested blocks (or the context may
// be canceled). In that case, it will close the channel early. It is up
// to the consumer to detect this situation and keep track which blocks
// it has received and which it hasn't.
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
}
// BlockService is a hybrid block datastore. It stores data in a local // BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange. // datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values. // It uses an internal `datastore.Datastore` instance to store values.
type BlockService interface { type BlockService interface {
io.Closer
BlockGetter
// Blockstore returns a reference to the underlying blockstore // Blockstore returns a reference to the underlying blockstore
Blockstore() blockstore.Blockstore Blockstore() blockstore.Blockstore
...@@ -32,20 +50,14 @@ type BlockService interface { ...@@ -32,20 +50,14 @@ type BlockService interface {
Exchange() exchange.Interface Exchange() exchange.Interface
// AddBlock puts a given block to the underlying datastore // AddBlock puts a given block to the underlying datastore
AddBlock(o blocks.Block) (*cid.Cid, error) AddBlock(o blocks.Block) error
// AddBlocks adds a slice of blocks at the same time using batching // AddBlocks adds a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible. // capabilities of the underlying datastore whenever possible.
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) AddBlocks(bs []blocks.Block) error
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
DeleteBlock(o blocks.Block) error
// GetBlocks does a batch request for the given cids, returning blocks as // DeleteBlock deletes the given block from the blockservice.
// they are found, in no particular order. DeleteBlock(o *cid.Cid) error
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
Close() error
} }
type blockService struct { type blockService struct {
...@@ -110,38 +122,34 @@ func NewSession(ctx context.Context, bs BlockService) *Session { ...@@ -110,38 +122,34 @@ func NewSession(ctx context.Context, bs BlockService) *Session {
// AddBlock adds a particular block to the service, Putting it into the datastore. // AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here. // TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { func (s *blockService) AddBlock(o blocks.Block) error {
c := o.Cid() c := o.Cid()
if s.checkFirst { if s.checkFirst {
has, err := s.blockstore.Has(c) if has, err := s.blockstore.Has(c); has || err != nil {
if err != nil { return err
return nil, err
}
if has {
return c, nil
} }
} }
err := s.blockstore.Put(o) if err := s.blockstore.Put(o); err != nil {
if err != nil { return err
return nil, err
} }
if err := s.exchange.HasBlock(o); err != nil { if err := s.exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed") // TODO(stebalien): really an error?
return errors.New("blockservice is closed")
} }
return c, nil return nil
} }
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { func (s *blockService) AddBlocks(bs []blocks.Block) error {
var toput []blocks.Block var toput []blocks.Block
if s.checkFirst { if s.checkFirst {
toput = make([]blocks.Block, 0, len(bs))
for _, b := range bs { for _, b := range bs {
has, err := s.blockstore.Has(b.Cid()) has, err := s.blockstore.Has(b.Cid())
if err != nil { if err != nil {
return nil, err return err
} }
if !has { if !has {
toput = append(toput, b) toput = append(toput, b)
...@@ -153,18 +161,16 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { ...@@ -153,18 +161,16 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
err := s.blockstore.PutMany(toput) err := s.blockstore.PutMany(toput)
if err != nil { if err != nil {
return nil, err return err
} }
var ks []*cid.Cid
for _, o := range toput { for _, o := range toput {
if err := s.exchange.HasBlock(o); err != nil { if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err) // TODO(stebalien): Should this really *return*?
return fmt.Errorf("blockservice is closed (%s)", err)
} }
ks = append(ks, o.Cid())
} }
return ks, nil return nil
} }
// GetBlock retrieves a particular block from the service, // GetBlock retrieves a particular block from the service,
...@@ -256,8 +262,8 @@ func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f e ...@@ -256,8 +262,8 @@ func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f e
} }
// DeleteBlock deletes a block in the blockservice from the datastore // DeleteBlock deletes a block in the blockservice from the datastore
func (s *blockService) DeleteBlock(o blocks.Block) error { func (s *blockService) DeleteBlock(c *cid.Cid) error {
return s.blockstore.DeleteBlock(o.Cid()) return s.blockstore.DeleteBlock(c)
} }
func (s *blockService) Close() error { func (s *blockService) Close() error {
......
...@@ -33,16 +33,12 @@ func TestBlocks(t *testing.T) { ...@@ -33,16 +33,12 @@ func TestBlocks(t *testing.T) {
t.Error("Block key and data multihash key not equal") t.Error("Block key and data multihash key not equal")
} }
k, err := bs.AddBlock(o) err := bs.AddBlock(o)
if err != nil { if err != nil {
t.Error("failed to add block to BlockService", err) t.Error("failed to add block to BlockService", err)
return return
} }
if !k.Equals(o.Cid()) {
t.Error("returned key is not equal to block key", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() defer cancel()
b2, err := bs.GetBlock(ctx, o.Cid()) b2, err := bs.GetBlock(ctx, o.Cid())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment