Commit 6cf32cc8 authored by Steven Allen's avatar Steven Allen

add context to batch

I considered (well, implemented then threw it away) allowing contexts on all
calls to Batch (Add, Commit, etc). However, really, you should treat a batch
as a single large "operation".

I also went down the road of generalizing batches to sessions. However, it
became immediately obvious that permitting add *and* remove *and* fetch would
require a lot of bookkeeping and that you'd lose a lot of performance. So, we'll
do that separately.
parent 44a78014
......@@ -2,9 +2,8 @@ package format
import (
"context"
"errors"
"runtime"
cid "github.com/ipfs/go-cid"
)
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
......@@ -12,12 +11,24 @@ import (
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2
// ErrNotCommited is returned when closing a batch that hasn't been successfully
// committed.
var ErrNotCommited = errors.New("error: batch not commited")
// ErrClosed is returned when operating on a batch that has already been closed.
var ErrClosed = errors.New("error: batch closed")
// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add a lot of nodes all at once.
func NewBatch(ds DAGService) *Batch {
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func NewBatch(ctx context.Context, ds DAGService) *Batch {
ctx, cancel := context.WithCancel(ctx)
return &Batch{
ds: ds,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,
......@@ -32,8 +43,11 @@ func NewBatch(ds DAGService) *Batch {
type Batch struct {
ds DAGService
ctx context.Context
cancel func()
activeCommits int
commitError error
err error
commitResults chan error
nodes []Node
......@@ -44,12 +58,13 @@ type Batch struct {
}
func (t *Batch) processResults() {
for t.activeCommits > 0 && t.commitError == nil {
for t.activeCommits > 0 {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.commitError = err
t.setError(err)
return
}
default:
return
......@@ -59,22 +74,29 @@ func (t *Batch) processResults() {
func (t *Batch) asyncCommit() {
numBlocks := len(t.nodes)
if numBlocks == 0 || t.commitError != nil {
if numBlocks == 0 {
return
}
if t.activeCommits >= ParallelBatchCommits {
err := <-t.commitResults
t.activeCommits--
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.commitError = err
if err != nil {
t.setError(err)
return
}
case <-t.ctx.Done():
t.setError(t.ctx.Err())
return
}
}
go func(b []Node) {
err := t.ds.AddMany(context.Background(), b)
t.commitResults <- err
}(t.nodes)
go func(ctx context.Context, b []Node, result chan error, ds DAGService) {
select {
case result <- ds.AddMany(ctx, b):
case <-ctx.Done():
}
}(t.ctx, t.nodes, t.commitResults, t.ds)
t.activeCommits++
t.nodes = make([]Node, 0, numBlocks)
......@@ -84,31 +106,72 @@ func (t *Batch) asyncCommit() {
}
// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd Node) (*cid.Cid, error) {
func (t *Batch) Add(nd Node) error {
if t.err != nil {
return t.err
}
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.commitError != nil {
return nil, t.commitError
if t.err != nil {
return t.err
}
t.nodes = append(t.nodes, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
t.asyncCommit()
}
return nd.Cid(), t.commitError
return t.err
}
// Commit commits batched nodes.
func (t *Batch) Commit() error {
if t.err != nil {
return t.err
}
t.asyncCommit()
for t.activeCommits > 0 && t.commitError == nil {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
loop:
for t.activeCommits > 0 {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.setError(err)
break loop
}
case <-t.ctx.Done():
t.setError(t.ctx.Err())
break loop
}
}
return t.err
}
func (t *Batch) setError(err error) {
t.err = err
t.cancel()
// Drain as much as we can without blocking.
loop:
for {
select {
case <-t.commitResults:
default:
break loop
}
}
return t.commitError
// Be nice and cleanup. These can take a *lot* of memory.
t.commitResults = nil
t.ds = nil
t.ctx = nil
t.nodes = nil
t.size = 0
t.activeCommits = 0
}
......@@ -76,13 +76,16 @@ func (d *testDag) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
var _ DAGService = new(testDag)
func TestBatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := newTestDag()
b := NewBatch(d)
b := NewBatch(ctx, d)
for i := 0; i < 1000; i++ {
// It would be great if we could use *many* different nodes here
// but we can't add any dependencies and I don't feel like adding
// any more testing code.
if _, err := b.Add(new(EmptyNode)); err != nil {
if err := b.Add(new(EmptyNode)); err != nil {
t.Fatal(err)
}
}
......@@ -90,7 +93,7 @@ func TestBatch(t *testing.T) {
t.Fatal(err)
}
n, err := d.Get(context.Background(), new(EmptyNode).Cid())
n, err := d.Get(ctx, new(EmptyNode).Cid())
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment