Unverified Commit b15a4bd4 authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #56 from ipfs/fix/parallel-commit

fix: change the batch size to avoid buffering too much
parents 2c2e0d19 80a00f43
...@@ -8,10 +8,10 @@ import ( ...@@ -8,10 +8,10 @@ import (
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
) )
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. // parallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage // TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula. // devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2 var parallelCommits = runtime.NumCPU()
// ErrNotCommited is returned when closing a batch that hasn't been successfully // ErrNotCommited is returned when closing a batch that hasn't been successfully
// committed. // committed.
...@@ -32,11 +32,15 @@ func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch { ...@@ -32,11 +32,15 @@ func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch {
for _, o := range opts { for _, o := range opts {
o(&bopts) o(&bopts)
} }
// Commit numCPU batches at once, but split the maximum buffer size over all commits in flight.
bopts.maxSize /= parallelCommits
bopts.maxNodes /= parallelCommits
return &Batch{ return &Batch{
na: na, na: na,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits), commitResults: make(chan error, parallelCommits),
opts: bopts, opts: bopts,
} }
} }
...@@ -78,7 +82,7 @@ func (t *Batch) asyncCommit() { ...@@ -78,7 +82,7 @@ func (t *Batch) asyncCommit() {
if numBlocks == 0 { if numBlocks == 0 {
return return
} }
if t.activeCommits >= ParallelBatchCommits { if t.activeCommits >= parallelCommits {
select { select {
case err := <-t.commitResults: case err := <-t.commitResults:
t.activeCommits-- t.activeCommits--
...@@ -206,14 +210,16 @@ var defaultBatchOptions = batchOptions{ ...@@ -206,14 +210,16 @@ var defaultBatchOptions = batchOptions{
maxNodes: 128, maxNodes: 128,
} }
// MaxSizeBatchOption sets the maximum size of a Batch. // MaxSizeBatchOption sets the maximum amount of buffered data before writing
// blocks.
func MaxSizeBatchOption(size int) BatchOption { func MaxSizeBatchOption(size int) BatchOption {
return func(o *batchOptions) { return func(o *batchOptions) {
o.maxSize = size o.maxSize = size
} }
} }
// MaxNodesBatchOption sets the maximum number of nodes in a Batch. // MaxNodesBatchOption sets the maximum number of buffered nodes before writing
// blocks.
func MaxNodesBatchOption(num int) BatchOption { func MaxNodesBatchOption(num int) BatchOption {
return func(o *batchOptions) { return func(o *batchOptions) {
o.maxNodes = num o.maxNodes = num
......
...@@ -139,10 +139,10 @@ func TestBatchOptions(t *testing.T) { ...@@ -139,10 +139,10 @@ func TestBatchOptions(t *testing.T) {
wantMaxNodes := 500 wantMaxNodes := 500
d := newTestDag() d := newTestDag()
b := NewBatch(ctx, d, MaxSizeBatchOption(wantMaxSize), MaxNodesBatchOption(wantMaxNodes)) b := NewBatch(ctx, d, MaxSizeBatchOption(wantMaxSize), MaxNodesBatchOption(wantMaxNodes))
if b.opts.maxSize != wantMaxSize { if b.opts.maxSize != wantMaxSize/parallelCommits {
t.Fatalf("maxSize incorrect, want: %d, got: %d", wantMaxSize, b.opts.maxSize) t.Fatalf("maxSize incorrect, want: %d, got: %d", wantMaxSize, b.opts.maxSize)
} }
if b.opts.maxNodes != wantMaxNodes { if b.opts.maxNodes != wantMaxNodes/parallelCommits {
t.Fatalf("maxNodes incorrect, want: %d, got: %d", wantMaxNodes, b.opts.maxNodes) t.Fatalf("maxNodes incorrect, want: %d, got: %d", wantMaxNodes, b.opts.maxNodes)
} }
} }
......
...@@ -10,6 +10,8 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8= ...@@ -10,6 +10,8 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50= github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc= github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment