Commit 298305e4 authored by Jeromy's avatar Jeromy

implement append to trickledag

parent bff7f730
......@@ -16,6 +16,7 @@ import (
merkledag "github.com/jbenet/go-ipfs/merkledag"
mdtest "github.com/jbenet/go-ipfs/merkledag/test"
pin "github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
u "github.com/jbenet/go-ipfs/util"
)
......@@ -29,7 +30,12 @@ func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter)
Maxlinks: h.DefaultLinksPerBlock,
}
return TrickleLayout(dbp.New(blkch))
nd, err := TrickleLayout(dbp.New(blkch))
if err != nil {
return nil, err
}
return nd, VerifyTrickleDagStructure(nd, ds, dbp.Maxlinks, layerRepeat)
}
//Test where calls to read are smaller than the chunk size
......@@ -441,3 +447,176 @@ func TestSeekingConsistency(t *testing.T) {
t.Fatal(err)
}
}
func TestAppend(t *testing.T) {
nbytes := int64(128 * 1024)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
// Reader for half the bytes
read := bytes.NewReader(should[:nbytes/2])
ds := mdtest.Mock(t)
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
if err != nil {
t.Fatal(err)
}
dbp := &h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
}
spl := &chunk.SizeSplitter{500}
blks := spl.Split(bytes.NewReader(should[nbytes/2:]))
nnode, err := TrickleAppend(nd, dbp.New(blks))
if err != nil {
t.Fatal(err)
}
err = VerifyTrickleDagStructure(nnode, ds, dbp.Maxlinks, layerRepeat)
if err != nil {
t.Fatal(err)
}
fread, err := uio.NewDagReader(context.TODO(), nnode, ds)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(fread)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
}
// This test appends one byte at a time to an empty file
func TestMultipleAppends(t *testing.T) {
ds := mdtest.Mock(t)
// TODO: fix small size appends and make this number bigger
nbytes := int64(1000)
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)
read := bytes.NewReader(nil)
nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500})
if err != nil {
t.Fatal(err)
}
dbp := &h.DagBuilderParams{
Dagserv: ds,
Maxlinks: 4,
}
spl := &chunk.SizeSplitter{500}
for i := 0; i < len(should); i++ {
blks := spl.Split(bytes.NewReader(should[i : i+1]))
nnode, err := TrickleAppend(nd, dbp.New(blks))
if err != nil {
t.Fatal(err)
}
err = VerifyTrickleDagStructure(nnode, ds, dbp.Maxlinks, layerRepeat)
if err != nil {
t.Fatal(err)
}
fread, err := uio.NewDagReader(context.TODO(), nnode, ds)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(fread)
if err != nil {
t.Fatal(err)
}
err = arrComp(out, should[:i+1])
if err != nil {
t.Fatal(err)
}
}
}
func TestAppendSingleBytesToEmpty(t *testing.T) {
ds := mdtest.Mock(t)
data := []byte("AB")
nd := new(merkledag.Node)
nd.Data = ft.FilePBData(nil, 0)
dbp := &h.DagBuilderParams{
Dagserv: ds,
Maxlinks: 4,
}
spl := &chunk.SizeSplitter{500}
blks := spl.Split(bytes.NewReader(data[:1]))
nnode, err := TrickleAppend(nd, dbp.New(blks))
if err != nil {
t.Fatal(err)
}
blks = spl.Split(bytes.NewReader(data[1:]))
nnode, err = TrickleAppend(nnode, dbp.New(blks))
if err != nil {
t.Fatal(err)
}
fread, err := uio.NewDagReader(context.TODO(), nnode, ds)
if err != nil {
t.Fatal(err)
}
out, err := ioutil.ReadAll(fread)
if err != nil {
t.Fatal(err)
}
fmt.Println(out, data)
err = arrComp(out, data)
if err != nil {
t.Fatal(err)
}
}
func printDag(nd *merkledag.Node, ds merkledag.DAGService, indent int) {
pbd, err := ft.FromBytes(nd.Data)
if err != nil {
panic(err)
}
for i := 0; i < indent; i++ {
fmt.Print(" ")
}
fmt.Printf("{size = %d, type = %s, nc = %d", pbd.GetFilesize(), pbd.GetType().String(), len(pbd.GetBlocksizes()))
if len(nd.Links) > 0 {
fmt.Println()
}
for _, lnk := range nd.Links {
child, err := lnk.GetNode(ds)
if err != nil {
panic(err)
}
printDag(child, ds, indent+1)
}
if len(nd.Links) > 0 {
for i := 0; i < indent; i++ {
fmt.Print(" ")
}
}
fmt.Println("}")
}
package trickle
import (
"errors"
h "github.com/jbenet/go-ipfs/importer/helpers"
dag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
)
// layerRepeat specifies how many times to append a child tree of a
......@@ -41,7 +43,7 @@ func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error
}
for i := 1; i < depth && !db.Done(); i++ {
for j := 0; j < layerRepeat; j++ {
for j := 0; j < layerRepeat && !db.Done(); j++ {
next := h.NewUnixfsNode()
err := fillTrickleRec(db, next, i)
if err != nil {
......@@ -56,3 +58,210 @@ func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error
}
return nil
}
// TrickleAppend appends the data in `db` to the dag, using the Trickledag format
func TrickleAppend(base *dag.Node, db *h.DagBuilderHelper) (*dag.Node, error) {
// Convert to unixfs node for working with easily
ufsn, err := h.NewUnixfsNodeFromDag(base)
if err != nil {
return nil, err
}
// Get depth of this 'tree'
n, j := trickleDepthInfo(ufsn, db.Maxlinks())
if n == 0 {
// If direct blocks not filled...
err := db.FillNodeLayer(ufsn)
if err != nil {
return nil, err
}
if db.Done() {
return ufsn.GetDagNode()
}
// If continuing, our depth has increased by one
n++
}
err = appendFillLastChild(ufsn, n-1, j, db)
if err != nil {
return nil, err
}
// after appendFillLastChild, our depth is now increased by one
if !db.Done() {
n++
}
// Now, continue filling out tree like normal
for i := n; !db.Done(); i++ {
for j := 0; j < layerRepeat && !db.Done(); j++ {
next := h.NewUnixfsNode()
err := fillTrickleRec(db, next, i)
if err != nil {
return nil, err
}
err = ufsn.AddChild(next, db)
if err != nil {
return nil, err
}
}
}
return ufsn.GetDagNode()
}
func appendFillLastChild(ufsn *h.UnixfsNode, depth int, layerFill int, db *h.DagBuilderHelper) error {
if ufsn.NumChildren() > db.Maxlinks() {
// Recursive step, grab last child
last := ufsn.NumChildren() - 1
lastChild, err := ufsn.GetChild(last, db.GetDagServ())
if err != nil {
return err
}
// Fill out last child (may not be full tree)
nchild, err := trickleAppendRec(lastChild, db, depth-1)
if err != nil {
return err
}
// Update changed child in parent node
ufsn.RemoveChild(last)
err = ufsn.AddChild(nchild, db)
if err != nil {
return err
}
// Partially filled depth layer
if layerFill != 0 {
for ; layerFill < layerRepeat && !db.Done(); layerFill++ {
next := h.NewUnixfsNode()
err := fillTrickleRec(db, next, depth)
if err != nil {
return err
}
err = ufsn.AddChild(next, db)
if err != nil {
return err
}
}
}
}
return nil
}
func trickleAppendRec(ufsn *h.UnixfsNode, db *h.DagBuilderHelper, depth int) (*h.UnixfsNode, error) {
if depth == 0 || db.Done() {
return ufsn, nil
}
// Get depth of this 'tree'
n, j := trickleDepthInfo(ufsn, db.Maxlinks())
if n == 0 {
// If direct blocks not filled...
err := db.FillNodeLayer(ufsn)
if err != nil {
return nil, err
}
n++
}
// If at correct depth, no need to continue
if n == depth {
return ufsn, nil
}
err := appendFillLastChild(ufsn, n, j, db)
if err != nil {
return nil, err
}
// after appendFillLastChild, our depth is now increased by one
if !db.Done() {
n++
}
// Now, continue filling out tree like normal
for i := n; i < depth && !db.Done(); i++ {
for j := 0; j < layerRepeat && !db.Done(); j++ {
next := h.NewUnixfsNode()
err := fillTrickleRec(db, next, i)
if err != nil {
return nil, err
}
err = ufsn.AddChild(next, db)
if err != nil {
return nil, err
}
}
}
return ufsn, nil
}
func trickleDepthInfo(node *h.UnixfsNode, maxlinks int) (int, int) {
n := node.NumChildren()
if n < maxlinks {
return 0, 0
}
return ((n - maxlinks) / layerRepeat) + 1, (n - maxlinks) % layerRepeat
}
// VerifyTrickleDagStructure checks that the given dag matches exactly the trickle dag datastructure
// layout
func VerifyTrickleDagStructure(nd *dag.Node, ds dag.DAGService, direct int, layerRepeat int) error {
return verifyTDagRec(nd, -1, direct, layerRepeat, ds)
}
// Recursive call for verifying the structure of a trickledag
func verifyTDagRec(nd *dag.Node, depth, direct, layerRepeat int, ds dag.DAGService) error {
if depth == 0 {
// zero depth dag is raw data block
if len(nd.Links) > 0 {
return errors.New("expected direct block")
}
pbn, err := ft.FromBytes(nd.Data)
if err != nil {
return err
}
if pbn.GetType() != ft.TRaw {
return errors.New("Expected raw block")
}
return nil
}
for i := 0; i < len(nd.Links); i++ {
child, err := nd.Links[i].GetNode(ds)
if err != nil {
return nil
}
if i < direct {
// Direct blocks
err := verifyTDagRec(child, 0, direct, layerRepeat, ds)
if err != nil {
return err
}
} else {
// Recursive trickle dags
rdepth := ((i - direct) / layerRepeat) + 1
if rdepth >= depth && depth > 0 {
return errors.New("Child dag was too deep!")
}
err := verifyTDagRec(child, rdepth, direct, layerRepeat, ds)
if err != nil {
return err
}
}
}
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment