Commit 80a00f43 authored by Steven Allen's avatar Steven Allen

fix: change the batch size to avoid buffering too much

Otherwise, the max outstanding blocks/size depends on the number of cores.
parent 2c2e0d19
......@@ -8,10 +8,10 @@ import (
cid "github.com/ipfs/go-cid"
)
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// parallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2
var parallelCommits = runtime.NumCPU()
// ErrNotCommited is returned when closing a batch that hasn't been successfully
// committed.
......@@ -32,11 +32,15 @@ func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch {
for _, o := range opts {
o(&bopts)
}
// Commit numCPU batches at once, but split the maximum buffer size over all commits in flight.
bopts.maxSize /= parallelCommits
bopts.maxNodes /= parallelCommits
return &Batch{
na: na,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits),
commitResults: make(chan error, parallelCommits),
opts: bopts,
}
}
......@@ -78,7 +82,7 @@ func (t *Batch) asyncCommit() {
if numBlocks == 0 {
return
}
if t.activeCommits >= ParallelBatchCommits {
if t.activeCommits >= parallelCommits {
select {
case err := <-t.commitResults:
t.activeCommits--
......@@ -206,14 +210,16 @@ var defaultBatchOptions = batchOptions{
maxNodes: 128,
}
// MaxSizeBatchOption sets the maximum size of a Batch.
// MaxSizeBatchOption sets the maximum amount of buffered data before writing
// blocks.
func MaxSizeBatchOption(size int) BatchOption {
return func(o *batchOptions) {
o.maxSize = size
}
}
// MaxNodesBatchOption sets the maximum number of nodes in a Batch.
// MaxNodesBatchOption sets the maximum number of buffered nodes before writing
// blocks.
func MaxNodesBatchOption(num int) BatchOption {
return func(o *batchOptions) {
o.maxNodes = num
......
......@@ -139,10 +139,10 @@ func TestBatchOptions(t *testing.T) {
wantMaxNodes := 500
d := newTestDag()
b := NewBatch(ctx, d, MaxSizeBatchOption(wantMaxSize), MaxNodesBatchOption(wantMaxNodes))
if b.opts.maxSize != wantMaxSize {
if b.opts.maxSize != wantMaxSize/parallelCommits {
t.Fatalf("maxSize incorrect, want: %d, got: %d", wantMaxSize, b.opts.maxSize)
}
if b.opts.maxNodes != wantMaxNodes {
if b.opts.maxNodes != wantMaxNodes/parallelCommits {
t.Fatalf("maxNodes incorrect, want: %d, got: %d", wantMaxNodes, b.opts.maxNodes)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment