MIT License
Copyright (c) 2018 IPFS
# go-mfs # go-mfs
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![Build Status](https://travis-ci.com/ipfs/go-mfs.svg?branch=master)](https://travis-ci.com/ipfs/go-mfs)
> go-mfs implements an in-memory model of a mutable IPFS filesystem.
## Lead Maintainer
[Steven Allen](https://github.com/Stebalien)
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [Contribute](#contribute)
- [License](#license)
## Install
`go-mfs` works like a regular Go module:
> go get github.com/ipfs/go-mfs
It uses [Gx](https://github.com/whyrusleeping/gx) to manage dependencies. You can use `make all` to build it with the `gx` dependencies.
## Usage
import "github.com/ipfs/go-mfs"
Check the [GoDoc documentation](https://godoc.org/github.com/ipfs/go-mfs)
## Documentation
Documentation around the MFS and the Files API in general around IPFS is a work in progress the following links may be of use:
* [UnixFS](https://docs.ipfs.io/guides/concepts/unixfs/)
* [MFS](https://docs.ipfs.io/guides/concepts/mfs/)
* [General concept document about how are files handled in IPFS (WIP)](https://github.com/ipfs/docs/issues/133)
## Repository Structure
This repository contains many files, all belonging to the root `mfs` package.
* `file.go`: MFS `File`.
* `dir.go`: MFS `Directory`.
* `fd.go`: `FileDescriptor` used to operate on `File`s.
* `ops.go`: Functions that do not belong to either `File` nor `Directory` (although they mostly operate on them) that contain common operations to the MFS, e.g., find, move, add a file, make a directory.
* `root.go`: MFS `Root` (a `Directory` with republishing support).
* `repub.go`: `Republisher`.
* `mfs_test.go`: General tests (needs a [revision](https://github.com/ipfs/go-mfs/issues/9)).
* `repub_test.go`: Republisher-specific tests (contains only the `TestRepublisher` function).
## Contribute
PRs accepted.
Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Protocol Labs, Inc.
\ No newline at end of file
package mfs
import (
dag "gitlab.dms3.io/dms3/go-merkledag"
ft "gitlab.dms3.io/dms3/go-unixfs"
uio "gitlab.dms3.io/dms3/go-unixfs/io"
cid "gitlab.dms3.io/dms3/go-cid"
ld "gitlab.dms3.io/dms3/go-ld-format"
var ErrNotYetImplemented = errors.New("not yet implemented")
var ErrInvalidChild = errors.New("invalid child node")
var ErrDirExists = errors.New("directory already has entry by that name")
// TODO: There's too much functionality associated with this structure,
// let's organize it (and if possible extract part of it elsewhere)
// and document the main features of `Directory` here.
type Directory struct {
// Internal cache with added entries to the directory, its cotents
// are synched with the underlying `unixfsDir` node in `sync()`.
entriesCache map[string]FSNode
lock sync.Mutex
// TODO: What content is being protected here exactly? The entire directory?
ctx context.Context
// UnixFS directory implementation used for creating,
// reading and editing directories.
unixfsDir uio.Directory
modTime time.Time
// NewDirectory constructs a new MFS directory.
// You probably don't want to call this directly. Instead, construct a new root
// using NewRoot.
func NewDirectory(ctx context.Context, name string, node ld.Node, parent parent, dserv ld.DAGService) (*Directory, error) {
db, err := uio.NewDirectoryFromNode(dserv, node)
if err != nil {
return nil, err
return &Directory{
inode: inode{
name: name,
parent: parent,
dagService: dserv,
ctx: ctx,
unixfsDir: db,
entriesCache: make(map[string]FSNode),
modTime: time.Now(),
}, nil
// GetCidBuilder gets the CID builder of the root node
func (d *Directory) GetCidBuilder() cid.Builder {
return d.unixfsDir.GetCidBuilder()
// SetCidBuilder sets the CID builder
func (d *Directory) SetCidBuilder(b cid.Builder) {
// This method implements the `parent` interface. It first does the local
// update of the child entry in the underlying UnixFS directory and saves
// the newly created directory node with the updated entry in the DAG
// service. Then it propagates the update upwards (through this same
// interface) repeating the whole process in the parent.
func (d *Directory) updateChildEntry(c child) error {
newDirNode, err := d.localUpdate(c)
if err != nil {
return err
// Continue to propagate the update process upwards
// (all the way up to the root).
return d.parent.updateChildEntry(child{d.name, newDirNode})
// This method implements the part of `updateChildEntry` that needs
// to be locked around: in charge of updating the UnixFS layer and
// generating the new node reflecting the update. It also stores the
// new node in the DAG layer.
func (d *Directory) localUpdate(c child) (*dag.ProtoNode, error) {
defer d.lock.Unlock()
err := d.updateChild(c)
if err != nil {
return nil, err
// TODO: Clearly define how are we propagating changes to lower layers
// like UnixFS.
nd, err := d.unixfsDir.GetNode()
if err != nil {
return nil, err
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrNotProtobuf
err = d.dagService.Add(d.ctx, nd)
if err != nil {
return nil, err
return pbnd.Copy().(*dag.ProtoNode), nil
// TODO: Why do we need a copy?
// Update child entry in the underlying UnixFS directory.
func (d *Directory) updateChild(c child) error {
err := d.unixfsDir.AddChild(d.ctx, c.Name, c.Node)
if err != nil {
return err
d.modTime = time.Now()
return nil
func (d *Directory) Type() NodeType {
return TDir
// childNode returns a FSNode under this directory by the given name if it exists.
// it does *not* check the cached dirs and files
func (d *Directory) childNode(name string) (FSNode, error) {
nd, err := d.childFromDag(name)
if err != nil {
return nil, err
return d.cacheNode(name, nd)
// cacheNode caches a node into d.childDirs or d.files and returns the FSNode.
func (d *Directory) cacheNode(name string, nd ld.Node) (FSNode, error) {
switch nd := nd.(type) {
case *dag.ProtoNode:
fsn, err := ft.FSNodeFromBytes(nd.Data())
if err != nil {
return nil, err
switch fsn.Type() {
case ft.TDirectory, ft.THAMTShard:
ndir, err := NewDirectory(d.ctx, name, nd, d, d.dagService)
if err != nil {
return nil, err
d.entriesCache[name] = ndir
return ndir, nil
case ft.TFile, ft.TRaw, ft.TSymlink:
nfi, err := NewFile(name, nd, d, d.dagService)
if err != nil {
return nil, err
d.entriesCache[name] = nfi
return nfi, nil
case ft.TMetadata:
return nil, ErrNotYetImplemented
return nil, ErrInvalidChild
case *dag.RawNode:
nfi, err := NewFile(name, nd, d, d.dagService)
if err != nil {
return nil, err
d.entriesCache[name] = nfi
return nfi, nil
return nil, fmt.Errorf("unrecognized node type in cache node")
// Child returns the child of this directory by the given name
func (d *Directory) Child(name string) (FSNode, error) {
defer d.lock.Unlock()
return d.childUnsync(name)
func (d *Directory) Uncache(name string) {
defer d.lock.Unlock()
delete(d.entriesCache, name)
// childFromDag searches through this directories dag node for a child link
// with the given name
func (d *Directory) childFromDag(name string) (ld.Node, error) {
return d.unixfsDir.Find(d.ctx, name)
// childUnsync returns the child under this directory by the given name
// without locking, useful for operations which already hold a lock
func (d *Directory) childUnsync(name string) (FSNode, error) {
entry, ok := d.entriesCache[name]
if ok {
return entry, nil
return d.childNode(name)
type NodeListing struct {
Name string
Type int
Size int64
Hash string
func (d *Directory) ListNames(ctx context.Context) ([]string, error) {
defer d.lock.Unlock()
var out []string
err := d.unixfsDir.ForEachLink(ctx, func(l *ld.Link) error {
out = append(out, l.Name)
return nil
if err != nil {
return nil, err
return out, nil
func (d *Directory) List(ctx context.Context) ([]NodeListing, error) {
var out []NodeListing
err := d.ForEachEntry(ctx, func(nl NodeListing) error {
out = append(out, nl)
return nil
return out, err
func (d *Directory) ForEachEntry(ctx context.Context, f func(NodeListing) error) error {
defer d.lock.Unlock()
return d.unixfsDir.ForEachLink(ctx, func(l *ld.Link) error {
c, err := d.childUnsync(l.Name)
if err != nil {
return err
nd, err := c.GetNode()
if err != nil {
return err
child := NodeListing{
Name: l.Name,
Type: int(c.Type()),
Hash: nd.Cid().String(),
if c, ok := c.(*File); ok {
size, err := c.Size()
if err != nil {
return err
child.Size = size
return f(child)
func (d *Directory) Mkdir(name string) (*Directory, error) {
defer d.lock.Unlock()
fsn, err := d.childUnsync(name)
if err == nil {
switch fsn := fsn.(type) {
case *Directory:
return fsn, os.ErrExist
case *File:
return nil, os.ErrExist
return nil, fmt.Errorf("unrecognized type: %#v", fsn)
ndir := ft.EmptyDirNode()
err = d.dagService.Add(d.ctx, ndir)
if err != nil {
return nil, err
err = d.unixfsDir.AddChild(d.ctx, name, ndir)
if err != nil {
return nil, err
dirobj, err := NewDirectory(d.ctx, name, ndir, d, d.dagService)
if err != nil {
return nil, err
d.entriesCache[name] = dirobj
return dirobj, nil
func (d *Directory) Unlink(name string) error {
defer d.lock.Unlock()
delete(d.entriesCache, name)
return d.unixfsDir.RemoveChild(d.ctx, name)
func (d *Directory) Flush() error {
nd, err := d.GetNode()
if err != nil {
return err
return d.parent.updateChildEntry(child{d.name, nd})
// AddChild adds the node 'nd' under this directory giving it the name 'name'
func (d *Directory) AddChild(name string, nd ld.Node) error {
defer d.lock.Unlock()
_, err := d.childUnsync(name)
if err == nil {
return ErrDirExists
err = d.dagService.Add(d.ctx, nd)
if err != nil {
return err
err = d.unixfsDir.AddChild(d.ctx, name, nd)
if err != nil {
return err
d.modTime = time.Now()
return nil
func (d *Directory) sync() error {
for name, entry := range d.entriesCache {
nd, err := entry.GetNode()
if err != nil {
return err
err = d.updateChild(child{name, nd})
if err != nil {
return err
// TODO: Should we clean the cache here?
return nil
func (d *Directory) Path() string {
cur := d
var out string
for cur != nil {
switch parent := cur.parent.(type) {
case *Directory:
out = path.Join(cur.name, out)
cur = parent
case *Root:
return "/" + out
panic("directory parent neither a directory nor a root")
return out
func (d *Directory) GetNode() (ld.Node, error) {
defer d.lock.Unlock()
err := d.sync()
if err != nil {
return nil, err
nd, err := d.unixfsDir.GetNode()
if err != nil {
return nil, err
err = d.dagService.Add(d.ctx, nd)
if err != nil {
return nil, err
return nd.Copy(), err
package mfs
import (
mod "gitlab.dms3.io/dms3/go-unixfs/mod"
context "context"
ld "gitlab.dms3.io/dms3/go-ld-format"
type state uint8
const (
stateCreated state = iota
// One `File` can have many `FileDescriptor`s associated to it
// (only one if it's RW, many if they are RO, see `File.desclock`).
// A `FileDescriptor` contains the "view" of the file (through an
// instance of a `DagModifier`), that's why it (and not the `File`)
// has the responsibility to `Flush` (which crystallizes that view
// in the `File`'s `Node`).
type FileDescriptor interface {
CtxReadFull(context.Context, []byte) (int, error)
Truncate(int64) error
Size() (int64, error)
Flush() error
type fileDescriptor struct {
inode *File
mod *mod.DagModifier
flags Flags
state state
func (fi *fileDescriptor) checkWrite() error {
if fi.state == stateClosed {
return ErrClosed
if !fi.flags.Write {
return fmt.Errorf("file is read-only")
return nil
func (fi *fileDescriptor) checkRead() error {
if fi.state == stateClosed {
return ErrClosed
if !fi.flags.Read {
return fmt.Errorf("file is write-only")
return nil
// Size returns the size of the file referred to by this descriptor
func (fi *fileDescriptor) Size() (int64, error) {
return fi.mod.Size()
// Truncate truncates the file to size
func (fi *fileDescriptor) Truncate(size int64) error {
if err := fi.checkWrite(); err != nil {
return fmt.Errorf("truncate failed: %s", err)
fi.state = stateDirty
return fi.mod.Truncate(size)
// Write writes the given data to the file at its current offset
func (fi *fileDescriptor) Write(b []byte) (int, error) {
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write failed: %s", err)
fi.state = stateDirty
return fi.mod.Write(b)
// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) Read(b []byte) (int, error) {
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
return fi.mod.Read(b)
// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
return fi.mod.CtxReadFull(ctx, b)
// Close flushes, then propogates the modified dag node up the directory structure
// and signals a republish to occur
func (fi *fileDescriptor) Close() error {
if fi.state == stateClosed {
return ErrClosed
if fi.flags.Write {
defer fi.inode.desclock.Unlock()
} else if fi.flags.Read {
defer fi.inode.desclock.RUnlock()
err := fi.flushUp(fi.flags.Sync)
fi.state = stateClosed
return err
// Flush generates a new version of the node of the underlying
// UnixFS directory (adding it to the DAG service) and updates
// the entry in the parent directory (setting `fullSync` to
// propagate the update all the way to the root).
func (fi *fileDescriptor) Flush() error {
return fi.flushUp(true)
// flushUp syncs the file and adds it to the dagservice
// it *must* be called with the File's lock taken
// If `fullSync` is set the changes are propagated upwards
// (the `Up` part of `flushUp`).
func (fi *fileDescriptor) flushUp(fullSync bool) error {
var nd ld.Node
switch fi.state {
case stateCreated, stateDirty:
var err error
nd, err = fi.mod.GetNode()
if err != nil {
return err
err = fi.inode.dagService.Add(context.TODO(), nd)
if err != nil {
return err
// TODO: Very similar logic to the update process in
// `Directory`, the logic should be unified, both structures
// (`File` and `Directory`) are backed by a LD node with
// a UnixFS format that is the actual target of the update
// (regenerating it and adding it to the DAG service).
// Always update the file descriptor's inode with the created/modified node.
fi.inode.node = nd
// Save the members to be used for subsequent calls
parent := fi.inode.parent
name := fi.inode.name
// Bubble up the update's to the parent, only if fullSync is set to true.
if fullSync {
if err := parent.updateChildEntry(child{name, nd}); err != nil {
return err
fi.state = stateFlushed
return nil
case stateFlushed:
return nil
panic("invalid state")
// Seek implements io.Seeker
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
if fi.state == stateClosed {
return 0, fmt.Errorf("seek failed: %s", ErrClosed)
return fi.mod.Seek(offset, whence)
// Write At writes the given bytes at the offset 'at'
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write-at failed: %s", err)
fi.state = stateDirty
return fi.mod.WriteAt(b, at)
package mfs
import (
dag "gitlab.dms3.io/dms3/go-merkledag"
ft "gitlab.dms3.io/dms3/go-unixfs"
mod "gitlab.dms3.io/dms3/go-unixfs/mod"
chunker "gitlab.dms3.io/dms3/go-dms3-chunker"
ld "gitlab.dms3.io/dms3/go-ld-format"
// File represents a file in the MFS, its logic its mainly targeted
// to coordinating (potentially many) `FileDescriptor`s pointing to
// it.
type File struct {
// Lock to coordinate the `FileDescriptor`s associated to this file.
desclock sync.RWMutex
// This isn't any node, it's the root node that represents the
// entire DAG of nodes that comprise the file.
// TODO: Rename, there should be an explicit term for these root nodes
// of a particular sub-DAG that abstract an upper layer's entity.
node ld.Node
// Lock around the `node` that represents this file, necessary because
// there may be many `FileDescriptor`s operating on this `File`.
nodeLock sync.RWMutex
RawLeaves bool
// NewFile returns a NewFile object with the given parameters. If the
// Cid version is non-zero RawLeaves will be enabled.
func NewFile(name string, node ld.Node, parent parent, dserv ld.DAGService) (*File, error) {
fi := &File{
inode: inode{
name: name,
parent: parent,
dagService: dserv,
node: node,
if node.Cid().Prefix().Version > 0 {
fi.RawLeaves = true
return fi, nil
func (fi *File) Open(flags Flags) (_ FileDescriptor, _retErr error) {
if flags.Write {
defer func() {
if _retErr != nil {
} else if flags.Read {
defer func() {
if _retErr != nil {
} else {
return nil, fmt.Errorf("file opened for neither reading nor writing")
node := fi.node
// TODO: Move this `switch` logic outside (maybe even
// to another package, this seems like a job of UnixFS),
// `NewDagModifier` uses the LD node, we're not
// extracting anything just doing a safety check.
switch node := node.(type) {
case *dag.ProtoNode:
fsn, err := ft.FSNodeFromBytes(node.Data())
if err != nil {
return nil, err
switch fsn.Type() {
return nil, fmt.Errorf("unsupported fsnode type for 'file'")
case ft.TSymlink:
return nil, fmt.Errorf("symlinks not yet supported")
case ft.TFile, ft.TRaw:
// OK case
case *dag.RawNode:
// Ok as well.
dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dagService, chunker.DefaultSplitter)
// TODO: Remove the use of the `chunker` package here, add a new `NewDagModifier` in
// `go-unixfs` with the `DefaultSplitter` already included.
if err != nil {
return nil, err
dmod.RawLeaves = fi.RawLeaves
return &fileDescriptor{
inode: fi,
flags: flags,
mod: dmod,
state: stateCreated,
}, nil
// Size returns the size of this file
// TODO: Should we be providing this API?
// TODO: There's already a `FileDescriptor.Size()` that
// through the `DagModifier`'s `fileSize` function is doing
// pretty much the same thing as here, we should at least call
// that function and wrap the `ErrNotUnixfs` with an MFS text.
func (fi *File) Size() (int64, error) {
defer fi.nodeLock.RUnlock()
switch nd := fi.node.(type) {
case *dag.ProtoNode:
fsn, err := ft.FSNodeFromBytes(nd.Data())
if err != nil {
return 0, err
return int64(fsn.FileSize()), nil
case *dag.RawNode:
return int64(len(nd.RawData())), nil
return 0, fmt.Errorf("unrecognized node type in mfs/file.Size()")
// GetNode returns the dag node associated with this file
// TODO: Use this method and do not access the `nodeLock` directly anywhere else.
func (fi *File) GetNode() (ld.Node, error) {
defer fi.nodeLock.RUnlock()
return fi.node, nil
// TODO: Tight coupling with the `FileDescriptor`, at the
// very least this should be an independent function that
// takes a `File` argument and automates the open/flush/close
// operations.
// TODO: Why do we need to flush a file that isn't opened?
// (the `OpenWriteOnly` seems to implicitly be targeting a
// closed file, a file we forgot to flush? can we close
// a file without flushing?)
func (fi *File) Flush() error {
// open the file in fullsync mode
fd, err := fi.Open(Flags{Write: true, Sync: true})
if err != nil {
return err
defer fd.Close()
return fd.Flush()
func (fi *File) Sync() error {
// just being able to take the writelock means the descriptor is synced
// TODO: Why?
return nil
// Type returns the type FSNode this is
func (fi *File) Type() NodeType {
return TFile
This diff is collapsed.
package mfs
import (
ld "gitlab.dms3.io/dms3/go-ld-format"
// inode abstracts the common characteristics of the MFS `File`
// and `Directory`. All of its attributes are initialized at
// creation.
type inode struct {
// name of this `inode` in the MFS path (the same value
// is also stored as the name of the DAG link).
name string
// parent directory of this `inode` (which may be the `Root`).
parent parent
// dagService used to store modifications made to the contents
// of the file or directory the `inode` belongs to.
dagService ld.DAGService
This diff is collapsed.
package mfs
import (
gopath "path"
path "gitlab.dms3.io/dms3/go-path"
cid "gitlab.dms3.io/dms3/go-cid"
ld "gitlab.dms3.io/dms3/go-ld-format"
// TODO: Evaluate moving all this operations to as `Root`
// methods, since all of them use it as its first argument
// and there is no clear documentation that explains this
// separation.
// Mv moves the file or directory at 'src' to 'dst'
// TODO: Document what the strings 'src' and 'dst' represent.
func Mv(r *Root, src, dst string) error {
srcDirName, srcFname := gopath.Split(src)
var dstDirName string
var dstFname string
if dst[len(dst)-1] == '/' {
dstDirName = dst
dstFname = srcFname
} else {
dstDirName, dstFname = gopath.Split(dst)
// get parent directories of both src and dest first
dstDir, err := lookupDir(r, dstDirName)
if err != nil {
return err
srcDir, err := lookupDir(r, srcDirName)
if err != nil {
return err
srcObj, err := srcDir.Child(srcFname)
if err != nil {
return err
nd, err := srcObj.GetNode()
if err != nil {
return err
fsn, err := dstDir.Child(dstFname)
if err == nil {
switch n := fsn.(type) {
case *File:
_ = dstDir.Unlink(dstFname)
case *Directory:
dstDir = n
dstFname = srcFname
return fmt.Errorf("unexpected type at path: %s", dst)
} else if err != os.ErrNotExist {
return err
err = dstDir.AddChild(dstFname, nd)
if err != nil {
return err
if srcDir.name == dstDir.name && srcFname == dstFname {
return nil
return srcDir.Unlink(srcFname)
func lookupDir(r *Root, path string) (*Directory, error) {
di, err := Lookup(r, path)
if err != nil {
return nil, err
d, ok := di.(*Directory)
if !ok {
return nil, fmt.Errorf("%s is not a directory", path)
return d, nil
// PutNode inserts 'nd' at 'path' in the given mfs
// TODO: Rename or clearly document that this is not about nodes but actually
// MFS files/directories (that in the underlying representation can be
// considered as just nodes).
// TODO: Document why are we handling LD nodes in the first place when we
// are actually referring to files/directories (that is, it can't be any
// node, it has to have a specific format).
// TODO: Can this function add directories or just files? What would be the
// difference between adding a directory with this method and creating it
// with `Mkdir`.
func PutNode(r *Root, path string, nd ld.Node) error {
dirp, filename := gopath.Split(path)
if filename == "" {
return fmt.Errorf("cannot create file with empty name")
pdir, err := lookupDir(r, dirp)
if err != nil {
return err
return pdir.AddChild(filename, nd)
// MkdirOpts is used by Mkdir
type MkdirOpts struct {
Mkparents bool
Flush bool
CidBuilder cid.Builder
// Mkdir creates a directory at 'path' under the directory 'd', creating
// intermediary directories as needed if 'mkparents' is set to true
func Mkdir(r *Root, pth string, opts MkdirOpts) error {
if pth == "" {
return fmt.Errorf("no path given to Mkdir")
parts := path.SplitList(pth)
if parts[0] == "" {
parts = parts[1:]
// allow 'mkdir /a/b/c/' to create c
if parts[len(parts)-1] == "" {
parts = parts[:len(parts)-1]
if len(parts) == 0 {
// this will only happen on 'mkdir /'
if opts.Mkparents {
return nil
return fmt.Errorf("cannot create directory '/': Already exists")
cur := r.GetDirectory()
for i, d := range parts[:len(parts)-1] {
fsn, err := cur.Child(d)
if err == os.ErrNotExist && opts.Mkparents {
mkd, err := cur.Mkdir(d)
if err != nil {
return err
if opts.CidBuilder != nil {
fsn = mkd
} else if err != nil {
return err
next, ok := fsn.(*Directory)
if !ok {
return fmt.Errorf("%s was not a directory", path.Join(parts[:i]))
cur = next
final, err := cur.Mkdir(parts[len(parts)-1])
if err != nil {
if !opts.Mkparents || err != os.ErrExist || final == nil {
return err
if opts.CidBuilder != nil {
if opts.Flush {
err := final.Flush()
if err != nil {
return err
return nil
// Lookup extracts the root directory and performs a lookup under it.
// TODO: Now that the root is always a directory, can this function
// be collapsed with `DirLookup`? Or at least be made a method of `Root`?
func Lookup(r *Root, path string) (FSNode, error) {
dir := r.GetDirectory()
return DirLookup(dir, path)
// DirLookup will look up a file or directory at the given path
// under the directory 'd'
func DirLookup(d *Directory, pth string) (FSNode, error) {
pth = strings.Trim(pth, "/")
parts := path.SplitList(pth)
if len(parts) == 1 && parts[0] == "" {
return d, nil
var cur FSNode
cur = d
for i, p := range parts {
chdir, ok := cur.(*Directory)
if !ok {
return nil, fmt.Errorf("cannot access %s: Not a directory", path.Join(parts[:i+1]))
child, err := chdir.Child(p)
if err != nil {
return nil, err
cur = child
return cur, nil
// TODO: Document this function and link its functionality
// with the republisher.
func FlushPath(ctx context.Context, rt *Root, pth string) (ld.Node, error) {
nd, err := Lookup(rt, pth)
if err != nil {
return nil, err
err = nd.Flush()
if err != nil {
return nil, err
return nd.GetNode()
package mfs
type Flags struct {
Read bool
Write bool
Sync bool
package mfs
import (
cid "gitlab.dms3.io/dms3/go-cid"
// PubFunc is the user-defined function that determines exactly what
// logic entails "publishing" a `Cid` value.
type PubFunc func(context.Context, cid.Cid) error
// Republisher manages when to publish a given entry.
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
RetryTimeout time.Duration
pubfunc PubFunc
update chan cid.Cid
immediatePublish chan chan struct{}
ctx context.Context
cancel func()
// NewRepublisher creates a new Republisher object to republish the given root
// using the given short and long time intervals.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
ctx, cancel := context.WithCancel(ctx)
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
RetryTimeout: tlong,
update: make(chan cid.Cid, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
// WaitPub waits for the current value to be published (or returns early
// if it already has).
func (rp *Republisher) WaitPub(ctx context.Context) error {
wait := make(chan struct{})
select {
case rp.immediatePublish <- wait:
case <-ctx.Done():
return ctx.Err()
select {
case <-wait:
return nil
case <-ctx.Done():
return ctx.Err()
func (rp *Republisher) Close() error {
// TODO(steb): Wait for `Run` to stop
err := rp.WaitPub(rp.ctx)
return err
// Update the current value. The value will be published after a delay but each
// consecutive call to Update may extend this delay up to TimeoutLong.
func (rp *Republisher) Update(c cid.Cid) {
select {
case <-rp.update:
select {
case rp.update <- c:
// Don't try again. If we hit this case, there's a
// concurrent publish and we can safely let that
// concurrent publish win.
case rp.update <- c:
// Run contains the core logic of the `Republisher`. It calls the user-defined
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
// updates.
// Algorithm:
// 1. When we receive the first update after publishing, we set a `longer` timer.
// 2. When we receive any update, we reset the `quick` timer.
// 3. If either the `quick` timeout or the `longer` timeout elapses,
// we call `publish` with the latest updated value.
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
// Note: If a publish fails, we retry repeatedly every TimeoutRetry.
func (rp *Republisher) Run(lastPublished cid.Cid) {
quick := time.NewTimer(0)
if !quick.Stop() {
longer := time.NewTimer(0)
if !longer.Stop() {
var toPublish cid.Cid
for rp.ctx.Err() == nil {
var waiter chan struct{}
select {
case <-rp.ctx.Done():
case newValue := <-rp.update:
// Skip already published values.
if lastPublished.Equals(newValue) {
// Break to the end of the switch to cleanup any
// timers.
toPublish = cid.Undef
// If we aren't already waiting to publish something,
// reset the long timeout.
if !toPublish.Defined() {
// Always reset the short timeout.
// Finally, set the new value to publish.
toPublish = newValue
case waiter = <-rp.immediatePublish:
// Make sure to grab the *latest* value to publish.
select {
case toPublish = <-rp.update:
// Avoid publishing duplicate values
if lastPublished.Equals(toPublish) {
toPublish = cid.Undef
case <-quick.C:
case <-longer.C:
// Cleanup, publish, and close waiters.
// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
// idiom as these timers may not be running.
select {
case <-quick.C:
select {
case <-longer.C:
// 2. If we have a value to publish, publish it now.
if toPublish.Defined() {
for {
err := rp.pubfunc(rp.ctx, toPublish)
if err == nil {
// Keep retrying until we succeed or we abort.
// TODO(steb): We could try pulling new values
// off `update` but that's not critical (and
// complicates this code a bit). We'll pull off
// a new value on the next loop through.
select {
case <-time.After(rp.RetryTimeout):
case <-rp.ctx.Done():
lastPublished = toPublish
toPublish = cid.Undef
// 3. Trigger anything waiting in `WaitPub`.
if waiter != nil {
package mfs
import (
cid "gitlab.dms3.io/dms3/go-cid"
ci "gitlab.dms3.io/p2p/go-p2p-testing/ci"
func TestRepublisher(t *testing.T) {
if ci.IsRunning() {
t.Skip("dont run timing tests in CI")
ctx := context.TODO()
pub := make(chan struct{})
pf := func(ctx context.Context, c cid.Cid) error {
pub <- struct{}{}
return nil
testCid1, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH")
testCid2, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVX")
tshort := time.Millisecond * 50
tlong := time.Second / 2
rp := NewRepublisher(ctx, pf, tshort, tlong)
go rp.Run(cid.Undef)
// should hit short timeout
select {
case <-time.After(tshort * 2):
t.Fatal("publish didnt happen in time")
case <-pub:
cctx, cancel := context.WithCancel(context.Background())
go func() {
for {
time.Sleep(time.Millisecond * 10)
select {
case <-cctx.Done():
select {
case <-pub:
t.Fatal("shouldnt have received publish yet!")
case <-time.After((tlong * 9) / 10):
select {
case <-pub:
case <-time.After(tlong / 2):
t.Fatal("waited too long for pub!")
err := rp.Close()
if err != nil {
// package mfs implements an in memory model of a mutable DMS3 filesystem.
// TODO: Develop on this line (and move it to `doc.go`).
package mfs
import (
dag "gitlab.dms3.io/dms3/go-merkledag"
ft "gitlab.dms3.io/dms3/go-unixfs"
ld "gitlab.dms3.io/dms3/go-ld-format"
logging "gitlab.dms3.io/dms3/go-log"
// TODO: Remove if not used.
var ErrNotExist = errors.New("no such rootfs")
var ErrClosed = errors.New("file closed")
var log = logging.Logger("mfs")
// TODO: Remove if not used.
var ErrIsDirectory = errors.New("error: is a directory")
// The information that an MFS `Directory` has about its children
// when updating one of its entries: when a child mutates it signals
// its parent directory to update its entry (under `Name`) with the
// new content (in `Node`).
type child struct {
Name string
Node ld.Node
// This interface represents the basic property of MFS directories of updating
// children entries with modified content. Implemented by both the MFS
// `Directory` and `Root` (which is basically a `Directory` with republishing
// support).
// TODO: What is `fullsync`? (unnamed `bool` argument)
// TODO: There are two types of persistence/flush that need to be
// distinguished here, one at the DAG level (when I store the modified
// nodes in the DAG service) and one in the UnixFS/MFS level (when I modify
// the entry/link of the directory that pointed to the modified node).
type parent interface {
// Method called by a child to its parent to signal to update the content
// pointed to in the entry by that child's name. The child sends its own
// information in the `child` structure. As modifying a directory entry
// entails modifying its contents the parent will also call *its* parent's
// `updateChildEntry` to update the entry pointing to the new directory,
// this mechanism is in turn repeated until reaching the `Root`.
updateChildEntry(c child) error
type NodeType int
const (
TFile NodeType = iota
// FSNode abstracts the `Directory` and `File` structures, it represents
// any child node in the MFS (i.e., all the nodes besides the `Root`). It
// is the counterpart of the `parent` interface which represents any
// parent node in the MFS (`Root` and `Directory`).
// (Not to be confused with the `unixfs.FSNode`.)
type FSNode interface {
GetNode() (ld.Node, error)
Flush() error
Type() NodeType
// IsDir checks whether the FSNode is dir type
func IsDir(fsn FSNode) bool {
return fsn.Type() == TDir
// IsFile checks whether the FSNode is file type
func IsFile(fsn FSNode) bool {
return fsn.Type() == TFile
// Root represents the root of a filesystem tree.
type Root struct {
// Root directory of the MFS layout.
dir *Directory
repub *Republisher
// NewRoot creates a new Root and starts up a republisher routine for it.
func NewRoot(parent context.Context, ds ld.DAGService, node *dag.ProtoNode, pf PubFunc) (*Root, error) {
var repub *Republisher
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)
// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.
go repub.Run(node.Cid())
root := &Root{
repub: repub,
fsn, err := ft.FSNodeFromBytes(node.Data())
if err != nil {
log.Error("DMS3NS pointer was not unixfs node")
// TODO: DMS3NS pointer?
return nil, err
switch fsn.Type() {
case ft.TDirectory, ft.THAMTShard:
newDir, err := NewDirectory(parent, node.String(), node, root, ds)
if err != nil {
return nil, err
root.dir = newDir
case ft.TFile, ft.TMetadata, ft.TRaw:
return nil, fmt.Errorf("root can't be a file (unixfs type: %s)", fsn.Type())
// TODO: This special error reporting case doesn't seem worth it, we either
// have a UnixFS directory or we don't.
return nil, fmt.Errorf("unrecognized unixfs type: %s", fsn.Type())
return root, nil
// GetDirectory returns the root directory.
func (kr *Root) GetDirectory() *Directory {
return kr.dir
// Flush signals that an update has occurred since the last publish,
// and updates the Root republisher.
// TODO: We are definitely abusing the "flush" terminology here.
func (kr *Root) Flush() error {
nd, err := kr.GetDirectory().GetNode()
if err != nil {
return err
if kr.repub != nil {
return nil
// FlushMemFree flushes the root directory and then uncaches all of its links.
// This has the effect of clearing out potentially stale references and allows
// them to be garbage collected.
// CAUTION: Take care not to ever call this while holding a reference to any
// child directories. Those directories will be bad references and using them
// may have unintended racy side effects.
// A better implemented mfs system (one that does smarter internal caching and
// refcounting) shouldnt need this method.
// TODO: Review the motivation behind this method once the cache system is
// refactored.
func (kr *Root) FlushMemFree(ctx context.Context) error {
dir := kr.GetDirectory()
if err := dir.Flush(); err != nil {
return err
defer dir.lock.Unlock()
for name := range dir.entriesCache {
delete(dir.entriesCache, name)
// TODO: Can't we just create new maps?
return nil
// updateChildEntry implements the `parent` interface, and signals
// to the publisher that there are changes ready to be published.
// This is the only thing that separates a `Root` from a `Directory`.
// TODO: Evaluate merging both.
// TODO: The `sync` argument isn't used here (we've already reached
// the top), document it and maybe make it an anonymous variable (if
// that's possible).
func (kr *Root) updateChildEntry(c child) error {
err := kr.GetDirectory().dagService.Add(context.TODO(), c.Node)
if err != nil {
return err
// TODO: Why are we not using the inner directory lock nor
// applying the same procedure as `Directory.updateChildEntry`?
if kr.repub != nil {
return nil
func (kr *Root) Close() error {
nd, err := kr.GetDirectory().GetNode()
if err != nil {
return err
if kr.repub != nil {
return kr.repub.Close()
return nil