Commit 7e356be6 authored by Jeromy's avatar Jeromy

use batching transaction interface from datastore

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 67f1e932
...@@ -30,6 +30,7 @@ type Blockstore interface { ...@@ -30,6 +30,7 @@ type Blockstore interface {
Has(key.Key) (bool, error) Has(key.Key) (bool, error)
Get(key.Key) (*blocks.Block, error) Get(key.Key) (*blocks.Block, error)
Put(*blocks.Block) error Put(*blocks.Block) error
PutMany([]*blocks.Block) error
AllKeysChan(ctx context.Context) (<-chan key.Key, error) AllKeysChan(ctx context.Context) (<-chan key.Key, error)
} }
...@@ -42,7 +43,7 @@ func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore { ...@@ -42,7 +43,7 @@ func NewBlockstore(d ds.ThreadSafeDatastore) Blockstore {
} }
type blockstore struct { type blockstore struct {
datastore ds.Datastore datastore ds.BatchingDatastore
// cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it. // cant be ThreadSafeDatastore cause namespace.Datastore doesnt support it.
// we do check it on `NewBlockstore` though. // we do check it on `NewBlockstore` though.
} }
...@@ -74,6 +75,26 @@ func (bs *blockstore) Put(block *blocks.Block) error { ...@@ -74,6 +75,26 @@ func (bs *blockstore) Put(block *blocks.Block) error {
return bs.datastore.Put(k, block.Data) return bs.datastore.Put(k, block.Data)
} }
func (bs *blockstore) PutMany(blocks []*blocks.Block) error {
t, err := bs.datastore.Batch()
if err != nil {
return err
}
for _, b := range blocks {
k := b.Key().DsKey()
exists, err := bs.datastore.Has(k)
if err == nil && exists {
continue
}
err = t.Put(k, b.Data)
if err != nil {
return err
}
}
return t.Commit()
}
func (bs *blockstore) Has(k key.Key) (bool, error) { func (bs *blockstore) Has(k key.Key) (bool, error) {
return bs.datastore.Has(k.DsKey()) return bs.datastore.Has(k.DsKey())
} }
......
...@@ -266,3 +266,7 @@ func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) { ...@@ -266,3 +266,7 @@ func (c *queryTestDS) Query(q dsq.Query) (dsq.Results, error) {
} }
return c.ds.Query(q) return c.ds.Query(q)
} }
func (c *queryTestDS) Batch() (ds.Batch, error) {
return ds.NewBasicBatch(c), nil
}
...@@ -45,6 +45,16 @@ func (w *writecache) Put(b *blocks.Block) error { ...@@ -45,6 +45,16 @@ func (w *writecache) Put(b *blocks.Block) error {
return w.blockstore.Put(b) return w.blockstore.Put(b)
} }
func (w *writecache) PutMany(bs []*blocks.Block) error {
var good []*blocks.Block
for _, b := range bs {
if _, ok := w.cache.Get(b.Key()); !ok {
good = append(good, b)
}
}
return w.blockstore.PutMany(good)
}
func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return w.blockstore.AllKeysChan(ctx) return w.blockstore.AllKeysChan(ctx)
} }
...@@ -88,3 +88,7 @@ func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) { ...@@ -88,3 +88,7 @@ func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) {
c.f() c.f()
return c.ds.Query(q) return c.ds.Query(q)
} }
func (c *callbackDatastore) Batch() (ds.Batch, error) {
return ds.NewBasicBatch(c), nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment