Commit 1af7e81d authored by Steven Allen's avatar Steven Allen

port async batch commit code from ipfs

(ipfs/go-ipfs#4296)

1. Modern storage devices (i.e., SSDs) tend to be highly parallel.
2. Allows us to read and write at the same time (avoids pausing while flushing).

fixes https://github.com/ipfs/go-ipfs/issues/898#issuecomment-331849064
parent 52259789
package format
import (
"runtime"
cid "github.com/ipfs/go-cid"
)
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2
// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add a lot of nodes all at once.
func NewBatch(ds DAGService) *Batch {
return &Batch{
ds: ds,
MaxSize: 8 << 20,
ds: ds,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,
// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
MaxBlocks: 128,
MaxNodes: 128,
}
}
// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds DAGService
// TODO: try to re-use memory.
nodes []Node
size int
MaxSize int
MaxBlocks int
activeCommits int
commitError error
commitResults chan error
nodes []Node
size int
MaxSize int
MaxNodes int
}
func (t *Batch) processResults() {
for t.activeCommits > 0 && t.commitError == nil {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.commitError = err
}
default:
return
}
}
}
func (t *Batch) asyncCommit() {
numBlocks := len(t.nodes)
if numBlocks == 0 || t.commitError != nil {
return
}
if t.activeCommits >= ParallelBatchCommits {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
return
}
}
go func(b []Node) {
_, err := t.ds.AddMany(b)
t.commitResults <- err
}(t.nodes)
t.activeCommits++
t.nodes = make([]Node, 0, numBlocks)
t.size = 0
return
}
// Add a node to this batch of nodes, potentially committing the set of batched
// nodes to the underlying DAGService.
// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd Node) (*cid.Cid, error) {
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.commitError != nil {
return nil, t.commitError
}
t.nodes = append(t.nodes, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.nodes) > t.MaxBlocks {
return nd.Cid(), t.Commit()
if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
t.asyncCommit()
}
return nd.Cid(), nil
return nd.Cid(), t.commitError
}
// Commit commits the buffered of nodes to the underlying DAGService.
// Make sure to call this after you're done adding nodes to the batch to ensure
// that they're actually added to the DAGService.
// Commit commits batched nodes.
func (t *Batch) Commit() error {
_, err := t.ds.AddMany(t.nodes)
t.nodes = nil
t.size = 0
return err
t.asyncCommit()
for t.activeCommits > 0 && t.commitError == nil {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
}
}
return t.commitError
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment