Unverified Commit b126601d authored by Whyrusleeping's avatar Whyrusleeping Committed by GitHub

Merge pull request #5160 from schomatis/feat/unixfs/dir-interface

unixfs: add a directory interface
parents 8fa1c881 c47cd13c
...@@ -33,7 +33,9 @@ type Directory struct { ...@@ -33,7 +33,9 @@ type Directory struct {
lock sync.Mutex lock sync.Mutex
ctx context.Context ctx context.Context
dirbuilder *uio.Directory // UnixFS directory implementation used for creating,
// reading and editing directories.
unixfsDir uio.Directory
modTime time.Time modTime time.Time
...@@ -51,25 +53,25 @@ func NewDirectory(ctx context.Context, name string, node ipld.Node, parent child ...@@ -51,25 +53,25 @@ func NewDirectory(ctx context.Context, name string, node ipld.Node, parent child
} }
return &Directory{ return &Directory{
dserv: dserv, dserv: dserv,
ctx: ctx, ctx: ctx,
name: name, name: name,
dirbuilder: db, unixfsDir: db,
parent: parent, parent: parent,
childDirs: make(map[string]*Directory), childDirs: make(map[string]*Directory),
files: make(map[string]*File), files: make(map[string]*File),
modTime: time.Now(), modTime: time.Now(),
}, nil }, nil
} }
// GetPrefix gets the CID prefix of the root node // GetPrefix gets the CID prefix of the root node
func (d *Directory) GetPrefix() *cid.Prefix { func (d *Directory) GetPrefix() *cid.Prefix {
return d.dirbuilder.GetPrefix() return d.unixfsDir.GetPrefix()
} }
// SetPrefix sets the CID prefix // SetPrefix sets the CID prefix
func (d *Directory) SetPrefix(prefix *cid.Prefix) { func (d *Directory) SetPrefix(prefix *cid.Prefix) {
d.dirbuilder.SetPrefix(prefix) d.unixfsDir.SetPrefix(prefix)
} }
// closeChild updates the child by the given name to the dag node 'nd' // closeChild updates the child by the given name to the dag node 'nd'
...@@ -103,7 +105,7 @@ func (d *Directory) closeChildUpdate(name string, nd ipld.Node, sync bool) (*dag ...@@ -103,7 +105,7 @@ func (d *Directory) closeChildUpdate(name string, nd ipld.Node, sync bool) (*dag
} }
func (d *Directory) flushCurrentNode() (*dag.ProtoNode, error) { func (d *Directory) flushCurrentNode() (*dag.ProtoNode, error) {
nd, err := d.dirbuilder.GetNode() nd, err := d.unixfsDir.GetNode()
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -122,7 +124,7 @@ func (d *Directory) flushCurrentNode() (*dag.ProtoNode, error) { ...@@ -122,7 +124,7 @@ func (d *Directory) flushCurrentNode() (*dag.ProtoNode, error) {
} }
func (d *Directory) updateChild(name string, nd ipld.Node) error { func (d *Directory) updateChild(name string, nd ipld.Node) error {
err := d.dirbuilder.AddChild(d.ctx, name, nd) err := d.AddUnixFSChild(name, nd)
if err != nil { if err != nil {
return err return err
} }
...@@ -206,7 +208,7 @@ func (d *Directory) Uncache(name string) { ...@@ -206,7 +208,7 @@ func (d *Directory) Uncache(name string) {
// childFromDag searches through this directories dag node for a child link // childFromDag searches through this directories dag node for a child link
// with the given name // with the given name
func (d *Directory) childFromDag(name string) (ipld.Node, error) { func (d *Directory) childFromDag(name string) (ipld.Node, error) {
return d.dirbuilder.Find(d.ctx, name) return d.unixfsDir.Find(d.ctx, name)
} }
// childUnsync returns the child under this directory by the given name // childUnsync returns the child under this directory by the given name
...@@ -237,7 +239,7 @@ func (d *Directory) ListNames(ctx context.Context) ([]string, error) { ...@@ -237,7 +239,7 @@ func (d *Directory) ListNames(ctx context.Context) ([]string, error) {
defer d.lock.Unlock() defer d.lock.Unlock()
var out []string var out []string
err := d.dirbuilder.ForEachLink(ctx, func(l *ipld.Link) error { err := d.unixfsDir.ForEachLink(ctx, func(l *ipld.Link) error {
out = append(out, l.Name) out = append(out, l.Name)
return nil return nil
}) })
...@@ -262,7 +264,7 @@ func (d *Directory) List(ctx context.Context) ([]NodeListing, error) { ...@@ -262,7 +264,7 @@ func (d *Directory) List(ctx context.Context) ([]NodeListing, error) {
func (d *Directory) ForEachEntry(ctx context.Context, f func(NodeListing) error) error { func (d *Directory) ForEachEntry(ctx context.Context, f func(NodeListing) error) error {
d.lock.Lock() d.lock.Lock()
defer d.lock.Unlock() defer d.lock.Unlock()
return d.dirbuilder.ForEachLink(ctx, func(l *ipld.Link) error { return d.unixfsDir.ForEachLink(ctx, func(l *ipld.Link) error {
c, err := d.childUnsync(l.Name) c, err := d.childUnsync(l.Name)
if err != nil { if err != nil {
return err return err
...@@ -315,7 +317,7 @@ func (d *Directory) Mkdir(name string) (*Directory, error) { ...@@ -315,7 +317,7 @@ func (d *Directory) Mkdir(name string) (*Directory, error) {
return nil, err return nil, err
} }
err = d.dirbuilder.AddChild(d.ctx, name, ndir) err = d.AddUnixFSChild(name, ndir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -336,7 +338,7 @@ func (d *Directory) Unlink(name string) error { ...@@ -336,7 +338,7 @@ func (d *Directory) Unlink(name string) error {
delete(d.childDirs, name) delete(d.childDirs, name)
delete(d.files, name) delete(d.files, name)
return d.dirbuilder.RemoveChild(d.ctx, name) return d.unixfsDir.RemoveChild(d.ctx, name)
} }
func (d *Directory) Flush() error { func (d *Directory) Flush() error {
...@@ -363,7 +365,7 @@ func (d *Directory) AddChild(name string, nd ipld.Node) error { ...@@ -363,7 +365,7 @@ func (d *Directory) AddChild(name string, nd ipld.Node) error {
return err return err
} }
err = d.dirbuilder.AddChild(d.ctx, name, nd) err = d.AddUnixFSChild(name, nd)
if err != nil { if err != nil {
return err return err
} }
...@@ -372,6 +374,29 @@ func (d *Directory) AddChild(name string, nd ipld.Node) error { ...@@ -372,6 +374,29 @@ func (d *Directory) AddChild(name string, nd ipld.Node) error {
return nil return nil
} }
// AddUnixFSChild adds a child to the inner UnixFS directory
// and transitions to a HAMT implementation if needed.
func (d *Directory) AddUnixFSChild(name string, node ipld.Node) error {
if uio.UseHAMTSharding {
// If the directory HAMT implementation is being used and this
// directory is actually a basic implementation switch it to HAMT.
if basicDir, ok := d.unixfsDir.(*uio.BasicDirectory); ok {
hamtDir, err := basicDir.SwitchToSharding(d.ctx)
if err != nil {
return err
}
d.unixfsDir = hamtDir
}
}
err := d.unixfsDir.AddChild(d.ctx, name, node)
if err != nil {
return err
}
return nil
}
func (d *Directory) sync() error { func (d *Directory) sync() error {
for name, dir := range d.childDirs { for name, dir := range d.childDirs {
nd, err := dir.GetNode() nd, err := dir.GetNode()
...@@ -426,7 +451,7 @@ func (d *Directory) GetNode() (ipld.Node, error) { ...@@ -426,7 +451,7 @@ func (d *Directory) GetNode() (ipld.Node, error) {
return nil, err return nil, err
} }
nd, err := d.dirbuilder.GetNode() nd, err := d.unixfsDir.GetNode()
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
package io
import (
"context"
"fmt"
"os"
mdag "github.com/ipfs/go-ipfs/merkledag"
format "github.com/ipfs/go-ipfs/unixfs"
hamt "github.com/ipfs/go-ipfs/unixfs/hamt"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
)
// ShardSplitThreshold specifies how large of an unsharded directory
// the Directory code will generate. Adding entries over this value will
// result in the node being restructured into a sharded object.
var ShardSplitThreshold = 1000
// UseHAMTSharding is a global flag that signifies whether or not to use the
// HAMT sharding scheme for directory creation
var UseHAMTSharding = false
// DefaultShardWidth is the default value used for hamt sharding width.
var DefaultShardWidth = 256
// Directory allows to work with UnixFS directory nodes, adding and removing
// children. It allows to work with different directory schemes,
// like the classic or the HAMT one.
type Directory struct {
dserv ipld.DAGService
dirnode *mdag.ProtoNode
shard *hamt.Shard
}
// NewDirectory returns a Directory. It needs a DAGService to add the Children
func NewDirectory(dserv ipld.DAGService) *Directory {
db := new(Directory)
db.dserv = dserv
if UseHAMTSharding {
s, err := hamt.NewShard(dserv, DefaultShardWidth)
if err != nil {
panic(err) // will only panic if DefaultShardWidth is a bad value
}
db.shard = s
} else {
db.dirnode = format.EmptyDirNode()
}
return db
}
// ErrNotADir implies that the given node was not a unixfs directory
var ErrNotADir = fmt.Errorf("merkledag node was not a directory or shard")
// NewDirectoryFromNode loads a unixfs directory from the given IPLD node and
// DAGService.
func NewDirectoryFromNode(dserv ipld.DAGService, nd ipld.Node) (*Directory, error) {
pbnd, ok := nd.(*mdag.ProtoNode)
if !ok {
return nil, ErrNotADir
}
pbd, err := format.FromBytes(pbnd.Data())
if err != nil {
return nil, err
}
switch pbd.GetType() {
case format.TDirectory:
return &Directory{
dserv: dserv,
dirnode: pbnd.Copy().(*mdag.ProtoNode),
}, nil
case format.THAMTShard:
shard, err := hamt.NewHamtFromDag(dserv, nd)
if err != nil {
return nil, err
}
return &Directory{
dserv: dserv,
shard: shard,
}, nil
default:
return nil, ErrNotADir
}
}
// SetPrefix sets the prefix of the root node
func (d *Directory) SetPrefix(prefix *cid.Prefix) {
if d.dirnode != nil {
d.dirnode.SetPrefix(prefix)
}
if d.shard != nil {
d.shard.SetPrefix(prefix)
}
}
// AddChild adds a (name, key)-pair to the root node.
func (d *Directory) AddChild(ctx context.Context, name string, nd ipld.Node) error {
if d.shard == nil {
if !UseHAMTSharding {
_ = d.dirnode.RemoveNodeLink(name)
return d.dirnode.AddNodeLink(name, nd)
}
err := d.switchToSharding(ctx)
if err != nil {
return err
}
}
return d.shard.Set(ctx, name, nd)
}
func (d *Directory) switchToSharding(ctx context.Context) error {
s, err := hamt.NewShard(d.dserv, DefaultShardWidth)
if err != nil {
return err
}
s.SetPrefix(&d.dirnode.Prefix)
d.shard = s
for _, lnk := range d.dirnode.Links() {
cnd, err := d.dserv.Get(ctx, lnk.Cid)
if err != nil {
return err
}
err = d.shard.Set(ctx, lnk.Name, cnd)
if err != nil {
return err
}
}
d.dirnode = nil
return nil
}
// ForEachLink applies the given function to Links in the directory.
func (d *Directory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
if d.shard == nil {
for _, l := range d.dirnode.Links() {
if err := f(l); err != nil {
return err
}
}
return nil
}
return d.shard.ForEachLink(ctx, f)
}
// Links returns the all the links in the directory node.
func (d *Directory) Links(ctx context.Context) ([]*ipld.Link, error) {
if d.shard == nil {
return d.dirnode.Links(), nil
}
return d.shard.EnumLinks(ctx)
}
// Find returns the root node of the file named 'name' within this directory.
// In the case of HAMT-directories, it will traverse the tree.
func (d *Directory) Find(ctx context.Context, name string) (ipld.Node, error) {
if d.shard == nil {
lnk, err := d.dirnode.GetNodeLink(name)
switch err {
case mdag.ErrLinkNotFound:
return nil, os.ErrNotExist
default:
return nil, err
case nil:
}
return d.dserv.Get(ctx, lnk.Cid)
}
lnk, err := d.shard.Find(ctx, name)
if err != nil {
return nil, err
}
return lnk.GetNode(ctx, d.dserv)
}
// RemoveChild removes the child with the given name.
func (d *Directory) RemoveChild(ctx context.Context, name string) error {
if d.shard == nil {
return d.dirnode.RemoveNodeLink(name)
}
return d.shard.Remove(ctx, name)
}
// GetNode returns the root of this Directory
func (d *Directory) GetNode() (ipld.Node, error) {
if d.shard == nil {
return d.dirnode, nil
}
return d.shard.Node()
}
// GetPrefix returns the CID Prefix used
func (d *Directory) GetPrefix() *cid.Prefix {
if d.shard == nil {
return &d.dirnode.Prefix
}
return d.shard.Prefix()
}
package io
import (
"context"
"fmt"
"os"
mdag "github.com/ipfs/go-ipfs/merkledag"
format "github.com/ipfs/go-ipfs/unixfs"
hamt "github.com/ipfs/go-ipfs/unixfs/hamt"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
)
// UseHAMTSharding is a global flag that signifies whether or not to use the
// HAMT sharding scheme for directory creation
var UseHAMTSharding = false
// DefaultShardWidth is the default value used for hamt sharding width.
var DefaultShardWidth = 256
// Directory defines a UnixFS directory. It is used for creating, reading and
// editing directories. It allows to work with different directory schemes,
// like the basic or the HAMT implementation.
//
// It just allows to perform explicit edits on a single directory, working with
// directory trees is out of its scope, they are managed by the MFS layer
// (which is the main consumer of this interface).
type Directory interface {
// SetPrefix sets the CID prefix of the root node.
SetPrefix(*cid.Prefix)
// AddChild adds a (name, key) pair to the root node.
AddChild(context.Context, string, ipld.Node) error
// ForEachLink applies the given function to Links in the directory.
ForEachLink(context.Context, func(*ipld.Link) error) error
// Links returns the all the links in the directory node.
Links(context.Context) ([]*ipld.Link, error)
// Find returns the root node of the file named 'name' within this directory.
// In the case of HAMT-directories, it will traverse the tree.
Find(context.Context, string) (ipld.Node, error)
// RemoveChild removes the child with the given name.
RemoveChild(context.Context, string) error
// GetNode returns the root of this directory.
GetNode() (ipld.Node, error)
// GetPrefix returns the CID Prefix used.
GetPrefix() *cid.Prefix
}
// TODO: Evaluate removing `dserv` from this layer and providing it in MFS.
// (The functions should in that case add a `DAGService` argument.)
// BasicDirectory is the basic implementation of `Directory`. All the entries
// are stored in a single node.
type BasicDirectory struct {
node *mdag.ProtoNode
dserv ipld.DAGService
}
// HAMTDirectory is the HAMT implementation of `Directory`.
// (See package `hamt` for more information.)
type HAMTDirectory struct {
shard *hamt.Shard
dserv ipld.DAGService
}
// NewDirectory returns a Directory. It needs a `DAGService` to add the children.
func NewDirectory(dserv ipld.DAGService) Directory {
if UseHAMTSharding {
dir := new(HAMTDirectory)
s, err := hamt.NewShard(dserv, DefaultShardWidth)
if err != nil {
panic(err) // will only panic if DefaultShardWidth is a bad value
}
dir.shard = s
dir.dserv = dserv
return dir
}
dir := new(BasicDirectory)
dir.node = format.EmptyDirNode()
dir.dserv = dserv
return dir
}
// ErrNotADir implies that the given node was not a unixfs directory
var ErrNotADir = fmt.Errorf("merkledag node was not a directory or shard")
// NewDirectoryFromNode loads a unixfs directory from the given IPLD node and
// DAGService.
func NewDirectoryFromNode(dserv ipld.DAGService, node ipld.Node) (Directory, error) {
protoBufNode, ok := node.(*mdag.ProtoNode)
if !ok {
return nil, ErrNotADir
}
fsNode, err := format.FSNodeFromBytes(protoBufNode.Data())
if err != nil {
return nil, err
}
switch fsNode.GetType() {
case format.TDirectory:
return &BasicDirectory{
dserv: dserv,
node: protoBufNode.Copy().(*mdag.ProtoNode),
}, nil
case format.THAMTShard:
shard, err := hamt.NewHamtFromDag(dserv, node)
if err != nil {
return nil, err
}
return &HAMTDirectory{
dserv: dserv,
shard: shard,
}, nil
}
return nil, ErrNotADir
}
// SetPrefix implements the `Directory` interface.
func (d *BasicDirectory) SetPrefix(prefix *cid.Prefix) {
d.node.SetPrefix(prefix)
}
// AddChild implements the `Directory` interface. It adds (or replaces)
// a link to the given `node` under `name`.
func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.Node) error {
d.node.RemoveNodeLink(name)
// Remove old link (if it existed), don't check a potential `ErrNotFound`.
return d.node.AddNodeLink(name, node)
}
// ForEachLink implements the `Directory` interface.
func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
for _, l := range d.node.Links() {
if err := f(l); err != nil {
return err
}
}
return nil
}
// Links implements the `Directory` interface.
func (d *BasicDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
return d.node.Links(), nil
}
// Find implements the `Directory` interface.
func (d *BasicDirectory) Find(ctx context.Context, name string) (ipld.Node, error) {
lnk, err := d.node.GetNodeLink(name)
if err == mdag.ErrLinkNotFound {
err = os.ErrNotExist
}
if err != nil {
return nil, err
}
return d.dserv.Get(ctx, lnk.Cid)
}
// RemoveChild implements the `Directory` interface.
func (d *BasicDirectory) RemoveChild(ctx context.Context, name string) error {
return d.node.RemoveNodeLink(name)
}
// GetNode implements the `Directory` interface.
func (d *BasicDirectory) GetNode() (ipld.Node, error) {
return d.node, nil
}
// GetPrefix implements the `Directory` interface.
func (d *BasicDirectory) GetPrefix() *cid.Prefix {
return &d.node.Prefix
}
// SwitchToSharding returns a HAMT implementation of this directory.
func (d *BasicDirectory) SwitchToSharding(ctx context.Context) (Directory, error) {
hamtDir := new(HAMTDirectory)
hamtDir.dserv = d.dserv
shard, err := hamt.NewShard(d.dserv, DefaultShardWidth)
if err != nil {
return nil, err
}
shard.SetPrefix(&d.node.Prefix)
hamtDir.shard = shard
for _, lnk := range d.node.Links() {
node, err := d.dserv.Get(ctx, lnk.Cid)
if err != nil {
return nil, err
}
err = hamtDir.shard.Set(ctx, lnk.Name, node)
if err != nil {
return nil, err
}
}
return hamtDir, nil
}
// SetPrefix implements the `Directory` interface.
func (d *HAMTDirectory) SetPrefix(prefix *cid.Prefix) {
d.shard.SetPrefix(prefix)
}
// AddChild implements the `Directory` interface.
func (d *HAMTDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error {
return d.shard.Set(ctx, name, nd)
}
// ForEachLink implements the `Directory` interface.
func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
return d.shard.ForEachLink(ctx, f)
}
// Links implements the `Directory` interface.
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
return d.shard.EnumLinks(ctx)
}
// Find implements the `Directory` interface. It will traverse the tree.
func (d *HAMTDirectory) Find(ctx context.Context, name string) (ipld.Node, error) {
lnk, err := d.shard.Find(ctx, name)
if err != nil {
return nil, err
}
return lnk.GetNode(ctx, d.dserv)
}
// RemoveChild implements the `Directory` interface.
func (d *HAMTDirectory) RemoveChild(ctx context.Context, name string) error {
return d.shard.Remove(ctx, name)
}
// GetNode implements the `Directory` interface.
func (d *HAMTDirectory) GetNode() (ipld.Node, error) {
return d.shard.Node()
}
// GetPrefix implements the `Directory` interface.
func (d *HAMTDirectory) GetPrefix() *cid.Prefix {
return d.shard.Prefix()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment