Commit dad97517 authored by Jeromy's avatar Jeromy

implement a basic data format for data inside dag nodes

parent e6498b37
......@@ -2,3 +2,4 @@
.ipfsconfig
*.out
*.test
*.orig
......@@ -85,7 +85,7 @@ func addPath(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
}
func addDir(n *core.IpfsNode, fpath string, depth int) (*dag.Node, error) {
tree := &dag.Node{}
tree := &dag.Node{Data: dag.FolderPBData()}
files, err := ioutil.ReadDir(fpath)
if err != nil {
......
......@@ -2,9 +2,13 @@ package main
import (
"fmt"
"io"
"os"
"github.com/gonuts/flag"
"github.com/jbenet/commander"
bserv "github.com/jbenet/go-ipfs/blockservice"
dag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -37,21 +41,37 @@ func catCmd(c *commander.Command, inp []string) error {
return err
}
fmt.Println("Printing Data!")
_, err = fmt.Printf("%s", nd.Data)
err = ExpandDag(nd, n.Blocks)
if err != nil {
return err
}
fmt.Println("Printing child nodes:")
for _, subn := range nd.Links {
k := u.Key(subn.Hash)
blk, err := n.Blocks.GetBlock(k)
fmt.Printf("Getting link: %s\n", k.Pretty())
read, err := dag.NewDagReader(nd)
if err != nil {
fmt.Println(err)
continue
}
_, err = io.Copy(os.Stdout, read)
if err != nil {
fmt.Println(err)
continue
}
}
return nil
}
// Expand all subnodes in this dag so printing can occur without error
//TODO: this needs to be done MUCH better in a somewhat asynchronous way.
//also should be moved elsewhere.
func ExpandDag(nd *dag.Node, bs *bserv.BlockService) error {
for _, lnk := range nd.Links {
if lnk.Node == nil {
blk, err := bs.GetBlock(u.Key(lnk.Hash))
if err != nil {
return err
}
fmt.Println(string(blk.Data))
lnk.Node = &dag.Node{Data: dag.WrapData(blk.Data)}
}
}
return nil
......
......@@ -31,10 +31,12 @@ There are multiple subpackages:
- multiplexing connections (tcp atm)
- peer addressing
- dht - impl basic kademlia routing
- bitswap - impl basic block exchange functionality
### What's in progress:
- bitswap - impl basic block exchange functionality
- crypto - building trust between peers in the network
### What's next:
......
......@@ -6,6 +6,7 @@ package readonly
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"os/signal"
......@@ -119,7 +120,13 @@ func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
// ReadAll reads the object data as file data
func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
u.DOut("Read node.\n")
return []byte(s.Nd.Data), nil
r, err := mdag.NewDagReader(s.Nd)
if err != nil {
return nil, err
}
// this is a terrible function... 'ReadAll'?
// what if i have a 6TB file? GG RAM.
return ioutil.ReadAll(r)
}
// Mount mounts an IpfsNode instance at a particular path. It
......@@ -132,7 +139,13 @@ func Mount(ipfs *core.IpfsNode, fpath string) error {
go func() {
<-sigc
Unmount(fpath)
for {
err := Unmount(fpath)
if err == nil {
return
}
time.Sleep(time.Millisecond * 10)
}
}()
c, err := fuse.Mount(fpath)
......
......@@ -20,19 +20,21 @@ var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")
// NewDagFromReader constructs a Merkle DAG from the given io.Reader.
// size required for block construction.
func NewDagFromReader(r io.Reader) (*dag.Node, error) {
blkChan := SplitterBySize(1024 * 512)(r)
root := &dag.Node{}
return NewDagFromReaderWithSplitter(r, SplitterBySize(1024*512))
}
func NewDagFromReaderWithSplitter(r io.Reader, spl BlockSplitter) (*dag.Node, error) {
blkChan := spl(r)
root := &dag.Node{Data: dag.FilePBData()}
for blk := range blkChan {
child := &dag.Node{Data: blk}
child := &dag.Node{Data: dag.WrapData(blk)}
err := root.AddNodeLink("", child)
if err != nil {
return nil, err
}
}
fmt.Println(root.Links)
return root, nil
}
......
package importer
import (
"bytes"
"crypto/rand"
"io"
"io/ioutil"
"testing"
dag "github.com/jbenet/go-ipfs/merkledag"
)
func TestFileConsistency(t *testing.T) {
buf := new(bytes.Buffer)
io.CopyN(buf, rand.Reader, 512*32)
should := buf.Bytes()
nd, err := NewDagFromReaderWithSplitter(buf, SplitterBySize(512))
if err != nil {
t.Fatal(err)
}
r, err := dag.NewDagReader(nd)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(out, should) {
t.Fatal("Output not the same as input.")
}
}
all: node.pb.go
all: node.pb.go data.pb.go
node.pb.go: node.proto
protoc --gogo_out=. --proto_path=../../../../:/usr/local/opt/protobuf/include:. $<
data.pb.go: data.proto
protoc --go_out=. data.proto
clean:
rm node.pb.go
package merkledag
import (
"bytes"
"errors"
"io"
"code.google.com/p/goprotobuf/proto"
)
var ErrIsDir = errors.New("this dag node is a directory.")
// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
node *Node
position int
buf *bytes.Buffer
thisData []byte
}
func NewDagReader(n *Node) (io.Reader, error) {
pb := new(PBData)
err := proto.Unmarshal(n.Data, pb)
if err != nil {
return nil, err
}
switch pb.GetType() {
case PBData_Directory:
return nil, ErrIsDir
case PBData_File:
return &DagReader{
node: n,
thisData: pb.GetData(),
}, nil
case PBData_Raw:
return bytes.NewBuffer(pb.GetData()), nil
default:
panic("Unrecognized node type!")
}
}
func (dr *DagReader) precalcNextBuf() error {
if dr.position >= len(dr.node.Links) {
return io.EOF
}
nxtLink := dr.node.Links[dr.position]
nxt := nxtLink.Node
if nxt == nil {
//TODO: should use dagservice or something to get needed block
return errors.New("Link to nil node! Tree not fully expanded!")
}
pb := new(PBData)
err := proto.Unmarshal(nxt.Data, pb)
if err != nil {
return err
}
dr.position++
// TODO: dont assume a single layer of indirection
switch pb.GetType() {
case PBData_Directory:
panic("Why is there a directory under a file?")
case PBData_File:
//TODO: maybe have a PBData_Block type for indirect blocks?
panic("Not yet handling different layers of indirection!")
case PBData_Raw:
dr.buf = bytes.NewBuffer(pb.GetData())
return nil
default:
panic("Unrecognized node type!")
}
}
func (dr *DagReader) Read(b []byte) (int, error) {
if dr.buf == nil {
err := dr.precalcNextBuf()
if err != nil {
return 0, err
}
}
total := 0
for {
n, err := dr.buf.Read(b[total:])
total += n
if err != nil {
if err != io.EOF {
return total, err
}
}
if total == len(b) {
return total, nil
}
err = dr.precalcNextBuf()
if err != nil {
return total, err
}
}
}
// Code generated by protoc-gen-go.
// source: data.proto
// DO NOT EDIT!
/*
Package merkledag is a generated protocol buffer package.
It is generated from these files:
data.proto
It has these top-level messages:
PBData
*/
package merkledag
import proto "code.google.com/p/goprotobuf/proto"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = math.Inf
type PBData_DataType int32
const (
PBData_Raw PBData_DataType = 0
PBData_Directory PBData_DataType = 1
PBData_File PBData_DataType = 2
)
var PBData_DataType_name = map[int32]string{
0: "Raw",
1: "Directory",
2: "File",
}
var PBData_DataType_value = map[string]int32{
"Raw": 0,
"Directory": 1,
"File": 2,
}
func (x PBData_DataType) Enum() *PBData_DataType {
p := new(PBData_DataType)
*p = x
return p
}
func (x PBData_DataType) String() string {
return proto.EnumName(PBData_DataType_name, int32(x))
}
func (x *PBData_DataType) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(PBData_DataType_value, data, "PBData_DataType")
if err != nil {
return err
}
*x = PBData_DataType(value)
return nil
}
type PBData struct {
Type *PBData_DataType `protobuf:"varint,1,req,enum=merkledag.PBData_DataType" json:"Type,omitempty"`
Data []byte `protobuf:"bytes,2,opt" json:"Data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *PBData) Reset() { *m = PBData{} }
func (m *PBData) String() string { return proto.CompactTextString(m) }
func (*PBData) ProtoMessage() {}
func (m *PBData) GetType() PBData_DataType {
if m != nil && m.Type != nil {
return *m.Type
}
return PBData_Raw
}
func (m *PBData) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func init() {
proto.RegisterEnum("merkledag.PBData_DataType", PBData_DataType_name, PBData_DataType_value)
}
package merkledag;
message PBData {
enum DataType {
Raw = 0;
Directory = 1;
File = 2;
}
required DataType Type = 1;
optional bytes Data = 2;
}
......@@ -3,6 +3,8 @@ package merkledag
import (
"fmt"
"code.google.com/p/goprotobuf/proto"
blocks "github.com/jbenet/go-ipfs/blocks"
bserv "github.com/jbenet/go-ipfs/blockservice"
u "github.com/jbenet/go-ipfs/util"
......@@ -152,3 +154,44 @@ func (n *DAGService) Get(k u.Key) (*Node, error) {
return Decoded(b.Data)
}
func FilePBData() []byte {
pbfile := new(PBData)
typ := PBData_File
pbfile.Type = &typ
data, err := proto.Marshal(pbfile)
if err != nil {
//this really shouldnt happen, i promise
panic(err)
}
return data
}
func FolderPBData() []byte {
pbfile := new(PBData)
typ := PBData_Directory
pbfile.Type = &typ
data, err := proto.Marshal(pbfile)
if err != nil {
//this really shouldnt happen, i promise
panic(err)
}
return data
}
func WrapData(b []byte) []byte {
pbdata := new(PBData)
typ := PBData_Raw
pbdata.Data = b
pbdata.Type = &typ
out, err := proto.Marshal(pbdata)
if err != nil {
// This shouldnt happen. seriously.
panic(err)
}
return out
}
......@@ -13,7 +13,8 @@ import (
// ChanBuffer is the size of the buffer in the Conn Chan
const ChanBuffer = 10
const MaxMessageSize = 1 << 19
// 1 MB
const MaxMessageSize = 1 << 20
// Conn represents a connection to another Peer (IPFS Node).
type Conn struct {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment