Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-unixfs
Commits
1ff12898
Commit
1ff12898
authored
Jan 26, 2015
by
Jeromy Johnson
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #655 from jbenet/feat/dagseek
Implement io.Seeker on unixio.DAGReader
parents
6e12a491
7c05bd61
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
129 additions
and
48 deletions
+129
-48
io/dagmodifier_test.go
io/dagmodifier_test.go
+5
-4
io/dagreader.go
io/dagreader.go
+122
-43
tar/reader.go
tar/reader.go
+2
-1
No files found.
io/dagmodifier_test.go
View file @
1ff12898
...
...
@@ -6,6 +6,7 @@ import (
"io/ioutil"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs
"github.com/jbenet/go-ipfs/blockservice"
...
...
@@ -38,7 +39,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod
t
.
Fatal
(
err
)
}
dr
,
err
:=
NewDagReader
(
node
,
dserv
)
dr
,
err
:=
NewDagReader
(
context
.
Background
(),
node
,
dserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -75,7 +76,7 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
t
.
Fatal
(
err
)
}
rd
,
err
:=
NewDagReader
(
nd
,
dm
.
dagserv
)
rd
,
err
:=
NewDagReader
(
context
.
Background
(),
nd
,
dm
.
dagserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -173,7 +174,7 @@ func TestMultiWrite(t *testing.T) {
t
.
Fatal
(
err
)
}
read
,
err
:=
NewDagReader
(
nd
,
dserv
)
read
,
err
:=
NewDagReader
(
context
.
Background
(),
nd
,
dserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -215,7 +216,7 @@ func TestMultiWriteCoal(t *testing.T) {
t
.
Fatal
(
err
)
}
read
,
err
:=
NewDagReader
(
nd
,
dserv
)
read
,
err
:=
NewDagReader
(
context
.
Background
(),
nd
,
dserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
io/dagreader.go
View file @
1ff12898
...
...
@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"io"
"os"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...
...
@@ -17,16 +18,43 @@ var ErrIsDir = errors.New("this dag node is a directory")
// DagReader provides a way to easily read the data contained in a dag.
type
DagReader
struct
{
serv
mdag
.
DAGService
node
*
mdag
.
Node
buf
io
.
Reader
fetchChan
<-
chan
*
mdag
.
Node
serv
mdag
.
DAGService
// the node being read
node
*
mdag
.
Node
// cached protobuf structure from node.Data
pbdata
*
ftpb
.
Data
// the current data buffer to be read from
// will either be a bytes.Reader or a child DagReader
buf
ReadSeekCloser
// NodeGetters for each of 'nodes' child links
promises
[]
mdag
.
NodeGetter
// the index of the child link currently being read from
linkPosition
int
// current offset for the read head within the 'file'
offset
int64
// Our context
ctx
context
.
Context
// context cancel for children
cancel
func
()
}
type
ReadSeekCloser
interface
{
io
.
Reader
io
.
Seeker
io
.
Closer
}
// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
func
NewDagReader
(
n
*
mdag
.
Node
,
serv
mdag
.
DAGService
)
(
io
.
Reader
,
error
)
{
func
NewDagReader
(
ctx
context
.
Context
,
n
*
mdag
.
Node
,
serv
mdag
.
DAGService
)
(
Read
SeekClos
er
,
error
)
{
pb
:=
new
(
ftpb
.
Data
)
err
:=
proto
.
Unmarshal
(
n
.
Data
,
pb
)
if
err
!=
nil
{
...
...
@@ -38,16 +66,20 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// Dont allow reading directories
return
nil
,
ErrIsDir
case
ftpb
.
Data_File
:
fetchChan
:=
serv
.
GetDAG
(
context
.
TODO
(),
n
)
fctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
promises
:=
serv
.
GetDAG
(
fctx
,
n
)
return
&
DagReader
{
node
:
n
,
serv
:
serv
,
buf
:
bytes
.
NewBuffer
(
pb
.
GetData
()),
fetchChan
:
fetchChan
,
node
:
n
,
serv
:
serv
,
buf
:
NewRSNCFromBytes
(
pb
.
GetData
()),
promises
:
promises
,
ctx
:
fctx
,
cancel
:
cancel
,
pbdata
:
pb
,
},
nil
case
ftpb
.
Data_Raw
:
// Raw block will just be a single level, return a byte buffer
return
bytes
.
NewBuffer
(
pb
.
GetData
()),
nil
return
NewRSNCFromBytes
(
pb
.
GetData
()),
nil
default
:
return
nil
,
ft
.
ErrUnrecognizedType
}
...
...
@@ -56,24 +88,18 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// precalcNextBuf follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func
(
dr
*
DagReader
)
precalcNextBuf
()
error
{
var
nxt
*
mdag
.
Node
var
ok
bool
if
dr
.
fetchChan
==
nil
{
// This panic is appropriate because the select statement
// will not panic if you try and read from a nil channel
// it will simply hang.
panic
(
"fetchChan should NOT be nil"
)
dr
.
buf
.
Close
()
// Just to make sure
if
dr
.
linkPosition
>=
len
(
dr
.
promises
)
{
return
io
.
EOF
}
select
{
case
nxt
,
ok
=
<-
dr
.
fetchChan
:
if
!
ok
{
return
io
.
EOF
}
nxt
,
err
:=
dr
.
promises
[
dr
.
linkPosition
]
.
Get
()
if
err
!=
nil
{
return
err
}
dr
.
linkPosition
++
pb
:=
new
(
ftpb
.
Data
)
err
:
=
proto
.
Unmarshal
(
nxt
.
Data
,
pb
)
err
=
proto
.
Unmarshal
(
nxt
.
Data
,
pb
)
if
err
!=
nil
{
return
err
}
...
...
@@ -83,16 +109,14 @@ func (dr *DagReader) precalcNextBuf() error {
// A directory should not exist within a file
return
ft
.
ErrInvalidDirLocation
case
ftpb
.
Data_File
:
//TODO: this *should* work, needs testing first
log
.
Warning
(
"Running untested code for multilayered indirect FS reads."
)
subr
,
err
:=
NewDagReader
(
nxt
,
dr
.
serv
)
subr
,
err
:=
NewDagReader
(
dr
.
ctx
,
nxt
,
dr
.
serv
)
if
err
!=
nil
{
return
err
}
dr
.
buf
=
subr
return
nil
case
ftpb
.
Data_Raw
:
dr
.
buf
=
bytes
.
NewBuffer
(
pb
.
GetData
())
dr
.
buf
=
NewRSNCFromBytes
(
pb
.
GetData
())
return
nil
default
:
return
ft
.
ErrUnrecognizedType
...
...
@@ -102,17 +126,12 @@ func (dr *DagReader) precalcNextBuf() error {
// Read reads data from the DAG structured file
func
(
dr
*
DagReader
)
Read
(
b
[]
byte
)
(
int
,
error
)
{
// If no cached buffer, load one
if
dr
.
buf
==
nil
{
err
:=
dr
.
precalcNextBuf
()
if
err
!=
nil
{
return
0
,
err
}
}
total
:=
0
for
{
// Attempt to fill bytes from cached buffer
n
,
err
:=
dr
.
buf
.
Read
(
b
[
total
:
])
total
+=
n
dr
.
offset
+=
int64
(
n
)
if
err
!=
nil
{
// EOF is expected
if
err
!=
io
.
EOF
{
...
...
@@ -133,28 +152,88 @@ func (dr *DagReader) Read(b []byte) (int, error) {
}
}
/*
func
(
dr
*
DagReader
)
Close
()
error
{
dr
.
cancel
()
return
nil
}
// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
func
(
dr
*
DagReader
)
Seek
(
offset
int64
,
whence
int
)
(
int64
,
error
)
{
switch
whence
{
case
os
.
SEEK_SET
:
for i := 0; i < len(dr.node.Links); i++ {
nsize := dr.node.Links[i].Size - 8
if offset > nsize {
offset -= nsize
} else {
if
offset
<
0
{
return
-
1
,
errors
.
New
(
"Invalid offset"
)
}
// Grab cached protobuf object (solely to make code look cleaner)
pb
:=
dr
.
pbdata
// left represents the number of bytes remaining to seek to (from beginning)
left
:=
offset
if
int64
(
len
(
pb
.
Data
))
>
offset
{
// Close current buf to close potential child dagreader
dr
.
buf
.
Close
()
dr
.
buf
=
NewRSNCFromBytes
(
pb
.
GetData
()[
offset
:
])
// start reading links from the beginning
dr
.
linkPosition
=
0
dr
.
offset
=
offset
return
offset
,
nil
}
else
{
// skip past root block data
left
-=
int64
(
len
(
pb
.
Data
))
}
// iterate through links and find where we need to be
for
i
:=
0
;
i
<
len
(
pb
.
Blocksizes
);
i
++
{
if
pb
.
Blocksizes
[
i
]
>
uint64
(
left
)
{
dr
.
linkPosition
=
i
break
}
else
{
left
-=
int64
(
pb
.
Blocksizes
[
i
])
}
}
dr.position = i
// start sub-block request
err
:=
dr
.
precalcNextBuf
()
if
err
!=
nil
{
return
0
,
err
}
// set proper offset within child readseeker
n
,
err
:=
dr
.
buf
.
Seek
(
left
,
os
.
SEEK_SET
)
if
err
!=
nil
{
return
-
1
,
err
}
// sanity
left
-=
n
if
left
!=
0
{
return
-
1
,
errors
.
New
(
"failed to seek properly"
)
}
dr
.
offset
=
offset
return
offset
,
nil
case
os
.
SEEK_CUR
:
// TODO: be smarter here
noffset
:=
dr
.
offset
+
offset
return
dr
.
Seek
(
noffset
,
os
.
SEEK_SET
)
case
os
.
SEEK_END
:
noffset
:=
int64
(
dr
.
pbdata
.
GetFilesize
())
-
offset
return
dr
.
Seek
(
noffset
,
os
.
SEEK_SET
)
default
:
return
0
,
errors
.
New
(
"invalid whence"
)
}
return
0
,
nil
}
*/
// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
type
readSeekNopCloser
struct
{
*
bytes
.
Reader
}
func
NewRSNCFromBytes
(
b
[]
byte
)
ReadSeekCloser
{
return
&
readSeekNopCloser
{
bytes
.
NewReader
(
b
)}
}
func
(
r
*
readSeekNopCloser
)
Close
()
error
{
return
nil
}
tar/reader.go
View file @
1ff12898
...
...
@@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"compress/gzip"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"io"
gopath
"path"
"strings"
...
...
@@ -114,7 +115,7 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
}
i
.
flush
()
reader
,
err
:=
uio
.
NewDagReader
(
dagnode
,
i
.
dag
)
reader
,
err
:=
uio
.
NewDagReader
(
context
.
TODO
(),
dagnode
,
i
.
dag
)
if
err
!=
nil
{
i
.
emitError
(
err
)
return
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment