Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-ld-format
Commits
b5c424b6
Commit
b5c424b6
authored
Oct 26, 2018
by
Hector Sanjuan
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add BatchDAG wrapping Batch as a DAGService.
parent
c96181f0
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
105 additions
and
8 deletions
+105
-8
batch.go
batch.go
+85
-8
batch_test.go
batch_test.go
+20
-0
No files found.
batch.go
View file @
b5c424b6
...
...
@@ -4,6 +4,8 @@ import (
"context"
"errors"
"runtime"
cid
"github.com/ipfs/go-cid"
)
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
...
...
@@ -23,14 +25,15 @@ var ErrClosed = errors.New("error: batch closed")
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func
NewBatch
(
ctx
context
.
Context
,
ds
DAGService
,
opts
...
BatchOption
)
*
Batch
{
//
func
NewBatch
(
ctx
context
.
Context
,
na
NodeAdder
,
opts
...
BatchOption
)
*
Batch
{
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
bopts
:=
defaultBatchOptions
for
_
,
o
:=
range
opts
{
o
(
&
bopts
)
}
return
&
Batch
{
ds
:
ds
,
na
:
na
,
ctx
:
ctx
,
cancel
:
cancel
,
commitResults
:
make
(
chan
error
,
ParallelBatchCommits
),
...
...
@@ -40,7 +43,7 @@ func NewBatch(ctx context.Context, ds DAGService, opts ...BatchOption) *Batch {
// Batch is a buffer for batching adds to a dag.
type
Batch
struct
{
ds
DAGService
na
NodeAdder
ctx
context
.
Context
cancel
func
()
...
...
@@ -89,12 +92,12 @@ func (t *Batch) asyncCommit() {
return
}
}
go
func
(
ctx
context
.
Context
,
b
[]
Node
,
result
chan
error
,
ds
DAGService
)
{
go
func
(
ctx
context
.
Context
,
b
[]
Node
,
result
chan
error
,
na
NodeAdder
)
{
select
{
case
result
<-
ds
.
AddMany
(
ctx
,
b
)
:
case
result
<-
na
.
AddMany
(
ctx
,
b
)
:
case
<-
ctx
.
Done
()
:
}
}(
t
.
ctx
,
t
.
nodes
,
t
.
commitResults
,
t
.
ds
)
}(
t
.
ctx
,
t
.
nodes
,
t
.
commitResults
,
t
.
na
)
t
.
activeCommits
++
t
.
nodes
=
make
([]
Node
,
0
,
numBlocks
)
...
...
@@ -108,7 +111,7 @@ func (t *Batch) Add(ctx context.Context, nd Node) error {
return
t
.
AddMany
(
ctx
,
[]
Node
{
nd
})
}
// Add many calls Add for every given Node, thus batching and
// Add
Many
many calls Add for every given Node, thus batching and
// commiting them as needed.
func
(
t
*
Batch
)
AddMany
(
ctx
context
.
Context
,
nodes
[]
Node
)
error
{
if
t
.
err
!=
nil
{
...
...
@@ -175,7 +178,7 @@ loop:
// Be nice and cleanup. These can take a *lot* of memory.
t
.
commitResults
=
nil
t
.
ds
=
nil
t
.
na
=
nil
t
.
ctx
=
nil
t
.
nodes
=
nil
t
.
size
=
0
...
...
@@ -216,3 +219,77 @@ func MaxNodesBatchOption(num int) BatchOption {
o
.
maxNodes
=
num
}
}
// BufferedDAG implements DAGService using a Batch NodeAdder to wrap add
// operations in the given DAGService. It will trigger Commit() before any
// non-Add operations, but otherwise calling Commit() is left to the user.
type
BufferedDAG
struct
{
ds
DAGService
b
*
Batch
}
// NewBufferedDAG creates a BufferedDAG using the given DAGService and the
// given options for the Batch NodeAdder.
func
NewBufferedDAG
(
ctx
context
.
Context
,
ds
DAGService
,
opts
...
BatchOption
)
*
BufferedDAG
{
return
&
BufferedDAG
{
ds
:
ds
,
b
:
NewBatch
(
ctx
,
ds
,
opts
...
),
}
}
// Commit calls commit on the Batch.
func
(
bd
*
BufferedDAG
)
Commit
()
error
{
return
bd
.
b
.
Commit
()
}
// Add adds a new node using Batch.
func
(
bd
*
BufferedDAG
)
Add
(
ctx
context
.
Context
,
n
Node
)
error
{
return
bd
.
b
.
Add
(
ctx
,
n
)
}
// AddMany adds many nodes using Batch.
func
(
bd
*
BufferedDAG
)
AddMany
(
ctx
context
.
Context
,
nds
[]
Node
)
error
{
return
bd
.
b
.
AddMany
(
ctx
,
nds
)
}
// Get commits and gets a node from the DAGService.
func
(
bd
*
BufferedDAG
)
Get
(
ctx
context
.
Context
,
c
cid
.
Cid
)
(
Node
,
error
)
{
err
:=
bd
.
b
.
Commit
()
if
err
!=
nil
{
return
nil
,
err
}
return
bd
.
ds
.
Get
(
ctx
,
c
)
}
// GetMany commits and gets nodes from the DAGService.
func
(
bd
*
BufferedDAG
)
GetMany
(
ctx
context
.
Context
,
cs
[]
cid
.
Cid
)
<-
chan
*
NodeOption
{
err
:=
bd
.
b
.
Commit
()
if
err
!=
nil
{
ch
:=
make
(
chan
*
NodeOption
,
1
)
defer
close
(
ch
)
ch
<-
&
NodeOption
{
Node
:
nil
,
Err
:
err
,
}
return
ch
}
return
bd
.
ds
.
GetMany
(
ctx
,
cs
)
}
// Remove commits and removes a node from the DAGService.
func
(
bd
*
BufferedDAG
)
Remove
(
ctx
context
.
Context
,
c
cid
.
Cid
)
error
{
err
:=
bd
.
b
.
Commit
()
if
err
!=
nil
{
return
err
}
return
bd
.
ds
.
Remove
(
ctx
,
c
)
}
// RemoveMany commits and removes nodes from the DAGService.
func
(
bd
*
BufferedDAG
)
RemoveMany
(
ctx
context
.
Context
,
cs
[]
cid
.
Cid
)
error
{
err
:=
bd
.
b
.
Commit
()
if
err
!=
nil
{
return
err
}
return
bd
.
ds
.
RemoveMany
(
ctx
,
cs
)
}
batch_test.go
View file @
b5c424b6
...
...
@@ -109,6 +109,26 @@ func TestBatch(t *testing.T) {
}
}
func
TestBufferedDAG
(
t
*
testing
.
T
)
{
ds
:=
newTestDag
()
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
var
bdag
DAGService
=
NewBufferedDAG
(
ctx
,
ds
)
for
i
:=
0
;
i
<
1000
;
i
++
{
n
:=
new
(
EmptyNode
)
if
err
:=
bdag
.
Add
(
ctx
,
n
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
_
,
err
:=
bdag
.
Get
(
ctx
,
n
.
Cid
());
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
err
:=
bdag
.
Remove
(
ctx
,
n
.
Cid
());
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
}
func
TestBatchOptions
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment