Commit 67829690 authored by Kevin Atkinson's avatar Kevin Atkinson

Make BlockService an interface.

License: MIT
Signed-off-by: default avatarKevin Atkinson <k@kevina.org>
parent b36159b9
...@@ -23,34 +23,52 @@ var ErrNotFound = errors.New("blockservice: key not found") ...@@ -23,34 +23,52 @@ var ErrNotFound = errors.New("blockservice: key not found")
// BlockService is a hybrid block datastore. It stores data in a local // BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange. // datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values. // It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct { type BlockService interface {
// TODO don't expose underlying impl details Blockstore() blockstore.Blockstore
Blockstore blockstore.Blockstore Exchange() exchange.Interface
Exchange exchange.Interface AddBlock(o blocks.Block) (*cid.Cid, error)
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error)
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error)
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block
DeleteBlock(o blocks.Block) error
Close() error
}
type blockService struct {
blockstore blockstore.Blockstore
exchange exchange.Interface
} }
// NewBlockService creates a BlockService with given datastore instance. // NewBlockService creates a BlockService with given datastore instance.
func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService { func New(bs blockstore.Blockstore, rem exchange.Interface) BlockService {
if rem == nil { if rem == nil {
log.Warning("blockservice running in local (offline) mode.") log.Warning("blockservice running in local (offline) mode.")
} }
return &BlockService{ return &blockService{
Blockstore: bs, blockstore: bs,
Exchange: rem, exchange: rem,
} }
} }
func (bs *blockService) Blockstore() blockstore.Blockstore {
return bs.blockstore
}
func (bs *blockService) Exchange() exchange.Interface {
return bs.exchange
}
// AddBlock adds a particular block to the service, Putting it into the datastore. // AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here. // TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) { func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the // TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object // possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data, // all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write // we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory. // of the content, saving us (probably) considerable memory.
c := o.Cid() c := o.Cid()
has, err := s.Blockstore.Has(c) has, err := s.blockstore.Has(c)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -59,22 +77,22 @@ func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) { ...@@ -59,22 +77,22 @@ func (s *BlockService) AddBlock(o blocks.Block) (*cid.Cid, error) {
return c, nil return c, nil
} }
err = s.Blockstore.Put(o) err = s.blockstore.Put(o)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := s.Exchange.HasBlock(o); err != nil { if err := s.exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed") return nil, errors.New("blockservice is closed")
} }
return c, nil return c, nil
} }
func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
var toput []blocks.Block var toput []blocks.Block
for _, b := range bs { for _, b := range bs {
has, err := s.Blockstore.Has(b.Cid()) has, err := s.blockstore.Has(b.Cid())
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -86,14 +104,14 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { ...@@ -86,14 +104,14 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
toput = append(toput, b) toput = append(toput, b)
} }
err := s.Blockstore.PutMany(toput) err := s.blockstore.PutMany(toput)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var ks []*cid.Cid var ks []*cid.Cid
for _, o := range toput { for _, o := range toput {
if err := s.Exchange.HasBlock(o); err != nil { if err := s.exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err) return nil, fmt.Errorf("blockservice is closed (%s)", err)
} }
...@@ -104,19 +122,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { ...@@ -104,19 +122,19 @@ func (s *BlockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) {
// GetBlock retrieves a particular block from the service, // GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash). // Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", c) log.Debugf("BlockService GetBlock: '%s'", c)
block, err := s.Blockstore.Get(c) block, err := s.blockstore.Get(c)
if err == nil { if err == nil {
return block, nil return block, nil
} }
if err == blockstore.ErrNotFound && s.Exchange != nil { if err == blockstore.ErrNotFound && s.exchange != nil {
// TODO be careful checking ErrNotFound. If the underlying // TODO be careful checking ErrNotFound. If the underlying
// implementation changes, this will break. // implementation changes, this will break.
log.Debug("Blockservice: Searching bitswap") log.Debug("Blockservice: Searching bitswap")
blk, err := s.Exchange.GetBlock(ctx, c) blk, err := s.exchange.GetBlock(ctx, c)
if err != nil { if err != nil {
if err == blockstore.ErrNotFound { if err == blockstore.ErrNotFound {
return nil, ErrNotFound return nil, ErrNotFound
...@@ -137,13 +155,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, ...@@ -137,13 +155,13 @@ func (s *BlockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block,
// GetBlocks gets a list of blocks asynchronously and returns through // GetBlocks gets a list of blocks asynchronously and returns through
// the returned channel. // the returned channel.
// NB: No guarantees are made about order. // NB: No guarantees are made about order.
func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block {
out := make(chan blocks.Block, 0) out := make(chan blocks.Block, 0)
go func() { go func() {
defer close(out) defer close(out)
var misses []*cid.Cid var misses []*cid.Cid
for _, c := range ks { for _, c := range ks {
hit, err := s.Blockstore.Get(c) hit, err := s.blockstore.Get(c)
if err != nil { if err != nil {
misses = append(misses, c) misses = append(misses, c)
continue continue
...@@ -160,7 +178,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc ...@@ -160,7 +178,7 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
return return
} }
rblocks, err := s.Exchange.GetBlocks(ctx, misses) rblocks, err := s.exchange.GetBlocks(ctx, misses)
if err != nil { if err != nil {
log.Debugf("Error with GetBlocks: %s", err) log.Debugf("Error with GetBlocks: %s", err)
return return
...@@ -178,11 +196,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc ...@@ -178,11 +196,11 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc
} }
// DeleteBlock deletes a block in the blockservice from the datastore // DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(o blocks.Block) error { func (s *blockService) DeleteBlock(o blocks.Block) error {
return s.Blockstore.DeleteBlock(o.Cid()) return s.blockstore.DeleteBlock(o.Cid())
} }
func (s *BlockService) Close() error { func (s *blockService) Close() error {
log.Debug("blockservice is shutting down...") log.Debug("blockservice is shutting down...")
return s.Exchange.Close() return s.exchange.Close()
} }
...@@ -9,13 +9,13 @@ import ( ...@@ -9,13 +9,13 @@ import (
) )
// Mocks returns |n| connected mock Blockservices // Mocks returns |n| connected mock Blockservices
func Mocks(n int) []*BlockService { func Mocks(n int) []BlockService {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
sg := bitswap.NewTestSessionGenerator(net) sg := bitswap.NewTestSessionGenerator(net)
instances := sg.Instances(n) instances := sg.Instances(n)
var servs []*BlockService var servs []BlockService
for _, i := range instances { for _, i := range instances {
servs = append(servs, New(i.Blockstore(), i.Exchange)) servs = append(servs, New(i.Blockstore(), i.Exchange))
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment