Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-dms3
Commits
b4a38541
Commit
b4a38541
authored
Nov 15, 2015
by
rht
Committed by
Jeromy
Jan 12, 2016
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Remove chunk channels
License: MIT Signed-off-by:
rht
<
rhtbot@gmail.com
>
parent
a961b1f7
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
23 additions
and
53 deletions
+23
-53
importer/balanced/balanced_test.go
importer/balanced/balanced_test.go
+1
-4
importer/helpers/dagbuilder.go
importer/helpers/dagbuilder.go
+11
-21
importer/importer.go
importer/importer.go
+2
-8
importer/trickle/trickle_test.go
importer/trickle/trickle_test.go
+5
-14
unixfs/mod/dagmodifier.go
unixfs/mod/dagmodifier.go
+4
-6
No files found.
importer/balanced/balanced_test.go
View file @
b4a38541
...
...
@@ -22,15 +22,12 @@ import (
// TODO: extract these tests and more as a generic layout test suite
func
buildTestDag
(
ds
dag
.
DAGService
,
spl
chunk
.
Splitter
)
(
*
dag
.
Node
,
error
)
{
// Start the splitter
blkch
,
errs
:=
chunk
.
Chan
(
spl
)
dbp
:=
h
.
DagBuilderParams
{
Dagserv
:
ds
,
Maxlinks
:
h
.
DefaultLinksPerBlock
,
}
return
BalancedLayout
(
dbp
.
New
(
blkch
,
errs
))
return
BalancedLayout
(
dbp
.
New
(
spl
))
}
func
getTestDag
(
t
*
testing
.
T
,
ds
dag
.
DAGService
,
size
int64
,
blksize
int64
)
(
*
dag
.
Node
,
[]
byte
)
{
...
...
importer/helpers/dagbuilder.go
View file @
b4a38541
package
helpers
import
(
"github.com/ipfs/go-ipfs/importer/chunk"
dag
"github.com/ipfs/go-ipfs/merkledag"
)
...
...
@@ -8,8 +9,7 @@ import (
// efficiently create unixfs dag trees
type
DagBuilderHelper
struct
{
dserv
dag
.
DAGService
in
<-
chan
[]
byte
errs
<-
chan
error
spl
chunk
.
Splitter
recvdErr
error
nextData
[]
byte
// the next item to return.
maxlinks
int
...
...
@@ -24,45 +24,35 @@ type DagBuilderParams struct {
Dagserv
dag
.
DAGService
}
// Generate a new DagBuilderHelper from the given params,
using 'in' as a
//
data source
func
(
dbp
*
DagBuilderParams
)
New
(
in
<-
chan
[]
byte
,
errs
<-
chan
erro
r
)
*
DagBuilderHelper
{
// Generate a new DagBuilderHelper from the given params,
which data source comes
//
from chunks object
func
(
dbp
*
DagBuilderParams
)
New
(
spl
chunk
.
Splitte
r
)
*
DagBuilderHelper
{
return
&
DagBuilderHelper
{
dserv
:
dbp
.
Dagserv
,
in
:
in
,
errs
:
errs
,
spl
:
spl
,
maxlinks
:
dbp
.
Maxlinks
,
batch
:
dbp
.
Dagserv
.
Batch
(),
}
}
// prepareNext consumes the next item from the
channel
and puts it
// prepareNext consumes the next item from the
splitter
and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
//
// i realized that building the dag becomes _a lot_ easier if we can
// "peek" the "are done yet?" (i.e. not consume it from the channel)
func
(
db
*
DagBuilderHelper
)
prepareNext
()
{
if
db
.
in
==
nil
{
// if our input is nil, there is "nothing to do". we're done.
// as if there was no data at all. (a sort of zero-value)
return
}
// if we already have data waiting to be consumed, we're ready.
// if we already have data waiting to be consumed, we're ready
if
db
.
nextData
!=
nil
{
return
}
// if it's closed, nextData will be correctly set to nil, signaling
// that we're done consuming from the channel.
db
.
nextData
=
<-
db
.
in
// TODO: handle err (which wasn't handled either when the splitter was channeled)
db
.
nextData
,
_
=
db
.
spl
.
NextBytes
()
}
// Done returns whether or not we're done consuming the incoming data.
func
(
db
*
DagBuilderHelper
)
Done
()
bool
{
// ensure we have an accurate perspective on data
// as `done` this may be called before `next`.
//db.prepareNext() // idempotent
db
.
prepareNext
()
// idempotent
return
db
.
nextData
==
nil
}
...
...
importer/importer.go
View file @
b4a38541
...
...
@@ -39,25 +39,19 @@ func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.Node, error) {
}
func
BuildDagFromReader
(
ds
dag
.
DAGService
,
spl
chunk
.
Splitter
)
(
*
dag
.
Node
,
error
)
{
// Start the splitter
blkch
,
errch
:=
chunk
.
Chan
(
spl
)
dbp
:=
h
.
DagBuilderParams
{
Dagserv
:
ds
,
Maxlinks
:
h
.
DefaultLinksPerBlock
,
}
return
bal
.
BalancedLayout
(
dbp
.
New
(
blkch
,
errch
))
return
bal
.
BalancedLayout
(
dbp
.
New
(
spl
))
}
func
BuildTrickleDagFromReader
(
ds
dag
.
DAGService
,
spl
chunk
.
Splitter
)
(
*
dag
.
Node
,
error
)
{
// Start the splitter
blkch
,
errch
:=
chunk
.
Chan
(
spl
)
dbp
:=
h
.
DagBuilderParams
{
Dagserv
:
ds
,
Maxlinks
:
h
.
DefaultLinksPerBlock
,
}
return
trickle
.
TrickleLayout
(
dbp
.
New
(
blkch
,
errch
))
return
trickle
.
TrickleLayout
(
dbp
.
New
(
spl
))
}
importer/trickle/trickle_test.go
View file @
b4a38541
...
...
@@ -21,15 +21,12 @@ import (
)
func
buildTestDag
(
ds
merkledag
.
DAGService
,
spl
chunk
.
Splitter
)
(
*
merkledag
.
Node
,
error
)
{
// Start the splitter
blkch
,
errs
:=
chunk
.
Chan
(
spl
)
dbp
:=
h
.
DagBuilderParams
{
Dagserv
:
ds
,
Maxlinks
:
h
.
DefaultLinksPerBlock
,
}
nd
,
err
:=
TrickleLayout
(
dbp
.
New
(
blkch
,
errs
))
nd
,
err
:=
TrickleLayout
(
dbp
.
New
(
spl
))
if
err
!=
nil
{
return
nil
,
err
}
...
...
@@ -441,10 +438,9 @@ func TestAppend(t *testing.T) {
}
r
:=
bytes
.
NewReader
(
should
[
nbytes
/
2
:
])
blks
,
errs
:=
chunk
.
Chan
(
chunk
.
NewSizeSplitter
(
r
,
500
))
ctx
:=
context
.
Background
()
nnode
,
err
:=
TrickleAppend
(
ctx
,
nd
,
dbp
.
New
(
blks
,
errs
))
nnode
,
err
:=
TrickleAppend
(
ctx
,
nd
,
dbp
.
New
(
chunk
.
NewSizeSplitter
(
r
,
500
)
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -494,9 +490,8 @@ func TestMultipleAppends(t *testing.T) {
ctx
:=
context
.
Background
()
for
i
:=
0
;
i
<
len
(
should
);
i
++
{
blks
,
errs
:=
chunk
.
Chan
(
spl
(
bytes
.
NewReader
(
should
[
i
:
i
+
1
])))
nnode
,
err
:=
TrickleAppend
(
ctx
,
nd
,
dbp
.
New
(
blks
,
errs
))
nnode
,
err
:=
TrickleAppend
(
ctx
,
nd
,
dbp
.
New
(
spl
(
bytes
.
NewReader
(
should
[
i
:
i
+
1
]))
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -538,17 +533,13 @@ func TestAppendSingleBytesToEmpty(t *testing.T) {
spl
:=
chunk
.
SizeSplitterGen
(
500
)
blks
,
errs
:=
chunk
.
Chan
(
spl
(
bytes
.
NewReader
(
data
[
:
1
])))
ctx
:=
context
.
Background
()
nnode
,
err
:=
TrickleAppend
(
ctx
,
nd
,
dbp
.
New
(
blks
,
errs
))
nnode
,
err
:=
TrickleAppend
(
ctx
,
nd
,
dbp
.
New
(
spl
(
bytes
.
NewReader
(
data
[
:
1
]))
))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
blks
,
errs
=
chunk
.
Chan
(
spl
(
bytes
.
NewReader
(
data
[
1
:
])))
nnode
,
err
=
TrickleAppend
(
ctx
,
nnode
,
dbp
.
New
(
blks
,
errs
))
nnode
,
err
=
TrickleAppend
(
ctx
,
nnode
,
dbp
.
New
(
spl
(
bytes
.
NewReader
(
data
[
1
:
]))))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
unixfs/mod/dagmodifier.go
View file @
b4a38541
...
...
@@ -103,8 +103,7 @@ func (zr zeroReader) Read(b []byte) (int, error) {
func
(
dm
*
DagModifier
)
expandSparse
(
size
int64
)
error
{
r
:=
io
.
LimitReader
(
zeroReader
{},
size
)
spl
:=
chunk
.
NewSizeSplitter
(
r
,
4096
)
blks
,
errs
:=
chunk
.
Chan
(
spl
)
nnode
,
err
:=
dm
.
appendData
(
dm
.
curNode
,
blks
,
errs
)
nnode
,
err
:=
dm
.
appendData
(
dm
.
curNode
,
spl
)
if
err
!=
nil
{
return
err
}
...
...
@@ -191,8 +190,7 @@ func (dm *DagModifier) Sync() error {
// need to write past end of current dag
if
!
done
{
blks
,
errs
:=
chunk
.
Chan
(
dm
.
splitter
(
dm
.
wrBuf
))
nd
,
err
=
dm
.
appendData
(
dm
.
curNode
,
blks
,
errs
)
nd
,
err
=
dm
.
appendData
(
dm
.
curNode
,
dm
.
splitter
(
dm
.
wrBuf
))
if
err
!=
nil
{
return
err
}
...
...
@@ -286,13 +284,13 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader)
}
// appendData appends the blocks from the given chan to the end of this dag
func
(
dm
*
DagModifier
)
appendData
(
node
*
mdag
.
Node
,
blks
<-
chan
[]
byte
,
errs
<-
chan
erro
r
)
(
*
mdag
.
Node
,
error
)
{
func
(
dm
*
DagModifier
)
appendData
(
node
*
mdag
.
Node
,
spl
chunk
.
Splitte
r
)
(
*
mdag
.
Node
,
error
)
{
dbp
:=
&
help
.
DagBuilderParams
{
Dagserv
:
dm
.
dagserv
,
Maxlinks
:
help
.
DefaultLinksPerBlock
,
}
return
trickle
.
TrickleAppend
(
dm
.
ctx
,
node
,
dbp
.
New
(
blks
,
errs
))
return
trickle
.
TrickleAppend
(
dm
.
ctx
,
node
,
dbp
.
New
(
spl
))
}
// Read data from this dag starting at the current offset
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment