dms3 / go-dms3 · Commit 465044ab
Authored 8 years ago by Jeromy Johnson; committed by GitHub 8 years ago.
Merge pull request #3314 from ipfs/kevina/posinfo-2
Create a FilestoreNode object to carry PosInfo
Parents: 6f3ae5da, 65ffff24
Showing 9 changed files with 231 additions and 14 deletions.
commands/files/file.go (+5 / -0)
core/coreunix/add.go (+11 / -1)
core/coreunix/add_test.go (+115 / -1)
importer/balanced/builder.go (+9 / -3)
importer/chunk/rabin.go (+8 / -2)
importer/chunk/splitting.go (+5 / -0)
importer/helpers/dagbuilder.go (+37 / -7)
importer/helpers/helpers.go (+23 / -0)
thirdparty/posinfo/posinfo.go (+18 / -0)
commands/files/file.go
@@ -55,3 +55,8 @@ type SizeFile interface {
 	Size() (int64, error)
 }
+
+type FileInfo interface {
+	FullPath() string
+	Stat() os.FileInfo
+}
core/coreunix/add.go
@@ -398,7 +398,12 @@ func (adder *Adder) addFile(file files.File) error {
 	// progress updates to the client (over the output channel)
 	var reader io.Reader = file
 	if adder.Progress {
-		reader = &progressReader{file: file, out: adder.Out}
+		rdr := &progressReader{file: file, out: adder.Out}
+		if fi, ok := file.(files.FileInfo); ok {
+			reader = &progressReader2{rdr, fi}
+		} else {
+			reader = rdr
+		}
 	}

 	dagnode, err := adder.add(reader)

@@ -520,3 +525,8 @@ func (i *progressReader) Read(p []byte) (int, error) {
 	return n, err
 }
+
+type progressReader2 struct {
+	*progressReader
+	files.FileInfo
+}
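The addFile hunk above is the adder-side core of this change: when progress reporting is enabled, progressReader2 embeds both the progress wrapper and the original files.FileInfo, so the file's path and stat survive the wrapping and can be recovered later with a type assertion. Below is a minimal, self-contained sketch of that embedding pattern; the names are illustrative stand-ins rather than the go-ipfs API, and io.ReadAll assumes Go 1.16 or newer.

package main

import (
	"fmt"
	"io"
	"strings"
)

// pathInfo stands in for the extra methods files.FileInfo layers on top of a reader.
type pathInfo interface {
	FullPath() string
}

// namedFile is a toy source that knows the path it came from.
type namedFile struct {
	io.Reader
	path string
}

func (f *namedFile) FullPath() string { return f.path }

// progressWrapped mirrors the progressReader2 idea: embed the progress-counting
// reader and the original source's metadata, so one value satisfies both
// io.Reader and pathInfo at once.
type progressWrapped struct {
	io.Reader
	pathInfo
}

func main() {
	src := &namedFile{Reader: strings.NewReader("hello"), path: "/tmp/foo.txt"}
	var r io.Reader = &progressWrapped{Reader: src, pathInfo: src}

	// Later stages (DagBuilderParams.New in this commit) recover the path with a
	// type assertion instead of losing it behind the wrapper.
	if fi, ok := r.(pathInfo); ok {
		fmt.Println("path:", fi.FullPath())
	}
	b, _ := io.ReadAll(r)
	fmt.Println("read:", string(b))
}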
core/coreunix/add_test.go
@@ -2,20 +2,26 @@ package coreunix

 import (
 	"bytes"
+	"context"
+	"io"
 	"io/ioutil"
+	"math/rand"
+	"os"
 	"testing"
+	"time"

 	"github.com/ipfs/go-ipfs/blocks"
 	"github.com/ipfs/go-ipfs/blocks/blockstore"
 	"github.com/ipfs/go-ipfs/blockservice"
 	"github.com/ipfs/go-ipfs/commands/files"
 	"github.com/ipfs/go-ipfs/core"
 	dag "github.com/ipfs/go-ipfs/merkledag"
 	"github.com/ipfs/go-ipfs/pin/gc"
 	"github.com/ipfs/go-ipfs/repo"
 	"github.com/ipfs/go-ipfs/repo/config"
+	pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
 	"github.com/ipfs/go-ipfs/thirdparty/testutil"

-	"context"
 	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
 )

@@ -162,3 +168,111 @@ func TestAddGCLive(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func testAddWPosInfo(t *testing.T, rawLeaves bool) {
+	r := &repo.Mock{
+		C: config.Config{
+			Identity: config.Identity{
+				PeerID: "Qmfoo", // required by offline node
+			},
+		},
+		D: testutil.ThreadSafeCloserMapDatastore(),
+	}
+	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: "/tmp/foo.txt", t: t}
+	bserv := blockservice.New(bs, node.Exchange)
+	dserv := dag.NewDAGService(bserv)
+	adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
+	if err != nil {
+		t.Fatal(err)
+	}
+	adder.Out = make(chan interface{})
+	adder.Progress = true
+	adder.RawLeaves = rawLeaves
+
+	data := make([]byte, 5*1024*1024)
+	rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
+	fileData := ioutil.NopCloser(bytes.NewBuffer(data))
+	fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
+	file := files.NewReaderFile("foo.txt", "/tmp/foo.txt", fileData, &fileInfo)
+
+	go func() {
+		defer close(adder.Out)
+		err = adder.AddFile(file)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}()
+	for _ = range adder.Out {
+	}
+
+	if bs.countAtOffsetZero != 2 {
+		t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero)
+	}
+	if bs.countAtOffsetNonZero != 19 {
+		// note: the exact number will depend on the size and the sharding algo. used
+		t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero)
+	}
+}
+
+func TestAddWPosInfo(t *testing.T) {
+	testAddWPosInfo(t, false)
+}
+
+func TestAddWPosInfoAndRawLeafs(t *testing.T) {
+	testAddWPosInfo(t, true)
+}
+
+type testBlockstore struct {
+	blockstore.GCBlockstore
+	expectedPath         string
+	t                    *testing.T
+	countAtOffsetZero    int
+	countAtOffsetNonZero int
+}
+
+func (bs *testBlockstore) Put(block blocks.Block) error {
+	bs.CheckForPosInfo(block)
+	return bs.GCBlockstore.Put(block)
+}
+
+func (bs *testBlockstore) PutMany(blocks []blocks.Block) error {
+	for _, blk := range blocks {
+		bs.CheckForPosInfo(blk)
+	}
+	return bs.GCBlockstore.PutMany(blocks)
+}
+
+func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) error {
+	fsn, ok := block.(*pi.FilestoreNode)
+	if ok {
+		posInfo := fsn.PosInfo
+		if posInfo.FullPath != bs.expectedPath {
+			bs.t.Fatal("PosInfo does not have the expected path")
+		}
+		if posInfo.Offset == 0 {
+			bs.countAtOffsetZero += 1
+		} else {
+			bs.countAtOffsetNonZero += 1
+		}
+	}
+	return nil
+}
+
+type dummyFileInfo struct {
+	name    string
+	size    int64
+	modTime time.Time
+}
+
+func (fi *dummyFileInfo) Name() string       { return fi.name }
+func (fi *dummyFileInfo) Size() int64        { return fi.size }
+func (fi *dummyFileInfo) Mode() os.FileMode  { return 0 }
+func (fi *dummyFileInfo) ModTime() time.Time { return fi.modTime }
+func (fi *dummyFileInfo) IsDir() bool        { return false }
+func (fi *dummyFileInfo) Sys() interface{}   { return nil }
importer/balanced/builder.go
@@ -9,10 +9,12 @@ import (
 )

 func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
+	var offset uint64 = 0
 	var root *h.UnixfsNode
 	for level := 0; !db.Done(); level++ {

 		nroot := h.NewUnixfsNode()
+		db.SetPosInfo(nroot, 0)

 		// add our old root as a child of the new root.
 		if root != nil { // nil if it's the first node.

@@ -22,11 +24,13 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
 		}

 		// fill it up.
-		if err := fillNodeRec(db, nroot, level); err != nil {
+		if err := fillNodeRec(db, nroot, level, offset); err != nil {
 			return nil, err
 		}

+		offset = nroot.FileSize()
+
 		root = nroot
 	}

 	if root == nil {
 		root = h.NewUnixfsNode()

@@ -50,7 +54,7 @@ func BalancedLayout(db *h.DagBuilderHelper) (node.Node, error) {
 // it returns the total dataSize of the node, and a potential error
 //
 // warning: **children** pinned indirectly, but input node IS NOT pinned.
-func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
+func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int, offset uint64) error {
 	if depth < 0 {
 		return errors.New("attempt to fillNode at depth < 0")
 	}

@@ -69,8 +73,9 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
 	// while we have room AND we're not done
 	for node.NumChildren() < db.Maxlinks() && !db.Done() {
 		child := h.NewUnixfsNode()
+		db.SetPosInfo(child, offset)
-		err := fillNodeRec(db, child, depth-1)
+		err := fillNodeRec(db, child, depth-1, offset)
 		if err != nil {
 			return err
 		}

@@ -78,6 +83,7 @@ func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
 		if err := node.AddChild(child, db); err != nil {
 			return err
 		}
+		offset += child.FileSize()
 	}

 	return nil
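The builder hunks above thread a running byte offset through BalancedLayout and fillNodeRec: each new child is stamped with the current offset via db.SetPosInfo before it is filled, and the offset then advances by child.FileSize(). The following toy sketch shows that bookkeeping in isolation; the chunk sizes are hypothetical and none of this is go-ipfs code.

package main

import "fmt"

// chunk records where a piece of the original file begins and how long it is.
type chunk struct {
	offset uint64
	size   uint64
}

func main() {
	// Hypothetical leaf sizes; the real values depend on the chunker settings.
	sizes := []uint64{262144, 262144, 131072}

	var offset uint64
	var chunks []chunk
	for _, sz := range sizes {
		chunks = append(chunks, chunk{offset: offset, size: sz}) // analogous to db.SetPosInfo(child, offset)
		offset += sz                                             // analogous to offset += child.FileSize()
	}

	for _, c := range chunks {
		fmt.Printf("chunk at offset %d, size %d\n", c.offset, c.size)
	}
}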
importer/chunk/rabin.go
@@ -10,7 +10,8 @@ import (
 var IpfsRabinPoly = chunker.Pol(17437180132763653)

 type Rabin struct {
-	r *chunker.Chunker
+	r      *chunker.Chunker
+	reader io.Reader
 }

 func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {

@@ -25,7 +26,8 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
 	ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)

 	return &Rabin{
-		r: ch,
+		r:      ch,
+		reader: r,
 	}
 }

@@ -37,3 +39,7 @@ func (r *Rabin) NextBytes() ([]byte, error) {
 	return ch.Data, nil
 }
+
+func (r *Rabin) Reader() io.Reader {
+	return r.reader
+}
importer/chunk/splitting.go
@@ -12,6 +12,7 @@ var log = logging.Logger("chunk")
 var DefaultBlockSize int64 = 1024 * 256

 type Splitter interface {
+	Reader() io.Reader
 	NextBytes() ([]byte, error)
 }

@@ -77,3 +78,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
 	return buf[:n], nil
 }
+
+func (ss *sizeSplitterv2) Reader() io.Reader {
+	return ss.r
+}
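With Reader() added to the Splitter interface, any splitter implemented outside this repository now has to expose its underlying reader too; that is what lets DagBuilderParams.New detect a files.FileInfo source. Below is a hedged sketch of a conforming fixed-size splitter: the type and its behavior are illustrative, and only the interface shape comes from the diff above.

package example

import "io"

// fixedSplitter is an illustrative Splitter that emits fixed-size chunks.
// It satisfies the updated interface: NextBytes plus the new Reader method.
type fixedSplitter struct {
	r    io.Reader
	size int
}

func (s *fixedSplitter) Reader() io.Reader { return s.r }

func (s *fixedSplitter) NextBytes() ([]byte, error) {
	buf := make([]byte, s.size)
	n, err := io.ReadFull(s.r, buf)
	switch err {
	case io.ErrUnexpectedEOF: // short final chunk
		return buf[:n], nil
	case nil:
		return buf, nil
	default:
		// io.EOF at end of stream, which DagBuilderHelper.prepareNext now
		// treats as normal termination, or a real read error.
		return nil, err
	}
}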
importer/helpers/dagbuilder.go
 package helpers

 import (
+	"io"
+	"os"
+
+	"github.com/ipfs/go-ipfs/commands/files"
 	"github.com/ipfs/go-ipfs/importer/chunk"
 	dag "github.com/ipfs/go-ipfs/merkledag"

@@ -17,6 +21,8 @@ type DagBuilderHelper struct {
 	nextData []byte // the next item to return.
 	maxlinks int
 	batch    *dag.Batch
+	fullPath string
+	stat     os.FileInfo
 }

 type DagBuilderParams struct {

@@ -34,13 +40,18 @@ type DagBuilderParams struct {
 // Generate a new DagBuilderHelper from the given params, which data source comes
 // from chunks object
 func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
-	return &DagBuilderHelper{
+	db := &DagBuilderHelper{
 		dserv:     dbp.Dagserv,
 		spl:       spl,
 		rawLeaves: dbp.RawLeaves,
 		maxlinks:  dbp.Maxlinks,
 		batch:     dbp.Dagserv.Batch(),
 	}
+	if fi, ok := spl.Reader().(files.FileInfo); ok {
+		db.fullPath = fi.FullPath()
+		db.stat = fi.Stat()
+	}
+	return db
 }

 // prepareNext consumes the next item from the splitter and puts it

@@ -48,12 +59,14 @@ func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
 // it will do nothing.
 func (db *DagBuilderHelper) prepareNext() {
 	// if we already have data waiting to be consumed, we're ready
-	if db.nextData != nil {
+	if db.nextData != nil || db.recvdErr != nil {
 		return
 	}

-	// TODO: handle err (which wasn't handled either when the splitter was channeled)
-	db.nextData, _ = db.spl.NextBytes()
+	db.nextData, db.recvdErr = db.spl.NextBytes()
+	if db.recvdErr == io.EOF {
+		db.recvdErr = nil
+	}
 }

 // Done returns whether or not we're done consuming the incoming data.

@@ -61,17 +74,24 @@ func (db *DagBuilderHelper) Done() bool {
 	// ensure we have an accurate perspective on data
 	// as `done` this may be called before `next`.
 	db.prepareNext() // idempotent
+	if db.recvdErr != nil {
+		return false
+	}
 	return db.nextData == nil
 }

 // Next returns the next chunk of data to be inserted into the dag
 // if it returns nil, that signifies that the stream is at an end, and
 // that the current building operation should finish
-func (db *DagBuilderHelper) Next() []byte {
+func (db *DagBuilderHelper) Next() ([]byte, error) {
 	db.prepareNext() // idempotent
 	d := db.nextData
 	db.nextData = nil // signal we've consumed it
-	return d
+	if db.recvdErr != nil {
+		return nil, db.recvdErr
+	} else {
+		return d, nil
+	}
 }

 // GetDagServ returns the dagservice object this Helper is using

@@ -100,7 +120,11 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
 }

 func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
-	data := db.Next()
+	data, err := db.Next()
+	if err != nil {
+		return nil, err
+	}
+
 	if data == nil { // we're done!
 		return nil, nil
 	}

@@ -121,6 +145,12 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
 	}
 }

+func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
+	if db.stat != nil {
+		node.SetPosInfo(offset, db.fullPath, db.stat)
+	}
+}
+
 func (db *DagBuilderHelper) Add(node *UnixfsNode) (node.Node, error) {
 	dn, err := node.GetDagNode()
 	if err != nil {
importer/helpers/helpers.go
@@ -3,9 +3,11 @@ package helpers
 import (
 	"context"
 	"fmt"
+	"os"

 	chunk "github.com/ipfs/go-ipfs/importer/chunk"
 	dag "github.com/ipfs/go-ipfs/merkledag"
+	pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
 	ft "github.com/ipfs/go-ipfs/unixfs"
 	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"

@@ -43,6 +45,7 @@ type UnixfsNode struct {
 	rawnode *dag.RawNode
 	node    *dag.ProtoNode
 	ufmt    *ft.FSNode
+	posInfo *pi.PosInfo
 }

 // NewUnixfsNode creates a new Unixfs node to represent a file

@@ -144,9 +147,29 @@ func (n *UnixfsNode) FileSize() uint64 {
 	return n.ufmt.FileSize()
 }

+func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) {
+	n.posInfo = &pi.PosInfo{offset, fullPath, stat}
+}
+
 // getDagNode fills out the proper formatting for the unixfs node
 // inside of a DAG node and returns the dag node
 func (n *UnixfsNode) GetDagNode() (node.Node, error) {
+	nd, err := n.getBaseDagNode()
+	if err != nil {
+		return nil, err
+	}
+
+	if n.posInfo != nil {
+		return &pi.FilestoreNode{
+			Node:    nd,
+			PosInfo: n.posInfo,
+		}, nil
+	}
+
+	return nd, nil
+}
+
+func (n *UnixfsNode) getBaseDagNode() (node.Node, error) {
 	if n.raw {
 		return n.rawnode, nil
 	}
thirdparty/posinfo/posinfo.go
(new file, mode 100644)
+package posinfo
+
+import (
+	"os"
+
+	node "gx/ipfs/QmZx42H5khbVQhV5odp66TApShV4XCujYazcvYduZ4TroB/go-ipld-node"
+)
+
+type PosInfo struct {
+	Offset   uint64
+	FullPath string
+	Stat     os.FileInfo // can be nil
+}
+
+type FilestoreNode struct {
+	node.Node
+	PosInfo *PosInfo
+}
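The new posinfo package ties the pieces together: the importer wraps each produced node in a *pi.FilestoreNode, and a blockstore-level consumer can recover the originating path and byte offset with a type assertion, exactly as CheckForPosInfo does in the test above. A small illustrative helper along those lines follows; the import paths are the ones used elsewhere in this commit, but the function itself is not part of the commit.

package example

import (
	blocks "github.com/ipfs/go-ipfs/blocks"
	pi "github.com/ipfs/go-ipfs/thirdparty/posinfo"
)

// positionOf reports the originating file path and byte offset carried by a
// block, when the importer produced it as a *pi.FilestoreNode.
func positionOf(b blocks.Block) (path string, offset uint64, ok bool) {
	fsn, isFSN := b.(*pi.FilestoreNode)
	if !isFSN || fsn.PosInfo == nil {
		return "", 0, false
	}
	return fsn.PosInfo.FullPath, fsn.PosInfo.Offset, true
}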