Unverified Commit b2d848ba authored by Hector Sanjuan's avatar Hector Sanjuan Committed by GitHub

Merge pull request #48 from ipfs/feat/batching-ds

Add BufferedDAG wrapping Batch as a DAGService.
parents c96181f0 879e11f1
0.7.0: QmdVNBLt7RMYnZwqBQJeexmSbTEDzERjBQUfs5McuPfEtB
0.7.1: QmR7TcHkR9nxkUorfi8XMTAMLUK7GiP64TWWBzY3aacc1o
......@@ -4,6 +4,8 @@ import (
"context"
"errors"
"runtime"
cid "github.com/ipfs/go-cid"
)
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
......@@ -23,14 +25,15 @@ var ErrClosed = errors.New("error: batch closed")
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch {
//
func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch {
ctx, cancel := context.WithCancel(ctx)
bopts := defaultBatchOptions
for _, o := range opts {
o(&bopts)
}
return &Batch{
ds: ds,
na: na,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits),
......@@ -40,7 +43,7 @@ func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch {
// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds DAGService
na NodeAdder
ctx context.Context
cancel func()
......@@ -89,12 +92,12 @@ func (t *Batch) asyncCommit() {
return
}
}
go func(ctx context.Context, b []Node, result chan error, ds DAGService) {
go func(ctx context.Context, b []Node, result chan error, na NodeAdder) {
select {
case result <- ds.AddMany(ctx, b):
case result <- na.AddMany(ctx, b):
case <-ctx.Done():
}
}(t.ctx, t.nodes, t.commitResults, t.ds)
}(t.ctx, t.nodes, t.commitResults, t.na)
t.activeCommits++
t.nodes = make([]Node, 0, numBlocks)
......@@ -108,7 +111,7 @@ func (t *Batch) Add(ctx context.Context, nd Node) error {
return t.AddMany(ctx, []Node{nd})
}
// Add many calls Add for every given Node, thus batching and
// AddMany many calls Add for every given Node, thus batching and
// commiting them as needed.
func (t *Batch) AddMany(ctx context.Context, nodes []Node) error {
if t.err != nil {
......@@ -175,7 +178,7 @@ loop:
// Be nice and cleanup. These can take a *lot* of memory.
t.commitResults = nil
t.ds = nil
t.na = nil
t.ctx = nil
t.nodes = nil
t.size = 0
......@@ -216,3 +219,77 @@ func MaxNodesBatchOption(num int) BatchOption {
o.maxNodes = num
}
}
// BufferedDAG implements DAGService using a Batch NodeAdder to wrap add
// operations in the given DAGService. It will trigger Commit() before any
// non-Add operations, but otherwise calling Commit() is left to the user.
type BufferedDAG struct {
ds DAGService
b *Batch
}
// NewBufferedDAG creates a BufferedDAG using the given DAGService and the
// given options for the Batch NodeAdder.
func NewBufferedDAG(ctx context.Context, ds DAGService, opts ...BatchOption) *BufferedDAG {
return &BufferedDAG{
ds: ds,
b: NewBatch(ctx, ds, opts...),
}
}
// Commit calls commit on the Batch.
func (bd *BufferedDAG) Commit() error {
return bd.b.Commit()
}
// Add adds a new node using Batch.
func (bd *BufferedDAG) Add(ctx context.Context, n Node) error {
return bd.b.Add(ctx, n)
}
// AddMany adds many nodes using Batch.
func (bd *BufferedDAG) AddMany(ctx context.Context, nds []Node) error {
return bd.b.AddMany(ctx, nds)
}
// Get commits and gets a node from the DAGService.
func (bd *BufferedDAG) Get(ctx context.Context, c cid.Cid) (Node, error) {
err := bd.b.Commit()
if err != nil {
return nil, err
}
return bd.ds.Get(ctx, c)
}
// GetMany commits and gets nodes from the DAGService.
func (bd *BufferedDAG) GetMany(ctx context.Context, cs []cid.Cid) <-chan *NodeOption {
err := bd.b.Commit()
if err != nil {
ch := make(chan *NodeOption, 1)
defer close(ch)
ch <- &NodeOption{
Node: nil,
Err: err,
}
return ch
}
return bd.ds.GetMany(ctx, cs)
}
// Remove commits and removes a node from the DAGService.
func (bd *BufferedDAG) Remove(ctx context.Context, c cid.Cid) error {
err := bd.b.Commit()
if err != nil {
return err
}
return bd.ds.Remove(ctx, c)
}
// RemoveMany commits and removes nodes from the DAGService.
func (bd *BufferedDAG) RemoveMany(ctx context.Context, cs []cid.Cid) error {
err := bd.b.Commit()
if err != nil {
return err
}
return bd.ds.RemoveMany(ctx, cs)
}
......@@ -109,6 +109,26 @@ func TestBatch(t *testing.T) {
}
}
func TestBufferedDAG(t *testing.T) {
ds := newTestDag()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var bdag DAGService = NewBufferedDAG(ctx, ds)
for i := 0; i < 1000; i++ {
n := new(EmptyNode)
if err := bdag.Add(ctx, n); err != nil {
t.Fatal(err)
}
if _, err := bdag.Get(ctx, n.Cid()); err != nil {
t.Fatal(err)
}
if err := bdag.Remove(ctx, n.Cid()); err != nil {
t.Fatal(err)
}
}
}
func TestBatchOptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
......
......@@ -31,6 +31,6 @@
"license": "",
"name": "go-ipld-format",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.7.0"
"version": "0.7.1"
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment