batch: add functional opts to Batch

License: MIT
Signed-off-by: default avatarAdrian Lanzafame <adrianlanzafame92@gmail.com>
parent 6b969cac
......@@ -23,19 +23,18 @@ var ErrClosed = errors.New("error: batch closed")
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func NewBatch(ctx context.Context, ds DAGService) *Batch {
func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch {
ctx, cancel := context.WithCancel(ctx)
bopts := defaultBatchOptions
for _, o := range opts {
o(&bopts)
}
return &Batch{
ds: ds,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,
// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
MaxNodes: 128,
opts: bopts,
}
}
......@@ -53,8 +52,7 @@ type Batch struct {
nodes []Node
size int
MaxSize int
MaxNodes int
opts batchOptions
}
func (t *Batch) processResults() {
......@@ -120,7 +118,7 @@ func (t *Batch) Add(nd Node) error {
t.nodes = append(t.nodes, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
if t.size > t.opts.maxSize || len(t.nodes) > t.opts.maxNodes {
t.asyncCommit()
}
return t.err
......@@ -175,3 +173,38 @@ loop:
t.size = 0
t.activeCommits = 0
}
// BatchOption provides a way of setting internal options of
// a Batch.
//
// See this post about the "functional options" pattern:
// http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type BatchOption func(o *batchOptions)
type batchOptions struct {
maxSize int
maxNodes int
}
var defaultBatchOptions = batchOptions{
maxSize: 8 << 20,
// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
maxNodes: 128,
}
// MaxSizeBatchOption sets the maximum size of a Batch.
func MaxSizeBatchOption(size int) BatchOption {
return func(o *batchOptions) {
o.maxSize = size
}
}
// MaxNodesBatchOption sets the maximum number of nodes in a Batch.
func MaxNodesBatchOption(num int) func(o *batchOptions) {
return func(o *batchOptions) {
o.maxNodes = num
}
}
......@@ -108,3 +108,19 @@ func TestBatch(t *testing.T) {
t.Fatal("should have one node")
}
}
func TestBatchOptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wantMaxSize := 8 << 10
wantMaxNodes := 500
d := newTestDag()
b := NewBatch(ctx, d, MaxSizeBatchOption(wantMaxSize), MaxNodesBatchOption(wantMaxNodes))
if b.opts.maxSize != wantMaxSize {
t.Fatalf("maxSize incorrect, want: %d, got: %d", wantMaxSize, b.opts.maxSize)
}
if b.opts.maxNodes != wantMaxNodes {
t.Fatalf("maxNodes incorrect, want: %d, got: %d", wantMaxNodes, b.opts.maxNodes)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment