Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-dms3
Commits
4940c3e0
Commit
4940c3e0
authored
Jan 26, 2015
by
Jeromy Johnson
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #655 from jbenet/feat/dagseek
Implement io.Seeker on unixio.DAGReader
parents
d87aacc8
e9596260
Changes
12
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
438 additions
and
95 deletions
+438
-95
core/commands/cat.go
core/commands/cat.go
+1
-1
core/corehttp/gateway_handler.go
core/corehttp/gateway_handler.go
+1
-1
core/coreunix/cat.go
core/coreunix/cat.go
+1
-1
fuse/ipns/ipns_unix.go
fuse/ipns/ipns_unix.go
+2
-1
fuse/readonly/readonly_unix.go
fuse/readonly/readonly_unix.go
+2
-1
importer/importer_test.go
importer/importer_test.go
+219
-3
merkledag/merkledag.go
merkledag/merkledag.go
+78
-36
merkledag/merkledag_test.go
merkledag/merkledag_test.go
+3
-2
server/http/ipfs.go
server/http/ipfs.go
+2
-1
unixfs/io/dagmodifier_test.go
unixfs/io/dagmodifier_test.go
+5
-4
unixfs/io/dagreader.go
unixfs/io/dagreader.go
+122
-43
unixfs/tar/reader.go
unixfs/tar/reader.go
+2
-1
No files found.
core/commands/cat.go
View file @
4940c3e0
...
...
@@ -73,7 +73,7 @@ func cat(node *core.IpfsNode, paths []string) ([]io.Reader, uint64, error) {
}
length
+=
nodeLength
read
,
err
:=
uio
.
NewDagReader
(
dagnode
,
node
.
DAG
)
read
,
err
:=
uio
.
NewDagReader
(
node
.
Context
(),
dagnode
,
node
.
DAG
)
if
err
!=
nil
{
return
nil
,
0
,
err
}
...
...
core/corehttp/gateway_handler.go
View file @
4940c3e0
...
...
@@ -77,7 +77,7 @@ func (i *gatewayHandler) AddNodeToDAG(nd *dag.Node) (u.Key, error) {
}
func
(
i
*
gatewayHandler
)
NewDagReader
(
nd
*
dag
.
Node
)
(
io
.
Reader
,
error
)
{
return
uio
.
NewDagReader
(
nd
,
i
.
node
.
DAG
)
return
uio
.
NewDagReader
(
i
.
node
.
Context
(),
nd
,
i
.
node
.
DAG
)
}
func
(
i
*
gatewayHandler
)
ServeHTTP
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
...
...
core/coreunix/cat.go
View file @
4940c3e0
...
...
@@ -12,5 +12,5 @@ func Cat(n *core.IpfsNode, path string) (io.Reader, error) {
if
err
!=
nil
{
return
nil
,
err
}
return
uio
.
NewDagReader
(
dagNode
,
n
.
DAG
)
return
uio
.
NewDagReader
(
n
.
ContextGroup
.
Context
(),
dagNode
,
n
.
DAG
)
}
fuse/ipns/ipns_unix.go
View file @
4940c3e0
...
...
@@ -11,6 +11,7 @@ import (
fuse
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
fs
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
core
"github.com/jbenet/go-ipfs/core"
...
...
@@ -337,7 +338,7 @@ func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
// ReadAll reads the object data as file data
func
(
s
*
Node
)
ReadAll
(
intr
fs
.
Intr
)
([]
byte
,
fuse
.
Error
)
{
log
.
Debugf
(
"ipns: ReadAll [%s]"
,
s
.
name
)
r
,
err
:=
uio
.
NewDagReader
(
s
.
Nd
,
s
.
Ipfs
.
DAG
)
r
,
err
:=
uio
.
NewDagReader
(
context
.
TODO
(),
s
.
Nd
,
s
.
Ipfs
.
DAG
)
if
err
!=
nil
{
return
nil
,
err
}
...
...
fuse/readonly/readonly_unix.go
View file @
4940c3e0
...
...
@@ -10,6 +10,7 @@ import (
fuse
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
fs
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
proto
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
core
"github.com/jbenet/go-ipfs/core"
...
...
@@ -145,7 +146,7 @@ func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
// ReadAll reads the object data as file data
func
(
s
*
Node
)
ReadAll
(
intr
fs
.
Intr
)
([]
byte
,
fuse
.
Error
)
{
log
.
Debug
(
"Read node."
)
r
,
err
:=
uio
.
NewDagReader
(
s
.
Nd
,
s
.
Ipfs
.
DAG
)
r
,
err
:=
uio
.
NewDagReader
(
context
.
TODO
(),
s
.
Nd
,
s
.
Ipfs
.
DAG
)
if
err
!=
nil
{
return
nil
,
err
}
...
...
importer/importer_test.go
View file @
4940c3e0
...
...
@@ -6,8 +6,11 @@ import (
"fmt"
"io"
"io/ioutil"
mrand
"math/rand"
"os"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
bstore
"github.com/jbenet/go-ipfs/blocks/blockstore"
...
...
@@ -51,7 +54,7 @@ func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) {
t
.
Fatal
(
err
)
}
r
,
err
:=
uio
.
NewDagReader
(
nd
,
dnp
.
ds
)
r
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
nd
,
dnp
.
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -77,7 +80,7 @@ func TestBuilderConsistency(t *testing.T) {
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
r
,
err
:=
uio
.
NewDagReader
(
nd
,
dagserv
)
r
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
nd
,
dagserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -165,7 +168,7 @@ func TestIndirectBlocks(t *testing.T) {
t
.
Fatal
(
err
)
}
reader
,
err
:=
uio
.
NewDagReader
(
dag
,
dnp
.
ds
)
reader
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
dag
,
dnp
.
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -179,3 +182,216 @@ func TestIndirectBlocks(t *testing.T) {
t
.
Fatal
(
"Not equal!"
)
}
}
func
TestSeekingBasic
(
t
*
testing
.
T
)
{
nbytes
:=
int64
(
10
*
1024
)
should
:=
make
([]
byte
,
nbytes
)
u
.
NewTimeSeededRand
()
.
Read
(
should
)
read
:=
bytes
.
NewReader
(
should
)
dnp
:=
getDagservAndPinner
(
t
)
nd
,
err
:=
BuildDagFromReader
(
read
,
dnp
.
ds
,
dnp
.
mp
,
&
chunk
.
SizeSplitter
{
500
})
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
rs
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
nd
,
dnp
.
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
start
:=
int64
(
4000
)
n
,
err
:=
rs
.
Seek
(
start
,
os
.
SEEK_SET
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
n
!=
start
{
t
.
Fatal
(
"Failed to seek to correct offset"
)
}
out
,
err
:=
ioutil
.
ReadAll
(
rs
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
err
=
arrComp
(
out
,
should
[
start
:
])
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
func
TestSeekToBegin
(
t
*
testing
.
T
)
{
nbytes
:=
int64
(
10
*
1024
)
should
:=
make
([]
byte
,
nbytes
)
u
.
NewTimeSeededRand
()
.
Read
(
should
)
read
:=
bytes
.
NewReader
(
should
)
dnp
:=
getDagservAndPinner
(
t
)
nd
,
err
:=
BuildDagFromReader
(
read
,
dnp
.
ds
,
dnp
.
mp
,
&
chunk
.
SizeSplitter
{
500
})
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
rs
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
nd
,
dnp
.
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
n
,
err
:=
io
.
CopyN
(
ioutil
.
Discard
,
rs
,
1024
*
4
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
n
!=
4096
{
t
.
Fatal
(
"Copy didnt copy enough bytes"
)
}
seeked
,
err
:=
rs
.
Seek
(
0
,
os
.
SEEK_SET
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
seeked
!=
0
{
t
.
Fatal
(
"Failed to seek to beginning"
)
}
out
,
err
:=
ioutil
.
ReadAll
(
rs
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
err
=
arrComp
(
out
,
should
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
func
TestSeekToAlmostBegin
(
t
*
testing
.
T
)
{
nbytes
:=
int64
(
10
*
1024
)
should
:=
make
([]
byte
,
nbytes
)
u
.
NewTimeSeededRand
()
.
Read
(
should
)
read
:=
bytes
.
NewReader
(
should
)
dnp
:=
getDagservAndPinner
(
t
)
nd
,
err
:=
BuildDagFromReader
(
read
,
dnp
.
ds
,
dnp
.
mp
,
&
chunk
.
SizeSplitter
{
500
})
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
rs
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
nd
,
dnp
.
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
n
,
err
:=
io
.
CopyN
(
ioutil
.
Discard
,
rs
,
1024
*
4
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
n
!=
4096
{
t
.
Fatal
(
"Copy didnt copy enough bytes"
)
}
seeked
,
err
:=
rs
.
Seek
(
1
,
os
.
SEEK_SET
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
seeked
!=
1
{
t
.
Fatal
(
"Failed to seek to almost beginning"
)
}
out
,
err
:=
ioutil
.
ReadAll
(
rs
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
err
=
arrComp
(
out
,
should
[
1
:
])
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
func
TestSeekingStress
(
t
*
testing
.
T
)
{
nbytes
:=
int64
(
1024
*
1024
)
should
:=
make
([]
byte
,
nbytes
)
u
.
NewTimeSeededRand
()
.
Read
(
should
)
read
:=
bytes
.
NewReader
(
should
)
dnp
:=
getDagservAndPinner
(
t
)
nd
,
err
:=
BuildDagFromReader
(
read
,
dnp
.
ds
,
dnp
.
mp
,
&
chunk
.
SizeSplitter
{
1000
})
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
rs
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
nd
,
dnp
.
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
testbuf
:=
make
([]
byte
,
nbytes
)
for
i
:=
0
;
i
<
50
;
i
++
{
offset
:=
mrand
.
Intn
(
int
(
nbytes
))
l
:=
int
(
nbytes
)
-
offset
n
,
err
:=
rs
.
Seek
(
int64
(
offset
),
os
.
SEEK_SET
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
n
!=
int64
(
offset
)
{
t
.
Fatal
(
"Seek failed to move to correct position"
)
}
nread
,
err
:=
rs
.
Read
(
testbuf
[
:
l
])
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
nread
!=
l
{
t
.
Fatal
(
"Failed to read enough bytes"
)
}
err
=
arrComp
(
testbuf
[
:
l
],
should
[
offset
:
offset
+
l
])
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
}
func
TestSeekingConsistency
(
t
*
testing
.
T
)
{
nbytes
:=
int64
(
128
*
1024
)
should
:=
make
([]
byte
,
nbytes
)
u
.
NewTimeSeededRand
()
.
Read
(
should
)
read
:=
bytes
.
NewReader
(
should
)
dnp
:=
getDagservAndPinner
(
t
)
nd
,
err
:=
BuildDagFromReader
(
read
,
dnp
.
ds
,
dnp
.
mp
,
&
chunk
.
SizeSplitter
{
500
})
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
rs
,
err
:=
uio
.
NewDagReader
(
context
.
Background
(),
nd
,
dnp
.
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
out
:=
make
([]
byte
,
nbytes
)
for
coff
:=
nbytes
-
4096
;
coff
>=
0
;
coff
-=
4096
{
t
.
Log
(
coff
)
n
,
err
:=
rs
.
Seek
(
coff
,
os
.
SEEK_SET
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
n
!=
coff
{
t
.
Fatal
(
"wasnt able to seek to the right position"
)
}
nread
,
err
:=
rs
.
Read
(
out
[
coff
:
coff
+
4096
])
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
nread
!=
4096
{
t
.
Fatal
(
"didnt read the correct number of bytes"
)
}
}
err
=
arrComp
(
out
,
should
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
merkledag/merkledag.go
View file @
4940c3e0
...
...
@@ -2,7 +2,6 @@
package
merkledag
import
(
"bytes"
"fmt"
"sync"
"time"
...
...
@@ -26,7 +25,8 @@ type DAGService interface {
// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetDAG
(
context
.
Context
,
*
Node
)
<-
chan
*
Node
GetDAG
(
context
.
Context
,
*
Node
)
[]
NodeGetter
GetNodes
(
context
.
Context
,
[]
u
.
Key
)
[]
NodeGetter
}
func
NewDAGService
(
bs
*
bserv
.
BlockService
)
DAGService
{
...
...
@@ -155,11 +155,10 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
// FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it
func
FindLinks
(
n
*
Node
,
k
u
.
Key
,
start
int
)
[]
int
{
func
FindLinks
(
links
[]
u
.
Key
,
k
u
.
Key
,
start
int
)
[]
int
{
var
out
[]
int
keybytes
:=
[]
byte
(
k
)
for
i
,
lnk
:=
range
n
.
Links
[
start
:
]
{
if
bytes
.
Equal
([]
byte
(
lnk
.
Hash
),
keybytes
)
{
for
i
,
lnk_k
:=
range
links
[
start
:
]
{
if
k
==
lnk_k
{
out
=
append
(
out
,
i
+
start
)
}
}
...
...
@@ -169,41 +168,84 @@ func FindLinks(n *Node, k u.Key, start int) []int {
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func
(
ds
*
dagService
)
GetDAG
(
ctx
context
.
Context
,
root
*
Node
)
<-
chan
*
Node
{
sig
:=
make
(
chan
*
Node
)
go
func
()
{
defer
close
(
sig
)
func
(
ds
*
dagService
)
GetDAG
(
ctx
context
.
Context
,
root
*
Node
)
[]
NodeGetter
{
var
keys
[]
u
.
Key
for
_
,
lnk
:=
range
root
.
Links
{
keys
=
append
(
keys
,
u
.
Key
(
lnk
.
Hash
))
}
var
keys
[]
u
.
Key
for
_
,
lnk
:=
range
root
.
Links
{
keys
=
append
(
keys
,
u
.
Key
(
lnk
.
Hash
))
}
blkchan
:=
ds
.
Blocks
.
GetBlocks
(
ctx
,
keys
)
return
ds
.
GetNodes
(
ctx
,
keys
)
}
nodes
:=
make
([]
*
Node
,
len
(
root
.
Links
))
next
:=
0
for
blk
:=
range
blkchan
{
nd
,
err
:=
Decoded
(
blk
.
Data
)
if
err
!=
nil
{
// NB: can occur in normal situations, with improperly formatted
// input data
log
.
Error
(
"Got back bad block!"
)
break
}
is
:=
FindLinks
(
root
,
blk
.
Key
(),
next
)
for
_
,
i
:=
range
is
{
nodes
[
i
]
=
nd
}
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func
(
ds
*
dagService
)
GetNodes
(
ctx
context
.
Context
,
keys
[]
u
.
Key
)
[]
NodeGetter
{
promises
:=
make
([]
NodeGetter
,
len
(
keys
))
sendChans
:=
make
([]
chan
<-
*
Node
,
len
(
keys
))
for
i
,
_
:=
range
keys
{
promises
[
i
],
sendChans
[
i
]
=
newNodePromise
(
ctx
)
}
go
func
()
{
blkchan
:=
ds
.
Blocks
.
GetBlocks
(
ctx
,
keys
)
for
;
next
<
len
(
nodes
)
&&
nodes
[
next
]
!=
nil
;
next
++
{
sig
<-
nodes
[
next
]
for
{
select
{
case
blk
,
ok
:=
<-
blkchan
:
if
!
ok
{
return
}
nd
,
err
:=
Decoded
(
blk
.
Data
)
if
err
!=
nil
{
// NB: can happen with improperly formatted input data
log
.
Error
(
"Got back bad block!"
)
return
}
is
:=
FindLinks
(
keys
,
blk
.
Key
(),
0
)
for
_
,
i
:=
range
is
{
sendChans
[
i
]
<-
nd
}
case
<-
ctx
.
Done
()
:
return
}
}
if
next
<
len
(
nodes
)
{
// TODO: bubble errors back up.
log
.
Errorf
(
"Did not receive correct number of nodes!"
)
}
}()
return
promises
}
return
sig
func
newNodePromise
(
ctx
context
.
Context
)
(
NodeGetter
,
chan
<-
*
Node
)
{
ch
:=
make
(
chan
*
Node
,
1
)
return
&
nodePromise
{
recv
:
ch
,
ctx
:
ctx
,
},
ch
}
type
nodePromise
struct
{
cache
*
Node
recv
<-
chan
*
Node
ctx
context
.
Context
}
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
type
NodeGetter
interface
{
Get
()
(
*
Node
,
error
)
}
func
(
np
*
nodePromise
)
Get
()
(
*
Node
,
error
)
{
if
np
.
cache
!=
nil
{
return
np
.
cache
,
nil
}
select
{
case
blk
:=
<-
np
.
recv
:
np
.
cache
=
blk
case
<-
np
.
ctx
.
Done
()
:
return
nil
,
np
.
ctx
.
Err
()
}
return
np
.
cache
,
nil
}
merkledag/merkledag_test.go
View file @
4940c3e0
...
...
@@ -8,6 +8,7 @@ import (
"sync"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
bstore
"github.com/jbenet/go-ipfs/blocks/blockstore"
...
...
@@ -162,7 +163,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
t
.
Log
(
"finished setup."
)
dagr
,
err
:=
uio
.
NewDagReader
(
root
,
dagservs
[
0
])
dagr
,
err
:=
uio
.
NewDagReader
(
context
.
TODO
(),
root
,
dagservs
[
0
])
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -195,7 +196,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
}
fmt
.
Println
(
"Got first node back."
)
read
,
err
:=
uio
.
NewDagReader
(
first
,
dagservs
[
i
])
read
,
err
:=
uio
.
NewDagReader
(
context
.
TODO
(),
first
,
dagservs
[
i
])
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
server/http/ipfs.go
View file @
4940c3e0
...
...
@@ -3,6 +3,7 @@ package http
import
(
"io"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
core
"github.com/jbenet/go-ipfs/core"
"github.com/jbenet/go-ipfs/importer"
chunk
"github.com/jbenet/go-ipfs/importer/chunk"
...
...
@@ -36,5 +37,5 @@ func (i *ipfsHandler) AddNodeToDAG(nd *dag.Node) (u.Key, error) {
}
func
(
i
*
ipfsHandler
)
NewDagReader
(
nd
*
dag
.
Node
)
(
io
.
Reader
,
error
)
{
return
uio
.
NewDagReader
(
nd
,
i
.
node
.
DAG
)
return
uio
.
NewDagReader
(
context
.
TODO
(),
nd
,
i
.
node
.
DAG
)
}
unixfs/io/dagmodifier_test.go
View file @
4940c3e0
...
...
@@ -6,6 +6,7 @@ import (
"io/ioutil"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs
"github.com/jbenet/go-ipfs/blockservice"
...
...
@@ -38,7 +39,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod
t
.
Fatal
(
err
)
}
dr
,
err
:=
NewDagReader
(
node
,
dserv
)
dr
,
err
:=
NewDagReader
(
context
.
Background
(),
node
,
dserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -75,7 +76,7 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
t
.
Fatal
(
err
)
}
rd
,
err
:=
NewDagReader
(
nd
,
dm
.
dagserv
)
rd
,
err
:=
NewDagReader
(
context
.
Background
(),
nd
,
dm
.
dagserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -173,7 +174,7 @@ func TestMultiWrite(t *testing.T) {
t
.
Fatal
(
err
)
}
read
,
err
:=
NewDagReader
(
nd
,
dserv
)
read
,
err
:=
NewDagReader
(
context
.
Background
(),
nd
,
dserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
@@ -215,7 +216,7 @@ func TestMultiWriteCoal(t *testing.T) {
t
.
Fatal
(
err
)
}
read
,
err
:=
NewDagReader
(
nd
,
dserv
)
read
,
err
:=
NewDagReader
(
context
.
Background
(),
nd
,
dserv
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
...
...
unixfs/io/dagreader.go
View file @
4940c3e0
...
...
@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"io"
"os"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...
...
@@ -17,16 +18,43 @@ var ErrIsDir = errors.New("this dag node is a directory")
// DagReader provides a way to easily read the data contained in a dag.
type
DagReader
struct
{
serv
mdag
.
DAGService
node
*
mdag
.
Node
buf
io
.
Reader
fetchChan
<-
chan
*
mdag
.
Node
serv
mdag
.
DAGService
// the node being read
node
*
mdag
.
Node
// cached protobuf structure from node.Data
pbdata
*
ftpb
.
Data
// the current data buffer to be read from
// will either be a bytes.Reader or a child DagReader
buf
ReadSeekCloser
// NodeGetters for each of 'nodes' child links
promises
[]
mdag
.
NodeGetter
// the index of the child link currently being read from
linkPosition
int
// current offset for the read head within the 'file'
offset
int64
// Our context
ctx
context
.
Context
// context cancel for children
cancel
func
()
}
type
ReadSeekCloser
interface
{
io
.
Reader
io
.
Seeker
io
.
Closer
}
// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
func
NewDagReader
(
n
*
mdag
.
Node
,
serv
mdag
.
DAGService
)
(
io
.
Reader
,
error
)
{
func
NewDagReader
(
ctx
context
.
Context
,
n
*
mdag
.
Node
,
serv
mdag
.
DAGService
)
(
Read
SeekClos
er
,
error
)
{
pb
:=
new
(
ftpb
.
Data
)
err
:=
proto
.
Unmarshal
(
n
.
Data
,
pb
)
if
err
!=
nil
{
...
...
@@ -38,16 +66,20 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// Dont allow reading directories
return
nil
,
ErrIsDir
case
ftpb
.
Data_File
:
fetchChan
:=
serv
.
GetDAG
(
context
.
TODO
(),
n
)
fctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
promises
:=
serv
.
GetDAG
(
fctx
,
n
)
return
&
DagReader
{
node
:
n
,
serv
:
serv
,
buf
:
bytes
.
NewBuffer
(
pb
.
GetData
()),
fetchChan
:
fetchChan
,
node
:
n
,
serv
:
serv
,
buf
:
NewRSNCFromBytes
(
pb
.
GetData
()),
promises
:
promises
,
ctx
:
fctx
,
cancel
:
cancel
,
pbdata
:
pb
,
},
nil
case
ftpb
.
Data_Raw
:
// Raw block will just be a single level, return a byte buffer
return
bytes
.
NewBuffer
(
pb
.
GetData
()),
nil
return
NewRSNCFromBytes
(
pb
.
GetData
()),
nil
default
:
return
nil
,
ft
.
ErrUnrecognizedType
}
...
...
@@ -56,24 +88,18 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// precalcNextBuf follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func
(
dr
*
DagReader
)
precalcNextBuf
()
error
{
var
nxt
*
mdag
.
Node
var
ok
bool
if
dr
.
fetchChan
==
nil
{
// This panic is appropriate because the select statement
// will not panic if you try and read from a nil channel
// it will simply hang.
panic
(
"fetchChan should NOT be nil"
)
dr
.
buf
.
Close
()
// Just to make sure
if
dr
.
linkPosition
>=
len
(
dr
.
promises
)
{
return
io
.
EOF
}
select
{
case
nxt
,
ok
=
<-
dr
.
fetchChan
:
if
!
ok
{
return
io
.
EOF
}
nxt
,
err
:=
dr
.
promises
[
dr
.
linkPosition
]
.
Get
()
if
err
!=
nil
{
return
err
}
dr
.
linkPosition
++
pb
:=
new
(
ftpb
.
Data
)
err
:
=
proto
.
Unmarshal
(
nxt
.
Data
,
pb
)
err
=
proto
.
Unmarshal
(
nxt
.
Data
,
pb
)
if
err
!=
nil
{
return
err
}
...
...
@@ -83,16 +109,14 @@ func (dr *DagReader) precalcNextBuf() error {
// A directory should not exist within a file
return
ft
.
ErrInvalidDirLocation
case
ftpb
.
Data_File
:
//TODO: this *should* work, needs testing first
log
.
Warning
(
"Running untested code for multilayered indirect FS reads."
)
subr
,
err
:=
NewDagReader
(
nxt
,
dr
.
serv
)
subr
,
err
:=
NewDagReader
(
dr
.
ctx
,
nxt
,
dr
.
serv
)
if
err
!=
nil
{
return
err
}
dr
.
buf
=
subr
return
nil
case
ftpb
.
Data_Raw
:
dr
.
buf
=
bytes
.
NewBuffer
(
pb
.
GetData
())
dr
.
buf
=
NewRSNCFromBytes
(
pb
.
GetData
())
return
nil
default
:
return
ft
.
ErrUnrecognizedType
...
...
@@ -102,17 +126,12 @@ func (dr *DagReader) precalcNextBuf() error {
// Read reads data from the DAG structured file
func
(
dr
*
DagReader
)
Read
(
b
[]
byte
)
(
int
,
error
)
{
// If no cached buffer, load one
if
dr
.
buf
==
nil
{
err
:=
dr
.
precalcNextBuf
()
if
err
!=
nil
{
return
0
,
err
}
}
total
:=
0
for
{
// Attempt to fill bytes from cached buffer
n
,
err
:=
dr
.
buf
.
Read
(
b
[
total
:
])
total
+=
n
dr
.
offset
+=
int64
(
n
)
if
err
!=
nil
{
// EOF is expected
if
err
!=
io
.
EOF
{
...
...
@@ -133,28 +152,88 @@ func (dr *DagReader) Read(b []byte) (int, error) {
}
}
/*
func
(
dr
*
DagReader
)
Close
()
error
{
dr
.
cancel
()
return
nil
}
// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
func
(
dr
*
DagReader
)
Seek
(
offset
int64
,
whence
int
)
(
int64
,
error
)
{
switch
whence
{
case
os
.
SEEK_SET
:
for i := 0; i < len(dr.node.Links); i++ {
nsize := dr.node.Links[i].Size - 8
if offset > nsize {
offset -= nsize
} else {
if
offset
<
0
{
return
-
1
,
errors
.
New
(
"Invalid offset"
)
}
// Grab cached protobuf object (solely to make code look cleaner)
pb
:=
dr
.
pbdata
// left represents the number of bytes remaining to seek to (from beginning)
left
:=
offset
if
int64
(
len
(
pb
.
Data
))
>
offset
{
// Close current buf to close potential child dagreader
dr
.
buf
.
Close
()
dr
.
buf
=
NewRSNCFromBytes
(
pb
.
GetData
()[
offset
:
])
// start reading links from the beginning
dr
.
linkPosition
=
0
dr
.
offset
=
offset
return
offset
,
nil
}
else
{
// skip past root block data
left
-=
int64
(
len
(
pb
.
Data
))
}
// iterate through links and find where we need to be
for
i
:=
0
;
i
<
len
(
pb
.
Blocksizes
);
i
++
{
if
pb
.
Blocksizes
[
i
]
>
uint64
(
left
)
{
dr
.
linkPosition
=
i
break
}
else
{
left
-=
int64
(
pb
.
Blocksizes
[
i
])
}
}
dr.position = i
// start sub-block request
err
:=
dr
.
precalcNextBuf
()
if
err
!=
nil
{
return
0
,
err
}
// set proper offset within child readseeker
n
,
err
:=
dr
.
buf
.
Seek
(
left
,
os
.
SEEK_SET
)
if
err
!=
nil
{
return
-
1
,
err
}
// sanity
left
-=
n
if
left
!=
0
{
return
-
1
,
errors
.
New
(
"failed to seek properly"
)
}
dr
.
offset
=
offset
return
offset
,
nil
case
os
.
SEEK_CUR
:
// TODO: be smarter here
noffset
:=
dr
.
offset
+
offset
return
dr
.
Seek
(
noffset
,
os
.
SEEK_SET
)
case
os
.
SEEK_END
:
noffset
:=
int64
(
dr
.
pbdata
.
GetFilesize
())
-
offset
return
dr
.
Seek
(
noffset
,
os
.
SEEK_SET
)
default
:
return
0
,
errors
.
New
(
"invalid whence"
)
}
return
0
,
nil
}
*/
// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
type
readSeekNopCloser
struct
{
*
bytes
.
Reader
}
func
NewRSNCFromBytes
(
b
[]
byte
)
ReadSeekCloser
{
return
&
readSeekNopCloser
{
bytes
.
NewReader
(
b
)}
}
func
(
r
*
readSeekNopCloser
)
Close
()
error
{
return
nil
}
unixfs/tar/reader.go
View file @
4940c3e0
...
...
@@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"compress/gzip"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"io"
gopath
"path"
"strings"
...
...
@@ -114,7 +115,7 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
}
i
.
flush
()
reader
,
err
:=
uio
.
NewDagReader
(
dagnode
,
i
.
dag
)
reader
,
err
:=
uio
.
NewDagReader
(
context
.
TODO
(),
dagnode
,
i
.
dag
)
if
err
!=
nil
{
i
.
emitError
(
err
)
return
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment