Commit 60781411 authored by Hector Sanjuan's avatar Hector Sanjuan

Fix #27: Remove batching from importers

A batching DAG service is forced onto the
users of the importers, but they could just wrap
the given DAGSerice in the batching one to get the same
functionality (remembering to Close it at the end of the
proccess).

As detailed in #27, the importers should not be making choices
about what DAGService is the right one to use and wrapping the
given one.

This change requires wrapping the DAGService in go-ipfs into
ipld.Batch. and closing it when Finishing the adding process.
parent 0d29aa16
......@@ -130,7 +130,7 @@ func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
// This works without Filestore support (`ProcessFileStore`).
// TODO: Why? Is there a test case missing?
return db.AddNodeAndClose(root)
return root, db.Add(root)
}
// The first `root` will be a single leaf node with data
......@@ -160,7 +160,7 @@ func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
}
}
return db.AddNodeAndClose(root)
return root, db.Add(root)
}
// fillNodeRec will "fill" the given internal (non-leaf) `node` with data by
......
......@@ -6,6 +6,7 @@ import (
"os"
dag "github.com/ipfs/go-merkledag"
ft "github.com/ipfs/go-unixfs"
pb "github.com/ipfs/go-unixfs/pb"
......@@ -25,7 +26,6 @@ type DagBuilderHelper struct {
rawLeaves bool
nextData []byte // the next item to return.
maxlinks int
batch *ipld.Batch
cidBuilder cid.Builder
// Filestore support variables.
......@@ -78,7 +78,6 @@ func (dbp *DagBuilderParams) New(spl chunker.Splitter) *DagBuilderHelper {
rawLeaves: dbp.RawLeaves,
cidBuilder: dbp.CidBuilder,
maxlinks: dbp.Maxlinks,
batch: ipld.NewBatch(context.TODO(), dbp.Dagserv),
}
if fi, ok := spl.Reader().(files.FileInfo); dbp.NoCopy && ok {
db.fullPath = fi.AbsPath()
......@@ -327,8 +326,8 @@ func (db *DagBuilderHelper) ProcessFileStore(node ipld.Node, dataSize uint64) ip
return node
}
// Add sends a node to the DAGService, and returns it.
func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
// AddUnixfsNode sends a node to the DAGService, and returns it as ipld.Node.
func (db *DagBuilderHelper) AddUnixfsNode(node *UnixfsNode) (ipld.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
return nil, err
......@@ -342,36 +341,17 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
return dn, nil
}
// Add inserts the given node in the DAGService.
func (db *DagBuilderHelper) Add(node ipld.Node) error {
return db.dserv.Add(context.TODO(), node)
}
// Maxlinks returns the configured maximum number for links
// for nodes built with this helper.
func (db *DagBuilderHelper) Maxlinks() int {
return db.maxlinks
}
// Close has the DAGService perform a batch Commit operation.
// It should be called at the end of the building process to make
// sure all data is persisted.
func (db *DagBuilderHelper) Close() error {
return db.batch.Commit()
}
// AddNodeAndClose adds the last `ipld.Node` from the DAG and
// closes the builder. It returns the same `node` passed as
// argument.
func (db *DagBuilderHelper) AddNodeAndClose(node ipld.Node) (ipld.Node, error) {
err := db.batch.Add(node)
if err != nil {
return nil, err
}
err = db.Close()
if err != nil {
return nil, err
}
return node, nil
}
// FSNodeOverDag encapsulates an `unixfs.FSNode` that will be stored in a
// `dag.ProtoNode`. Instead of just having a single `ipld.Node` that
// would need to be constantly (un)packed to access and modify its
......@@ -421,7 +401,7 @@ func (n *FSNodeOverDag) AddChild(child ipld.Node, fileSize uint64, db *DagBuilde
n.file.AddBlockSize(fileSize)
return db.batch.Add(child)
return db.Add(child)
}
// Commit unifies (resolves) the cache nodes into a single `ipld.Node`
......
......@@ -6,6 +6,7 @@ import (
"os"
dag "github.com/ipfs/go-merkledag"
ft "github.com/ipfs/go-unixfs"
cid "github.com/ipfs/go-cid"
......@@ -103,8 +104,7 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
return err
}
err = db.batch.Add(childnode)
_, err = db.AddUnixfsNode(child)
return err
}
......
......@@ -42,16 +42,7 @@ func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
return nil, err
}
out, err := db.Add(root)
if err != nil {
return nil, err
}
if err := db.Close(); err != nil {
return nil, err
}
return out, nil
return db.AddUnixfsNode(root)
}
// fillTrickleRec creates a trickle (sub-)tree with an optional maximum specified depth
......@@ -92,14 +83,6 @@ func Append(ctx context.Context, basen ipld.Node, db *h.DagBuilderHelper) (out i
return nil, dag.ErrNotProtobuf
}
defer func() {
if errOut == nil {
if err := db.Close(); err != nil {
errOut = err
}
}
}()
// Convert to unixfs node for working with easily
ufsn, err := h.NewUnixfsNodeFromDag(base)
if err != nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment