Commit 275b03f8 authored by Jeromy's avatar Jeromy

rework dagreader to have a dagservice for node resolution

parent dad97517
......@@ -7,7 +7,6 @@ import (
"github.com/gonuts/flag"
"github.com/jbenet/commander"
bserv "github.com/jbenet/go-ipfs/blockservice"
dag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -41,12 +40,7 @@ func catCmd(c *commander.Command, inp []string) error {
return err
}
err = ExpandDag(nd, n.Blocks)
if err != nil {
return err
}
read, err := dag.NewDagReader(nd)
read, err := dag.NewDagReader(nd, n.DAG)
if err != nil {
fmt.Println(err)
continue
......@@ -60,19 +54,3 @@ func catCmd(c *commander.Command, inp []string) error {
}
return nil
}
// Expand all subnodes in this dag so printing can occur without error
//TODO: this needs to be done MUCH better in a somewhat asynchronous way.
//also should be moved elsewhere.
func ExpandDag(nd *dag.Node, bs *bserv.BlockService) error {
for _, lnk := range nd.Links {
if lnk.Node == nil {
blk, err := bs.GetBlock(u.Key(lnk.Hash))
if err != nil {
return err
}
lnk.Node = &dag.Node{Data: dag.WrapData(blk.Data)}
}
}
return nil
}
......@@ -74,6 +74,7 @@ func (*Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
type Node struct {
Ipfs *core.IpfsNode
Nd *mdag.Node
fd *mdag.DagReader
}
// Attr returns the attributes of a given node.
......@@ -120,7 +121,7 @@ func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
// ReadAll reads the object data as file data
func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
u.DOut("Read node.\n")
r, err := mdag.NewDagReader(s.Nd)
r, err := mdag.NewDagReader(s.Nd, s.Ipfs.DAG)
if err != nil {
return nil, err
}
......
......@@ -32,3 +32,27 @@ func TestFileConsistency(t *testing.T) {
t.Fatal("Output not the same as input.")
}
}
//Test where calls to read are smaller than the chunk size
func TestFileConsistencyLargeBlocks(t *testing.T) {
buf := new(bytes.Buffer)
io.CopyN(buf, rand.Reader, 4096*32)
should := buf.Bytes()
nd, err := NewDagFromReaderWithSplitter(buf, SplitterBySize(4096))
if err != nil {
t.Fatal(err)
}
r, err := dag.NewDagReader(nd)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out, should) {
t.Fatal("Output not the same as input.")
}
}
......@@ -5,20 +5,22 @@ import (
"errors"
"io"
"code.google.com/p/goprotobuf/proto"
proto "code.google.com/p/goprotobuf/proto"
u "github.com/jbenet/go-ipfs/util"
)
var ErrIsDir = errors.New("this dag node is a directory.")
// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
serv *DAGService
node *Node
position int
buf *bytes.Buffer
thisData []byte
}
func NewDagReader(n *Node) (io.Reader, error) {
func NewDagReader(n *Node, serv *DAGService) (io.Reader, error) {
pb := new(PBData)
err := proto.Unmarshal(n.Data, pb)
if err != nil {
......@@ -31,6 +33,7 @@ func NewDagReader(n *Node) (io.Reader, error) {
return &DagReader{
node: n,
thisData: pb.GetData(),
serv: serv,
}, nil
case PBData_Raw:
return bytes.NewBuffer(pb.GetData()), nil
......@@ -46,8 +49,11 @@ func (dr *DagReader) precalcNextBuf() error {
nxtLink := dr.node.Links[dr.position]
nxt := nxtLink.Node
if nxt == nil {
//TODO: should use dagservice or something to get needed block
return errors.New("Link to nil node! Tree not fully expanded!")
nxtNode, err := dr.serv.Get(u.Key(nxtLink.Hash))
if err != nil {
return err
}
nxt = nxtNode
}
pb := new(PBData)
err := proto.Unmarshal(nxt.Data, pb)
......@@ -96,3 +102,29 @@ func (dr *DagReader) Read(b []byte) (int, error) {
}
}
}
/*
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_SET:
for i := 0; i < len(dr.node.Links); i++ {
nsize := dr.node.Links[i].Size - 8
if offset > nsize {
offset -= nsize
} else {
break
}
}
dr.position = i
err := dr.precalcNextBuf()
if err != nil {
return 0, err
}
case os.SEEK_CUR:
case os.SEEK_END:
default:
return 0, errors.New("invalid whence")
}
return 0, nil
}
*/
......@@ -96,6 +96,8 @@ func (n *Node) Key() (u.Key, error) {
// DAGService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type DAGService struct {
Blocks *bserv.BlockService
}
......
......@@ -192,7 +192,7 @@ func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Durati
provs := dht.providers.GetProviders(key)
for _, p := range provs {
count--
// NOTE: assuming that the list of peers is unique
// NOTE: assuming that this list of peers is unique
ps.Add(p)
peerOut <- p
if count <= 0 {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment