Commit 7c1c4266 authored by Brian Tiger Chow's avatar Brian Tiger Chow Committed by Jeromy

refactor(blockstore, blockservice) use Blockstore and offline.Exchange

License: MIT
Signed-off-by: default avatarBrian Tiger Chow <brian@perfmode.com>
parent a1355947
...@@ -6,15 +6,18 @@ import ( ...@@ -6,15 +6,18 @@ import (
"time" "time"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
offline "github.com/jbenet/go-ipfs/exchange/offline"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
func TestBlocks(t *testing.T) { func TestBlocks(t *testing.T) {
d := ds.NewMapDatastore() d := ds.NewMapDatastore()
bs, err := NewBlockService(d, nil) tsds := dssync.MutexWrap(d)
bs, err := New(blockstore.NewBlockstore(tsds), offline.Exchange())
if err != nil { if err != nil {
t.Error("failed to construct block service", err) t.Error("failed to construct block service", err)
return return
......
...@@ -9,9 +9,9 @@ import ( ...@@ -9,9 +9,9 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
blocks "github.com/jbenet/go-ipfs/blocks" blocks "github.com/jbenet/go-ipfs/blocks"
"github.com/jbenet/go-ipfs/blocks/blockstore"
exchange "github.com/jbenet/go-ipfs/exchange" exchange "github.com/jbenet/go-ipfs/exchange"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
) )
...@@ -19,25 +19,28 @@ import ( ...@@ -19,25 +19,28 @@ import (
var log = u.Logger("blockservice") var log = u.Logger("blockservice")
var ErrNotFound = errors.New("blockservice: key not found") var ErrNotFound = errors.New("blockservice: key not found")
// BlockService is a block datastore. // BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values. // It uses an internal `datastore.Datastore` instance to store values.
type BlockService struct { type BlockService struct {
Datastore ds.Datastore // TODO don't expose underlying impl details
Remote exchange.Interface Blockstore blockstore.Blockstore
Remote exchange.Interface
} }
// NewBlockService creates a BlockService with given datastore instance. // NewBlockService creates a BlockService with given datastore instance.
func NewBlockService(d ds.Datastore, rem exchange.Interface) (*BlockService, error) { func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error) {
if d == nil { if bs == nil {
return nil, fmt.Errorf("BlockService requires valid datastore") return nil, fmt.Errorf("BlockService requires valid blockstore")
} }
if rem == nil { if rem == nil {
log.Warning("blockservice running in local (offline) mode.") log.Warning("blockservice running in local (offline) mode.")
} }
return &BlockService{Datastore: d, Remote: rem}, nil return &BlockService{Blockstore: bs, Remote: rem}, nil
} }
// AddBlock adds a particular block to the service, Putting it into the datastore. // AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
k := b.Key() k := b.Key()
log.Debugf("blockservice: storing [%s] in datastore", k) log.Debugf("blockservice: storing [%s] in datastore", k)
...@@ -47,7 +50,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { ...@@ -47,7 +50,7 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
// check if we have it before adding. this is an extra read, but large writes // check if we have it before adding. this is an extra read, but large writes
// are more expensive. // are more expensive.
// TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6 // TODO(jbenet) cheaper has. https://github.com/jbenet/go-datastore/issues/6
has, err := s.Datastore.Has(k.DsKey()) has, err := s.Blockstore.Has(k)
if err != nil { if err != nil {
return k, err return k, err
} }
...@@ -55,12 +58,14 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { ...@@ -55,12 +58,14 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
log.Debugf("blockservice: storing [%s] in datastore (already stored)", k) log.Debugf("blockservice: storing [%s] in datastore (already stored)", k)
} else { } else {
log.Debugf("blockservice: storing [%s] in datastore", k) log.Debugf("blockservice: storing [%s] in datastore", k)
err := s.Datastore.Put(k.DsKey(), b.Data) err := s.Blockstore.Put(b)
if err != nil { if err != nil {
return k, err return k, err
} }
} }
// TODO this operation rate-limits blockservice operations, we should
// consider moving this to an sync process.
if s.Remote != nil { if s.Remote != nil {
ctx := context.TODO() ctx := context.TODO()
err = s.Remote.HasBlock(ctx, *b) err = s.Remote.HasBlock(ctx, *b)
...@@ -72,17 +77,11 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { ...@@ -72,17 +77,11 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
// Getting it from the datastore using the key (hash). // Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) { func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", k) log.Debugf("BlockService GetBlock: '%s'", k)
datai, err := s.Datastore.Get(k.DsKey()) block, err := s.Blockstore.Get(k)
if err == nil { if err == nil {
log.Debug("Blockservice: Got data in datastore.") return block, nil
bdata, ok := datai.([]byte) // TODO be careful checking ErrNotFound. If the underlying
if !ok { // implementation changes, this will break.
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
}
return &blocks.Block{
Multihash: mh.Multihash(k),
Data: bdata,
}, nil
} else if err == ds.ErrNotFound && s.Remote != nil { } else if err == ds.ErrNotFound && s.Remote != nil {
log.Debug("Blockservice: Searching bitswap.") log.Debug("Blockservice: Searching bitswap.")
blk, err := s.Remote.GetBlock(ctx, k) blk, err := s.Remote.GetBlock(ctx, k)
...@@ -101,21 +100,13 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks ...@@ -101,21 +100,13 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
go func() { go func() {
var toFetch []u.Key var toFetch []u.Key
for _, k := range ks { for _, k := range ks {
datai, err := s.Datastore.Get(k.DsKey()) block, err := s.Blockstore.Get(k)
if err == nil { if err != nil {
log.Debug("Blockservice: Got data in datastore.")
bdata, ok := datai.([]byte)
if !ok {
log.Criticalf("data associated with %s is not a []byte", k)
continue
}
out <- &blocks.Block{
Multihash: mh.Multihash(k),
Data: bdata,
}
} else {
toFetch = append(toFetch, k) toFetch = append(toFetch, k)
continue
} }
log.Debug("Blockservice: Got data in datastore.")
out <- block
} }
}() }()
return out return out
...@@ -123,5 +114,5 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks ...@@ -123,5 +114,5 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
// DeleteBlock deletes a block in the blockservice from the datastore // DeleteBlock deletes a block in the blockservice from the datastore
func (s *BlockService) DeleteBlock(k u.Key) error { func (s *BlockService) DeleteBlock(k u.Key) error {
return s.Datastore.Delete(k.DsKey()) return s.Blockstore.DeleteBlock(k)
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment