batch.go 2.43 KB
Newer Older
1 2 3
package format

import (
4
	"context"
5 6
	"runtime"

7 8 9
	cid "github.com/ipfs/go-cid"
)

10 11 12 13 14
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// It defaults to twice runtime.NumCPU(). NewBatch also reads this value, as it
// is at construction time, for the capacity of the channel that collects
// commit results.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

15 16 17 18
// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add a lot of nodes all at once.
func NewBatch(ds DAGService) *Batch {
19
	return &Batch{
20 21 22
		ds:            ds,
		commitResults: make(chan error, ParallelBatchCommits),
		MaxSize:       8 << 20,
23 24 25 26

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
27
		MaxNodes: 128,
28 29 30
	}
}

31
// Batch is a buffer for batching adds to a dag.
32 33 34
type Batch struct {
	ds DAGService

35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
	activeCommits int
	commitError   error
	commitResults chan error

	nodes []Node
	size  int

	MaxSize  int
	MaxNodes int
}

// processResults collects the results of commits that have already finished,
// without ever blocking. It stops as soon as no commits are outstanding, an
// error has been recorded, or no further result is immediately available.
func (t *Batch) processResults() {
	for {
		if t.activeCommits <= 0 || t.commitError != nil {
			return
		}

		select {
		case res := <-t.commitResults:
			t.activeCommits--
			if res != nil {
				t.commitError = res
			}
		default:
			// Nothing has finished yet; do not wait for it.
			return
		}
	}
}

func (t *Batch) asyncCommit() {
	numBlocks := len(t.nodes)
	if numBlocks == 0 || t.commitError != nil {
		return
	}
	if t.activeCommits >= ParallelBatchCommits {
		err := <-t.commitResults
		t.activeCommits--

		if err != nil {
			t.commitError = err
			return
		}
	}
	go func(b []Node) {
75
		err := t.ds.AddMany(context.Background(), b)
76 77 78 79 80 81 82 83
		t.commitResults <- err
	}(t.nodes)

	t.activeCommits++
	t.nodes = make([]Node, 0, numBlocks)
	t.size = 0

	return
84 85
}

86
// Add adds a node to the batch and commits the batch if necessary.
87
func (t *Batch) Add(nd Node) (*cid.Cid, error) {
88 89 90 91 92 93
	// Not strictly necessary but allows us to catch errors early.
	t.processResults()
	if t.commitError != nil {
		return nil, t.commitError
	}

94 95
	t.nodes = append(t.nodes, nd)
	t.size += len(nd.RawData())
96 97
	if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
		t.asyncCommit()
98
	}
99
	return nd.Cid(), t.commitError
100 101
}

102
// Commit commits batched nodes.
103
func (t *Batch) Commit() error {
104 105 106 107 108 109 110 111 112 113
	t.asyncCommit()
	for t.activeCommits > 0 && t.commitError == nil {
		err := <-t.commitResults
		t.activeCommits--
		if err != nil {
			t.commitError = err
		}
	}

	return t.commitError
114
}