Commit 98cde157 authored by Jeromy's avatar Jeromy

integrate dagmodifier into ipns

parent 5c802ae8
package ipns package ipns
import ( import (
"bytes"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
...@@ -14,6 +13,7 @@ import ( ...@@ -14,6 +13,7 @@ import (
"github.com/jbenet/go-ipfs/core" "github.com/jbenet/go-ipfs/core"
ci "github.com/jbenet/go-ipfs/crypto" ci "github.com/jbenet/go-ipfs/crypto"
imp "github.com/jbenet/go-ipfs/importer" imp "github.com/jbenet/go-ipfs/importer"
dt "github.com/jbenet/go-ipfs/importer/dagwriter"
ft "github.com/jbenet/go-ipfs/importer/format" ft "github.com/jbenet/go-ipfs/importer/format"
mdag "github.com/jbenet/go-ipfs/merkledag" mdag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
...@@ -199,11 +199,8 @@ type Node struct { ...@@ -199,11 +199,8 @@ type Node struct {
Ipfs *core.IpfsNode Ipfs *core.IpfsNode
Nd *mdag.Node Nd *mdag.Node
fd *mdag.DagReader dagMod *dt.DagModifier
cached *ft.PBData cached *ft.PBData
// For writing
writerBuf WriteAtBuf
} }
func (s *Node) loadData() error { func (s *Node) loadData() error {
...@@ -303,35 +300,32 @@ func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) { ...@@ -303,35 +300,32 @@ func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
func (n *Node) Write(req *fuse.WriteRequest, resp *fuse.WriteResponse, intr fs.Intr) fuse.Error { func (n *Node) Write(req *fuse.WriteRequest, resp *fuse.WriteResponse, intr fs.Intr) fuse.Error {
log.Debug("ipns: Node Write [%s]: flags = %s, offset = %d, size = %d", n.name, req.Flags.String(), req.Offset, len(req.Data)) log.Debug("ipns: Node Write [%s]: flags = %s, offset = %d, size = %d", n.name, req.Flags.String(), req.Offset, len(req.Data))
if n.writerBuf == nil { if n.dagMod == nil {
n.writerBuf = NewWriterAtFromBytes(nil) dmod, err := dt.NewDagModifier(n.Nd, n.Ipfs.DAG, imp.DefaultSplitter)
if err != nil {
log.Error("Error creating dag modifier: %s", err)
return err
}
n.dagMod = dmod
} }
_, err := n.writerBuf.WriteAt(req.Data, req.Offset) wrote, err := n.dagMod.WriteAt(req.Data, uint64(req.Offset))
if err != nil { if err != nil {
return err return err
} }
resp.Size = len(req.Data) resp.Size = wrote
return nil return nil
} }
func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error { func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
log.Debug("Got flush request [%s]!", n.name) log.Debug("Got flush request [%s]!", n.name)
if n.writerBuf != nil { if n.dagMod != nil {
//TODO: newNode, err := n.dagMod.GetNode()
// This operation holds everything in memory,
// should be changed to stream the block creation/storage
// but for now, since the buf is all in memory anyways...
//NOTE:
// This should only occur on a file object, if this were to be a
// folder, bad things would happen.
buf := bytes.NewReader(n.writerBuf.Bytes())
newNode, err := imp.NewDagFromReader(buf)
if err != nil { if err != nil {
log.Critical("error creating dag from writerBuf: %s", err) log.Error("Error getting dag node from dagMod: %s", err)
return err return err
} }
if n.parent != nil { if n.parent != nil {
log.Debug("updating self in parent!") log.Debug("updating self in parent!")
err := n.parent.update(n.name, newNode) err := n.parent.update(n.name, newNode)
...@@ -358,7 +352,7 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error { ...@@ -358,7 +352,7 @@ func (n *Node) Flush(req *fuse.FlushRequest, intr fs.Intr) fuse.Error {
fmt.Println(b) fmt.Println(b)
//*/ //*/
n.writerBuf = nil n.dagMod = nil
n.wasChanged() n.wasChanged()
} }
...@@ -390,8 +384,6 @@ func (n *Node) republishRoot() error { ...@@ -390,8 +384,6 @@ func (n *Node) republishRoot() error {
return err return err
} }
n.writerBuf = nil
ndkey, err := root.Nd.Key() ndkey, err := root.Nd.Key()
if err != nil { if err != nil {
log.Error("getKey error: %s", err) log.Error("getKey error: %s", err)
...@@ -451,8 +443,9 @@ func (n *Node) Open(req *fuse.OpenRequest, resp *fuse.OpenResponse, intr fs.Intr ...@@ -451,8 +443,9 @@ func (n *Node) Open(req *fuse.OpenRequest, resp *fuse.OpenResponse, intr fs.Intr
//TODO: check open flags and truncate if necessary //TODO: check open flags and truncate if necessary
if req.Flags&fuse.OpenTruncate != 0 { if req.Flags&fuse.OpenTruncate != 0 {
log.Warning("Need to truncate file!") log.Warning("Need to truncate file!")
} n.cached = nil
if req.Flags&fuse.OpenAppend != 0 { n.Nd = &mdag.Node{Data: ft.FilePBData(nil, 0)}
} else if req.Flags&fuse.OpenAppend != 0 {
log.Warning("Need to append to file!") log.Warning("Need to append to file!")
} }
return n, nil return n, nil
......
...@@ -109,8 +109,6 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { ...@@ -109,8 +109,6 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
for i, size := range dm.pbdata.Blocksizes[startsubblk:] { for i, size := range dm.pbdata.Blocksizes[startsubblk:] {
if end > traversed { if end > traversed {
changed = append(changed, i+startsubblk) changed = append(changed, i+startsubblk)
} else if end == traversed {
break
} else { } else {
break break
} }
...@@ -122,6 +120,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { ...@@ -122,6 +120,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
} }
} }
// If our write starts in the middle of a block...
var midlnk *mdag.Link var midlnk *mdag.Link
if mid >= 0 { if mid >= 0 {
midlnk = dm.curNode.Links[mid] midlnk = dm.curNode.Links[mid]
...@@ -139,6 +138,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { ...@@ -139,6 +138,7 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
b = append(b, data[midoff:]...) b = append(b, data[midoff:]...)
} }
// Generate new sub-blocks, and sizes
subblocks := splitBytes(b, dm.splitter) subblocks := splitBytes(b, dm.splitter)
var links []*mdag.Link var links []*mdag.Link
var sizes []uint64 var sizes []uint64
...@@ -159,8 +159,10 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) { ...@@ -159,8 +159,10 @@ func (dm *DagModifier) WriteAt(b []byte, offset uint64) (int, error) {
// This is disgusting // This is disgusting
if len(changed) > 0 { if len(changed) > 0 {
dm.curNode.Links = append(dm.curNode.Links[:changed[0]], append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...)...) sechalflink := append(links, dm.curNode.Links[changed[len(changed)-1]+1:]...)
dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes[:changed[0]], append(sizes, dm.pbdata.Blocksizes[changed[len(changed)-1]+1:]...)...) dm.curNode.Links = append(dm.curNode.Links[:changed[0]], sechalflink...)
sechalfblks := append(sizes, dm.pbdata.Blocksizes[changed[len(changed)-1]+1:]...)
dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes[:changed[0]], sechalfblks...)
} else { } else {
dm.curNode.Links = append(dm.curNode.Links, links...) dm.curNode.Links = append(dm.curNode.Links, links...)
dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes, sizes...) dm.pbdata.Blocksizes = append(dm.pbdata.Blocksizes, sizes...)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment