blank_issues_enabled: false
contact_links:
  - name: Getting Help on IPFS
    url: https://ipfs.io/help
    about: All information about how and where to get help on IPFS.
  - name: IPFS Official Forum
    url: https://discuss.ipfs.io
    about: Please post general questions, support requests, and discussions here.
---
name: Open an issue
about: Only for actionable issues relevant to this repository.
title: ''
labels: need/triage
assignees: ''
---
<!--
Hello! To ensure this issue is correctly addressed as soon as possible by the IPFS team, please try to make sure:
- This issue is relevant to this repository's topic or codebase.
- A clear description is provided. It should include as much relevant information as possible and a clear scope for the issue to be actionable.
FOR GENERAL DISCUSSION, HELP OR QUESTIONS, please see the options at https://ipfs.io/help or head directly to https://discuss.ipfs.io.
(you can delete this section after reading)
-->
# Configuration for welcome - https://github.com/behaviorbot/welcome
# Configuration for new-issue-welcome - https://github.com/behaviorbot/new-issue-welcome
# Comment to be posted on first-time issues
newIssueWelcomeComment: >
Thank you for submitting your first issue to this repository! A maintainer
will be here shortly to triage and review.
In the meantime, please double-check that you have provided all the
necessary information to make this process easy! Any information that can
help save additional round trips is useful! We currently aim to give
initial feedback within **two business days**. If this does not happen, feel
free to leave a comment.
Please keep an eye on how this issue will be labeled, as labels give an
overview of priorities, assignments and additional actions requested by the
maintainers:
- "Priority" labels will show how urgent this is for the team.
- "Status" labels will show if this is ready to be worked on, blocked, or in progress.
- "Need" labels will indicate if additional input or analysis is required.
Finally, remember to use https://discuss.ipfs.io if you just need general
support.
# Configuration for new-pr-welcome - https://github.com/behaviorbot/new-pr-welcome
# Comment to be posted on PRs from first-time contributors in your repository
newPRWelcomeComment: >
Thank you for submitting this PR!
A maintainer will be here shortly to review it.
We are super grateful, but we are also overloaded! Help us by making sure
that:
* The context for this PR is clear, with relevant discussion, decisions
and stakeholders linked/mentioned.
* Your contribution itself is clear (code comments, self-review for the
rest) and in its best form. Follow the [code contribution
guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md#code-contribution-guidelines)
if they apply.
Getting other community members to do a review would also be a great help on
complex PRs (you can ask in the chats/forums). If you are unsure about
something, just leave us a comment.
Next steps:
* A maintainer will triage and assign priority to this PR, commenting on
any missing things and potentially assigning a reviewer for high
priority items.
* The PR gets reviewed, discussed and approved as needed.
* The PR is merged by maintainers when it has been approved and comments addressed.
We currently aim to provide initial feedback/triaging within **two business
days**. Please keep an eye on any labelling actions, as these will indicate
priorities and status of your contribution.
We are very grateful for your contribution!
# Configuration for first-pr-merge - https://github.com/behaviorbot/first-pr-merge
# Comment to be posted on pull requests merged by a first-time user
# Currently disabled
#firstPRMergeComment: ""
0.1.54: QmNQtvcJon7xo5V8DMn6MEKMFMCeXnKUNdtGC2NwGYkqk1
os:
  - linux
language: go
go:
  - 1.11.x
env:
  global:
    - GOTFLAGS="-race"
  matrix:
    - BUILD_DEPTYPE=gomod
# disable travis install
install:
  - true
script:
  - bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
  directories:
    - $GOPATH/pkg/mod
    - $HOME/.cache/go-build
notifications:
  email: false
# Please see https://help.github.com/articles/about-codeowners/ for more information
# Global owner
* @schomatis
# Subsystem specific owners
MIT License
Copyright (c) 2018 IPFS
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
all: deps
gx:
	go get github.com/whyrusleeping/gx
	go get github.com/whyrusleeping/gx-go
deps: gx
	gx --verbose install --global
	gx-go rewrite
test: deps
	gx test -v -race -coverprofile=coverage.txt -covermode=atomic .
rw:
	gx-go rewrite
rwundo:
	gx-go rewrite --undo
publish: rwundo
	gx publish
.PHONY: all gx deps test rw rwundo publish
# go-mfs
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](https://godoc.org/github.com/ipfs/go-mfs?status.svg)](https://godoc.org/github.com/ipfs/go-mfs)
[![Build Status](https://travis-ci.com/ipfs/go-mfs.svg?branch=master)](https://travis-ci.com/ipfs/go-mfs)
> go-mfs implements an in-memory model of a mutable IPFS filesystem.
## Lead Maintainer
[Steven Allen](https://github.com/Stebalien)
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [Contribute](#contribute)
- [License](#license)
## Install
`go-mfs` works like a regular Go module:
```
> go get github.com/ipfs/go-mfs
```
It uses [Gx](https://github.com/whyrusleeping/gx) to manage dependencies. You can use `make all` to build it with the `gx` dependencies.
## Usage
```
import "github.com/ipfs/go-mfs"
```
See the [GoDoc documentation](https://godoc.org/github.com/ipfs/go-mfs) for the full API.
## Documentation
Documentation about MFS, and about the Files API in IPFS in general, is a work in progress. The following links may be of use:
* [UnixFS](https://docs.ipfs.io/guides/concepts/unixfs/)
* [MFS](https://docs.ipfs.io/guides/concepts/mfs/)
* [General concept document about how files are handled in IPFS (WIP)](https://github.com/ipfs/docs/issues/133)
## Repository Structure
This repository contains many files, all belonging to the root `mfs` package.
* `file.go`: MFS `File`.
* `dir.go`: MFS `Directory`.
* `fd.go`: `FileDescriptor` used to operate on `File`s.
* `ops.go`: Functions that belong to neither `File` nor `Directory` (although they mostly operate on them) and that implement common MFS operations, e.g., find, move, add a file, make a directory.
* `root.go`: MFS `Root` (a `Directory` with republishing support).
* `repub.go`: `Republisher`.
* `mfs_test.go`: General tests (needs a [revision](https://github.com/ipfs/go-mfs/issues/9)).
* `repub_test.go`: Republisher-specific tests (contains only the `TestRepublisher` function).
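As a rough illustration of how `dir.go`, `file.go` and `root.go` fit together: a change made in a child is first recorded in its parent directory, and the regenerated directory is then reported to its own parent, repeating until the root is reached. The following is a minimal, self-contained sketch of that idea, not code from this package. The `root` and `directory` types, the string hashes and the SHA-256 digest are hypothetical stand-ins for MFS `Root`, `Directory` and the CIDs of real UnixFS nodes.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
	"strings"
)

// parent plays the role of the unexported `parent` interface in this
// package: anything that can be told "one of your children changed".
// The two-string signature is an assumption made for this sketch.
type parent interface {
	updateChildEntry(name, hash string)
}

// root is a hypothetical stand-in for MFS `Root`: it only records the
// latest top-level hash, which a real root would hand to its republisher.
type root struct {
	latest string
}

func (r *root) updateChildEntry(_, hash string) { r.latest = hash }

// directory is a toy directory: a name, a parent and a name->hash table.
type directory struct {
	name    string
	parent  parent
	entries map[string]string
}

func newDirectory(name string, p parent) *directory {
	return &directory{name: name, parent: p, entries: map[string]string{}}
}

// hash derives a deterministic digest from the sorted entries, standing in
// for the CID of the regenerated directory node.
func (d *directory) hash() string {
	names := make([]string, 0, len(d.entries))
	for n := range d.entries {
		names = append(names, n)
	}
	sort.Strings(names)
	var sb strings.Builder
	for _, n := range names {
		sb.WriteString(n + "=" + d.entries[n] + ";")
	}
	return fmt.Sprintf("%x", sha256.Sum256([]byte(sb.String())))
}

// updateChildEntry performs the local update first, then repeats the
// process in the parent with this directory's new hash, up to the root.
func (d *directory) updateChildEntry(name, hash string) {
	d.entries[name] = hash
	d.parent.updateChildEntry(d.name, d.hash())
}

func main() {
	r := &root{}
	top := newDirectory("", r)
	docs := newDirectory("docs", top)

	docs.updateChildEntry("a.txt", "hash-v1")
	before := r.latest
	docs.updateChildEntry("a.txt", "hash-v2")

	fmt.Println("root hash changed:", before != r.latest)
}
```

Because every level recomputes its hash on update, the value seen by the root changes whenever any descendant changes, which is the signal a root with republishing support needs.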
## Contribute
PRs accepted.
Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Protocol Labs, Inc.
golang()
package mfs
import (
"context"
"errors"
"fmt"
"os"
"path"
"sync"
"time"
dag "gitlab.dms3.io/dms3/go-merkledag"
ft "gitlab.dms3.io/dms3/go-unixfs"
uio "gitlab.dms3.io/dms3/go-unixfs/io"
cid "gitlab.dms3.io/dms3/go-cid"
ld "gitlab.dms3.io/dms3/go-ld-format"
)
var ErrNotYetImplemented = errors.New("not yet implemented")
var ErrInvalidChild = errors.New("invalid child node")
var ErrDirExists = errors.New("directory already has entry by that name")
// TODO: There's too much functionality associated with this structure,
// let's organize it (and if possible extract part of it elsewhere)
// and document the main features of `Directory` here.
type Directory struct {
inode
// Internal cache with added entries to the directory, its contents
// are synced with the underlying `unixfsDir` node in `sync()`.
entriesCache map[string]FSNode
lock sync.Mutex
// TODO: What content is being protected here exactly? The entire directory?
ctx context.Context
// UnixFS directory implementation used for creating,
// reading and editing directories.
unixfsDir uio.Directory
modTime time.Time
}
// NewDirectory constructs a new MFS directory.
//
// You probably don't want to call this directly. Instead, construct a new root
// using NewRoot.
func NewDirectory(ctx context.Context, name string, node ld.Node, parent parent, dserv ld.DAGService) (*Directory, error) {
db, err := uio.NewDirectoryFromNode(dserv, node)
if err != nil {
return nil, err
}
return &Directory{
inode: inode{
name: name,
parent: parent,
dagService: dserv,
},
ctx: ctx,
unixfsDir: db,
entriesCache: make(map[string]FSNode),
modTime: time.Now(),
}, nil
}
// GetCidBuilder gets the CID builder of the root node
func (d *Directory) GetCidBuilder() cid.Builder {
return d.unixfsDir.GetCidBuilder()
}
// SetCidBuilder sets the CID builder
func (d *Directory) SetCidBuilder(b cid.Builder) {
d.unixfsDir.SetCidBuilder(b)
}
// This method implements the `parent` interface. It first does the local
// update of the child entry in the underlying UnixFS directory and saves
// the newly created directory node with the updated entry in the DAG
// service. Then it propagates the update upwards (through this same
// interface) repeating the whole process in the parent.
func (d *Directory) updateChildEntry(c child) error {
newDirNode, err := d.localUpdate(c)
if err != nil {
return err
}
// Continue to propagate the update process upwards
// (all the way up to the root).
return d.parent.updateChildEntry(child{d.name, newDirNode})
}
// This method implements the part of `updateChildEntry` that needs
// to be locked around: in charge of updating the UnixFS layer and
// generating the new node reflecting the update. It also stores the
// new node in the DAG layer.
func (d *Directory) localUpdate(c child) (*dag.ProtoNode, error) {
d.lock.Lock()
defer d.lock.Unlock()
err := d.updateChild(c)
if err != nil {
return nil, err
}
// TODO: Clearly define how we are propagating changes to lower layers
// like UnixFS.
nd, err := d.unixfsDir.GetNode()
if err != nil {
return nil, err
}
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrNotProtobuf
}
err = d.dagService.Add(d.ctx, nd)
if err != nil {
return nil, err
}
return pbnd.Copy().(*dag.ProtoNode), nil
// TODO: Why do we need a copy?
}
// Update child entry in the underlying UnixFS directory.
func (d *Directory) updateChild(c child) error {
err := d.unixfsDir.AddChild(d.ctx, c.Name, c.Node)
if err != nil {
return err
}
d.modTime = time.Now()
return nil
}
// Type returns the type of this node, which is always TDir.
func (d *Directory) Type() NodeType {
return TDir
}
// childNode returns the FSNode under this directory with the given name, if it exists.
// It does *not* check the cached entries: it reads the child from the DAG and caches the result.
func (d *Directory) childNode(name string) (FSNode, error) {
nd, err := d.childFromDag(name)
if err != nil {
return nil, err
}
return d.cacheNode(name, nd)
}
// cacheNode caches a node into d.entriesCache and returns the FSNode.
func (d *Directory) cacheNode(name string, nd ld.Node) (FSNode, error) {
switch nd := nd.(type) {
case *dag.ProtoNode:
fsn, err := ft.FSNodeFromBytes(nd.Data())
if err != nil {
return nil, err
}
switch fsn.Type() {
case ft.TDirectory, ft.THAMTShard:
ndir, err := NewDirectory(d.ctx, name, nd, d, d.dagService)
if err != nil {
return nil, err
}
d.entriesCache[name] = ndir
return ndir, nil
case ft.TFile, ft.TRaw, ft.TSymlink:
nfi, err := NewFile(name, nd, d, d.dagService)
if err != nil {
return nil, err
}
d.entriesCache[name] = nfi
return nfi, nil
case ft.TMetadata:
return nil, ErrNotYetImplemented
default:
return nil, ErrInvalidChild
}
case *dag.RawNode:
nfi, err := NewFile(name, nd, d, d.dagService)
if err != nil {
return nil, err
}
d.entriesCache[name] = nfi
return nfi, nil
default:
return nil, fmt.Errorf("unrecognized node type in cache node")
}
}
// Child returns the child of this directory by the given name
func (d *Directory) Child(name string) (FSNode, error) {
d.lock.Lock()
defer d.lock.Unlock()
return d.childUnsync(name)
}
// Uncache removes the entry with the given name from the internal cache.
func (d *Directory) Uncache(name string) {
d.lock.Lock()
defer d.lock.Unlock()
delete(d.entriesCache, name)
}
// childFromDag searches through this directory's DAG node for a child link
// with the given name.
func (d *Directory) childFromDag(name string) (ld.Node, error) {
return d.unixfsDir.Find(d.ctx, name)
}
// childUnsync returns the child under this directory by the given name
// without locking, useful for operations which already hold a lock
func (d *Directory) childUnsync(name string) (FSNode, error) {
entry, ok := d.entriesCache[name]
if ok {
return entry, nil
}
return d.childNode(name)
}
// NodeListing describes a single directory entry as reported by List and ForEachEntry.
type NodeListing struct {
Name string
Type int
Size int64
Hash string
}
// ListNames returns the names of all entries in this directory.
func (d *Directory) ListNames(ctx context.Context) ([]string, error) {
d.lock.Lock()
defer d.lock.Unlock()
var out []string
err := d.unixfsDir.ForEachLink(ctx, func(l *ld.Link) error {
out = append(out, l.Name)
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
// List returns a NodeListing for every entry in this directory.
func (d *Directory) List(ctx context.Context) ([]NodeListing, error) {
var out []NodeListing
err := d.ForEachEntry(ctx, func(nl NodeListing) error {
out = append(out, nl)
return nil
})
return out, err
}
// ForEachEntry calls f with a NodeListing for each entry in this directory.
func (d *Directory) ForEachEntry(ctx context.Context, f func(NodeListing) error) error {
d.lock.Lock()
defer d.lock.Unlock()
return d.unixfsDir.ForEachLink(ctx, func(l *ld.Link) error {
c, err := d.childUnsync(l.Name)
if err != nil {
return err
}
nd, err := c.GetNode()
if err != nil {
return err
}
child := NodeListing{
Name: l.Name,
Type: int(c.Type()),
Hash: nd.Cid().String(),
}
if c, ok := c.(*File); ok {
size, err := c.Size()
if err != nil {
return err
}
child.Size = size
}
return f(child)
})
}
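// Mkdir creates an empty child directory with the given name. If an entry
// with that name already exists it returns os.ErrExist (together with the
// existing directory when that entry is itself a directory).
func (d *Directory) Mkdir(name string) (*Directory, error) {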
d.lock.Lock()
defer d.lock.Unlock()
fsn, err := d.childUnsync(name)
if err == nil {
switch fsn := fsn.(type) {
case *Directory:
return fsn, os.ErrExist
case *File:
return nil, os.ErrExist
default:
return nil, fmt.Errorf("unrecognized type: %#v", fsn)
}
}
ndir := ft.EmptyDirNode()
ndir.SetCidBuilder(d.GetCidBuilder())
err = d.dagService.Add(d.ctx, ndir)
if err != nil {
return nil, err
}
err = d.unixfsDir.AddChild(d.ctx, name, ndir)
if err != nil {
return nil, err
}
dirobj, err := NewDirectory(d.ctx, name, ndir, d, d.dagService)
if err != nil {
return nil, err
}
d.entriesCache[name] = dirobj
return dirobj, nil
}
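// Unlink removes the child with the given name from the cache and from the
// underlying UnixFS directory.
func (d *Directory) Unlink(name string) error {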
d.lock.Lock()
defer d.lock.Unlock()
delete(d.entriesCache, name)
return d.unixfsDir.RemoveChild(d.ctx, name)
}
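// Flush regenerates this directory's node and propagates the update to the
// parent (and from there up to the root).
func (d *Directory) Flush() error {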
nd, err := d.GetNode()
if err != nil {
return err
}
return d.parent.updateChildEntry(child{d.name, nd})
}
// AddChild adds the node 'nd' under this directory giving it the name 'name'
func (d *Directory) AddChild(name string, nd ld.Node) error {
d.lock.Lock()
defer d.lock.Unlock()
_, err := d.childUnsync(name)
if err == nil {
return ErrDirExists
}
err = d.dagService.Add(d.ctx, nd)
if err != nil {
return err
}
err = d.unixfsDir.AddChild(d.ctx, name, nd)
if err != nil {
return err
}
d.modTime = time.Now()
return nil
}
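// sync writes every cached entry back into the underlying UnixFS directory.
// Callers are expected to hold d.lock.
func (d *Directory) sync() error {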
for name, entry := range d.entriesCache {
nd, err := entry.GetNode()
if err != nil {
return err
}
err = d.updateChild(child{name, nd})
if err != nil {
return err
}
}
// TODO: Should we clean the cache here?
return nil
}
// Path returns the absolute MFS path of this directory.
func (d *Directory) Path() string {
cur := d
var out string
for cur != nil {
switch parent := cur.parent.(type) {
case *Directory:
out = path.Join(cur.name, out)
cur = parent
case *Root:
return "/" + out
default:
panic("directory parent neither a directory nor a root")
}
}
return out
}
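// GetNode syncs the cached entries, stores the resulting directory node in
// the DAG service and returns a copy of it.
func (d *Directory) GetNode() (ld.Node, error) {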
d.lock.Lock()
defer d.lock.Unlock()
err := d.sync()
if err != nil {
return nil, err
}
nd, err := d.unixfsDir.GetNode()
if err != nil {
return nil, err
}
err = d.dagService.Add(d.ctx, nd)
if err != nil {
return nil, err
}
return nd.Copy(), err
}
package mfs
import (
"fmt"
"io"
mod "gitlab.dms3.io/dms3/go-unixfs/mod"
context "context"
ld "gitlab.dms3.io/dms3/go-ld-format"
)
type state uint8
const (
stateCreated state = iota
stateFlushed
stateDirty
stateClosed
)
// One `File` can have many `FileDescriptor`s associated to it
// (only one if it's RW, many if they are RO, see `File.desclock`).
// A `FileDescriptor` contains the "view" of the file (through an
// instance of a `DagModifier`), that's why it (and not the `File`)
// has the responsibility to `Flush` (which crystallizes that view
// in the `File`'s `Node`).
type FileDescriptor interface {
io.Reader
CtxReadFull(context.Context, []byte) (int, error)
io.Writer
io.WriterAt
io.Closer
io.Seeker
Truncate(int64) error
Size() (int64, error)
Flush() error
}
type fileDescriptor struct {
inode *File
mod *mod.DagModifier
flags Flags
state state
}
func (fi *fileDescriptor) checkWrite() error {
if fi.state == stateClosed {
return ErrClosed
}
if !fi.flags.Write {
return fmt.Errorf("file is read-only")
}
return nil
}
func (fi *fileDescriptor) checkRead() error {
if fi.state == stateClosed {
return ErrClosed
}
if !fi.flags.Read {
return fmt.Errorf("file is write-only")
}
return nil
}
// Size returns the size of the file referred to by this descriptor
func (fi *fileDescriptor) Size() (int64, error) {
return fi.mod.Size()
}
// Truncate truncates the file to size
func (fi *fileDescriptor) Truncate(size int64) error {
if err := fi.checkWrite(); err != nil {
return fmt.Errorf("truncate failed: %s", err)
}
fi.state = stateDirty
return fi.mod.Truncate(size)
}
// Write writes the given data to the file at its current offset
func (fi *fileDescriptor) Write(b []byte) (int, error) {
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write failed: %s", err)
}
fi.state = stateDirty
return fi.mod.Write(b)
}
// Read reads into the given buffer from the current offset
func (fi *fileDescriptor) Read(b []byte) (int, error) {
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
}
return fi.mod.Read(b)
}
// CtxReadFull reads into the given buffer from the current offset, using the given context
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
if err := fi.checkRead(); err != nil {
return 0, fmt.Errorf("read failed: %s", err)
}
return fi.mod.CtxReadFull(ctx, b)
}
// Close flushes, then propagates the modified DAG node up the directory structure
// and signals a republish to occur
func (fi *fileDescriptor) Close() error {
if fi.state == stateClosed {
return ErrClosed
}
if fi.flags.Write {
defer fi.inode.desclock.Unlock()
} else if fi.flags.Read {
defer fi.inode.desclock.RUnlock()
}
err := fi.flushUp(fi.flags.Sync)
fi.state = stateClosed
return err
}
// Flush generates a new version of the node of the underlying
// UnixFS file (adding it to the DAG service) and updates
// the entry in the parent directory (setting `fullSync` to
// propagate the update all the way to the root).
func (fi *fileDescriptor) Flush() error {
return fi.flushUp(true)
}
// flushUp syncs the file and adds it to the dagservice
// it *must* be called with the File's lock taken
// If `fullSync` is set the changes are propagated upwards
// (the `Up` part of `flushUp`).
func (fi *fileDescriptor) flushUp(fullSync bool) error {
var nd ld.Node
switch fi.state {
case stateCreated, stateDirty:
var err error
nd, err = fi.mod.GetNode()
if err != nil {
return err
}
err = fi.inode.dagService.Add(context.TODO(), nd)
if err != nil {
return err
}
// TODO: Very similar logic to the update process in
// `Directory`, the logic should be unified, both structures
// (`File` and `Directory`) are backed by a LD node with
// a UnixFS format that is the actual target of the update
// (regenerating it and adding it to the DAG service).
fi.inode.nodeLock.Lock()
// Always update the file descriptor's inode with the created/modified node.
fi.inode.node = nd
// Save the members to be used for subsequent calls
parent := fi.inode.parent
name := fi.inode.name
fi.inode.nodeLock.Unlock()
// Bubble up the update to the parent, only if fullSync is set to true.
if fullSync {
if err := parent.updateChildEntry(child{name, nd}); err != nil {
return err
}
}
fi.state = stateFlushed
return nil
case stateFlushed:
return nil
default:
panic("invalid state")
}
}
// Seek implements io.Seeker
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
if fi.state == stateClosed {
return 0, fmt.Errorf("seek failed: %s", ErrClosed)
}
return fi.mod.Seek(offset, whence)
}
// WriteAt writes the given bytes at the offset 'at'
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
if err := fi.checkWrite(); err != nil {
return 0, fmt.Errorf("write-at failed: %s", err)
}
fi.state = stateDirty
return fi.mod.WriteAt(b, at)
}
package mfs
import (
"context"
"fmt"
"sync"
dag "gitlab.dms3.io/dms3/go-merkledag"
ft "gitlab.dms3.io/dms3/go-unixfs"
mod "gitlab.dms3.io/dms3/go-unixfs/mod"
chunker "gitlab.dms3.io/dms3/go-dms3-chunker"
ld "gitlab.dms3.io/dms3/go-ld-format"
)
// File represents a file in the MFS. Its logic is mainly targeted
// at coordinating (potentially many) `FileDescriptor`s pointing to
// it.
type File struct {
inode
// Lock to coordinate the `FileDescriptor`s associated to this file.
desclock sync.RWMutex
// This isn't any node, it's the root node that represents the
// entire DAG of nodes that comprise the file.
// TODO: Rename, there should be an explicit term for these root nodes
// of a particular sub-DAG that abstract an upper layer's entity.
node ld.Node
// Lock around the `node` that represents this file, necessary because
// there may be many `FileDescriptor`s operating on this `File`.
nodeLock sync.RWMutex
RawLeaves bool
}
// NewFile returns a new File object with the given parameters. If the
// Cid version is non-zero, RawLeaves will be enabled.
func NewFile(name string, node ld.Node, parent parent, dserv ld.DAGService) (*File, error) {
fi := &File{
inode: inode{
name: name,
parent: parent,
dagService: dserv,
},
node: node,
}
if node.Cid().Prefix().Version > 0 {
fi.RawLeaves = true
}
return fi, nil
}
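// Open returns a FileDescriptor for this file. A descriptor opened for
// writing holds the descriptor lock exclusively, while read-only descriptors
// share it. The lock is released when the descriptor is closed.
func (fi *File) Open(flags Flags) (_ FileDescriptor, _retErr error) {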
if flags.Write {
fi.desclock.Lock()
defer func() {
if _retErr != nil {
fi.desclock.Unlock()
}
}()
} else if flags.Read {
fi.desclock.RLock()
defer func() {
if _retErr != nil {
fi.desclock.RUnlock()
}
}()
} else {
return nil, fmt.Errorf("file opened for neither reading nor writing")
}
fi.nodeLock.RLock()
node := fi.node
fi.nodeLock.RUnlock()
// TODO: Move this `switch` logic outside (maybe even
// to another package, this seems like a job of UnixFS),
// `NewDagModifier` uses the LD node, we're not
// extracting anything just doing a safety check.
switch node := node.(type) {
case *dag.ProtoNode:
fsn, err := ft.FSNodeFromBytes(node.Data())
if err != nil {
return nil, err
}
switch fsn.Type() {
default:
return nil, fmt.Errorf("unsupported fsnode type for 'file'")
case ft.TSymlink:
return nil, fmt.Errorf("symlinks not yet supported")
case ft.TFile, ft.TRaw:
// OK case
}
case *dag.RawNode:
// Ok as well.
}
dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dagService, chunker.DefaultSplitter)
// TODO: Remove the use of the `chunker` package here, add a new `NewDagModifier` in
// `go-unixfs` with the `DefaultSplitter` already included.
if err != nil {
return nil, err
}
dmod.RawLeaves = fi.RawLeaves
return &fileDescriptor{
inode: fi,
flags: flags,
mod: dmod,
state: stateCreated,
}, nil
}
// Size returns the size of this file
// TODO: Should we be providing this API?
// TODO: There's already a `FileDescriptor.Size()` that
// through the `DagModifier`'s `fileSize` function is doing
// pretty much the same thing as here, we should at least call
// that function and wrap the `ErrNotUnixfs` with an MFS text.
func (fi *File) Size() (int64, error) {
fi.nodeLock.RLock()
defer fi.nodeLock.RUnlock()
switch nd := fi.node.(type) {
case *dag.ProtoNode:
fsn, err := ft.FSNodeFromBytes(nd.Data())
if err != nil {
return 0, err
}
return int64(fsn.FileSize()), nil
case *dag.RawNode:
return int64(len(nd.RawData())), nil
default:
return 0, fmt.Errorf("unrecognized node type in mfs/file.Size()")
}
}
// GetNode returns the dag node associated with this file
// TODO: Use this method and do not access the `nodeLock` directly anywhere else.
func (fi *File) GetNode() (ld.Node, error) {
fi.nodeLock.RLock()
defer fi.nodeLock.RUnlock()
return fi.node, nil
}
// TODO: Tight coupling with the `FileDescriptor`, at the
// very least this should be an independent function that
// takes a `File` argument and automates the open/flush/close
// operations.
// TODO: Why do we need to flush a file that isn't opened?
// (the `OpenWriteOnly` seems to implicitly be targeting a
// closed file, a file we forgot to flush? can we close
// a file without flushing?)
func (fi *File) Flush() error {
// open the file in fullsync mode
fd, err := fi.Open(Flags{Write: true, Sync: true})
if err != nil {
return err
}
defer fd.Close()
return fd.Flush()
}
func (fi *File) Sync() error {
// just being able to take the writelock means the descriptor is synced
// TODO: Why?
fi.desclock.Lock()
fi.desclock.Unlock()
return nil
}
// Type returns the type of this FSNode, which is always TFile.
func (fi *File) Type() NodeType {
return TFile
}
package mfs
import (
ld "gitlab.dms3.io/dms3/go-ld-format"
)
// inode abstracts the common characteristics of the MFS `File`
// and `Directory`. All of its attributes are initialized at
// creation.
type inode struct {
// name of this `inode` in the MFS path (the same value
// is also stored as the name of the DAG link).
name string
// parent directory of this `inode` (which may be the `Root`).
parent parent
// dagService used to store modifications made to the contents
// of the file or directory the `inode` belongs to.
dagService ld.DAGService
}
package mfs
import (
"context"
"fmt"
"os"
gopath "path"
"strings"
path "gitlab.dms3.io/dms3/go-path"
cid "gitlab.dms3.io/dms3/go-cid"
ld "gitlab.dms3.io/dms3/go-ld-format"
)
// TODO: Evaluate turning all of these operations into `Root`
// methods, since all of them take it as their first argument
// and there is no clear documentation that explains this
// separation.
// Mv moves the file or directory at 'src' to 'dst'
// TODO: Document what the strings 'src' and 'dst' represent.
func Mv(r *Root, src, dst string) error {
srcDirName, srcFname := gopath.Split(src)
var dstDirName string
var dstFname string
if strings.HasSuffix(dst, "/") {
dstDirName = dst
dstFname = srcFname
} else {
dstDirName, dstFname = gopath.Split(dst)
}
// get parent directories of both src and dest first
dstDir, err := lookupDir(r, dstDirName)
if err != nil {
return err
}
srcDir, err := lookupDir(r, srcDirName)
if err != nil {
return err
}
srcObj, err := srcDir.Child(srcFname)
if err != nil {
return err
}
nd, err := srcObj.GetNode()
if err != nil {
return err
}
fsn, err := dstDir.Child(dstFname)
if err == nil {
switch n := fsn.(type) {
case *File:
_ = dstDir.Unlink(dstFname)
case *Directory:
dstDir = n
dstFname = srcFname
default:
return fmt.Errorf("unexpected type at path: %s", dst)
}
} else if err != os.ErrNotExist {
return err
}
err = dstDir.AddChild(dstFname, nd)
if err != nil {
return err
}
if srcDir.name == dstDir.name && srcFname == dstFname {
return nil
}
return srcDir.Unlink(srcFname)
}
func lookupDir(r *Root, path string) (*Directory, error) {
di, err := Lookup(r, path)
if err != nil {
return nil, err
}
d, ok := di.(*Directory)
if !ok {
return nil, fmt.Errorf("%s is not a directory", path)
}
return d, nil
}
// PutNode inserts 'nd' at 'path' in the given mfs
// TODO: Rename or clearly document that this is not about nodes but actually
// MFS files/directories (that in the underlying representation can be
// considered as just nodes).
// TODO: Document why we are handling LD nodes in the first place when we
// are actually referring to files/directories (that is, it can't be any
// node, it has to have a specific format).
// TODO: Can this function add directories or just files? What would be the
// difference between adding a directory with this method and creating it
// with `Mkdir`.
func PutNode(r *Root, path string, nd ld.Node) error {
dirp, filename := gopath.Split(path)
if filename == "" {
return fmt.Errorf("cannot create file with empty name")
}
pdir, err := lookupDir(r, dirp)
if err != nil {
return err
}
return pdir.AddChild(filename, nd)
}
// MkdirOpts is used by Mkdir
type MkdirOpts struct {
Mkparents bool
Flush bool
CidBuilder cid.Builder
}
// Mkdir creates a directory at 'pth' under the root 'r', creating
// intermediate directories as needed if 'opts.Mkparents' is set to true
func Mkdir(r *Root, pth string, opts MkdirOpts) error {
if pth == "" {
return fmt.Errorf("no path given to Mkdir")
}
parts := path.SplitList(pth)
if parts[0] == "" {
parts = parts[1:]
}
// allow 'mkdir /a/b/c/' to create c
if parts[len(parts)-1] == "" {
parts = parts[:len(parts)-1]
}
if len(parts) == 0 {
// this will only happen on 'mkdir /'
if opts.Mkparents {
return nil
}
return fmt.Errorf("cannot create directory '/': Already exists")
}
cur := r.GetDirectory()
for i, d := range parts[:len(parts)-1] {
fsn, err := cur.Child(d)
if err == os.ErrNotExist && opts.Mkparents {
mkd, err := cur.Mkdir(d)
if err != nil {
return err
}
if opts.CidBuilder != nil {
mkd.SetCidBuilder(opts.CidBuilder)
}
fsn = mkd
} else if err != nil {
return err
}
next, ok := fsn.(*Directory)
if !ok {
return fmt.Errorf("%s was not a directory", path.Join(parts[:i+1]))
}
cur = next
}
final, err := cur.Mkdir(parts[len(parts)-1])
if err != nil {
if !opts.Mkparents || err != os.ErrExist || final == nil {
return err
}
}
if opts.CidBuilder != nil {
final.SetCidBuilder(opts.CidBuilder)
}
if opts.Flush {
err := final.Flush()
if err != nil {
return err
}
}
return nil
}
// Lookup extracts the root directory and performs a lookup under it.
// TODO: Now that the root is always a directory, can this function
// be collapsed with `DirLookup`? Or at least be made a method of `Root`?
func Lookup(r *Root, path string) (FSNode, error) {
dir := r.GetDirectory()
return DirLookup(dir, path)
}
// DirLookup will look up a file or directory at the given path
// under the directory 'd'
func DirLookup(d *Directory, pth string) (FSNode, error) {
pth = strings.Trim(pth, "/")
parts := path.SplitList(pth)
if len(parts) == 1 && parts[0] == "" {
return d, nil
}
var cur FSNode
cur = d
for i, p := range parts {
chdir, ok := cur.(*Directory)
if !ok {
return nil, fmt.Errorf("cannot access %s: Not a directory", path.Join(parts[:i+1]))
}
child, err := chdir.Child(p)
if err != nil {
return nil, err
}
cur = child
}
return cur, nil
}
// TODO: Document this function and link its functionality
// with the republisher.
func FlushPath(ctx context.Context, rt *Root, pth string) (ld.Node, error) {
nd, err := Lookup(rt, pth)
if err != nil {
return nil, err
}
err = nd.Flush()
if err != nil {
return nil, err
}
rt.repub.WaitPub(ctx)
return nd.GetNode()
}
package mfs
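// Flags controls how a File is opened: Read and Write select the access
// mode, and Sync makes Close propagate the flushed node up to the root.
type Flags struct {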
Read bool
Write bool
Sync bool
}
{
"author": "hsanjuan",
"bugs": {
"url": "https://github.com/ipfs/go-mfs"
},
"gx": {
"dvcsimport": "github.com/ipfs/go-mfs"
},
"gxDependencies": [
{
"author": "why",
"hash": "QmY6UwsN3D6uoxrRkYpJ8Wos8R66gwLmdn3wy7jM7CCRQ1",
"name": "go-merkledag",
"version": "1.1.40"
},
{
"author": "why",
"hash": "QmVmueix5wxmr8UWpfpcKw6F1xT7T8AS7CXQRM37BE29eX",
"name": "go-unixfs",
"version": "1.3.15"
},
{
"author": "whyrusleeping",
"hash": "QmTbxNB1NwDesLmKTscr4udL2tVP7MaxvXnD1D9yX7g3PN",
"name": "go-cid",
"version": "0.9.3"
},
{
"author": "whyrusleeping",
"hash": "QmZ6nzCLwGLVfRzYLpD7pW6UNuBDKEcA2imJtVpbEx2rxy",
"name": "go-ipld-format",
"version": "0.8.1"
},
{
"author": "why",
"hash": "QmUquHAkyh9phs4ojSuHcHcZnUMGDwDNJZWnRXkNks1qLB",
"name": "go-path",
"version": "1.1.41"
}
],
"gxVersion": "0.12.1",
"language": "go",
"license": "MIT",
"name": "go-mfs",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.1.54"
}
package mfs
import (
"context"
"time"
cid "gitlab.dms3.io/dms3/go-cid"
)
// PubFunc is the user-defined function that determines exactly what
// logic entails "publishing" a `Cid` value.
type PubFunc func(context.Context, cid.Cid) error
// Republisher manages when to publish a given entry.
type Republisher struct {
TimeoutLong time.Duration
TimeoutShort time.Duration
RetryTimeout time.Duration
pubfunc PubFunc
update chan cid.Cid
immediatePublish chan chan struct{}
ctx context.Context
cancel func()
}
// NewRepublisher creates a new Republisher object to republish the given root
// using the given short and long time intervals.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
ctx, cancel := context.WithCancel(ctx)
return &Republisher{
TimeoutShort: tshort,
TimeoutLong: tlong,
RetryTimeout: tlong,
update: make(chan cid.Cid, 1),
pubfunc: pf,
immediatePublish: make(chan chan struct{}),
ctx: ctx,
cancel: cancel,
}
}
// WaitPub waits for the current value to be published (or returns early
// if it already has).
func (rp *Republisher) WaitPub(ctx context.Context) error {
wait := make(chan struct{})
select {
case rp.immediatePublish <- wait:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-wait:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
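// Close publishes any pending value, waits for that publish to finish, and
// then stops the republisher.
func (rp *Republisher) Close() error {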
// TODO(steb): Wait for `Run` to stop
err := rp.WaitPub(rp.ctx)
rp.cancel()
return err
}
// Update the current value. The value will be published after a delay but each
// consecutive call to Update may extend this delay up to TimeoutLong.
func (rp *Republisher) Update(c cid.Cid) {
select {
case <-rp.update:
select {
case rp.update <- c:
default:
// Don't try again. If we hit this case, there's a
// concurrent publish and we can safely let that
// concurrent publish win.
}
case rp.update <- c:
}
}
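// The "keep only the latest value" pattern used by Update can be tried in
// isolation. The sketch below is illustrative only: `offerLatest` is a
// hypothetical helper that works on a plain `chan int` with a buffer of one,
// not on the Republisher itself.

```go
package main

import "fmt"

// offerLatest stores v in ch, replacing any value that is still waiting
// to be received. ch is assumed to have a buffer of exactly one element.
func offerLatest(ch chan int, v int) {
	select {
	case <-ch:
		// A stale value was dropped; try to store the new one.
		select {
		case ch <- v:
		default:
			// A concurrent sender filled the slot first; let it win.
		}
	case ch <- v:
		// The slot was empty.
	}
}

func main() {
	ch := make(chan int, 1)
	offerLatest(ch, 1)
	offerLatest(ch, 2) // replaces 1
	offerLatest(ch, 3) // replaces 2

	v := <-ch
	fmt.Println(v, len(ch)) // 3 0
}
```

// Because the buffer holds a single element, a sender never blocks and a
// receiver only ever sees the most recent value.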
// Run contains the core logic of the `Republisher`. It calls the user-defined
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
// updates.
//
// Algorithm:
// 1. When we receive the first update after publishing, we set a `longer` timer.
// 2. When we receive any update, we reset the `quick` timer.
// 3. If either the `quick` timeout or the `longer` timeout elapses,
// we call `publish` with the latest updated value.
//
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
//
// Note: If a publish fails, we retry repeatedly every RetryTimeout.
func (rp *Republisher) Run(lastPublished cid.Cid) {
quick := time.NewTimer(0)
if !quick.Stop() {
<-quick.C
}
longer := time.NewTimer(0)
if !longer.Stop() {
<-longer.C
}
var toPublish cid.Cid
for rp.ctx.Err() == nil {
var waiter chan struct{}
select {
case <-rp.ctx.Done():
return
case newValue := <-rp.update:
// Skip already published values.
if lastPublished.Equals(newValue) {
// Break out of the select to clean up any
// timers.
toPublish = cid.Undef
break
}
// If we aren't already waiting to publish something,
// reset the long timeout.
if !toPublish.Defined() {
longer.Reset(rp.TimeoutLong)
}
// Always reset the short timeout.
quick.Reset(rp.TimeoutShort)
// Finally, set the new value to publish.
toPublish = newValue
continue
case waiter = <-rp.immediatePublish:
// Make sure to grab the *latest* value to publish.
select {
case toPublish = <-rp.update:
default:
}
// Avoid publishing duplicate values
if lastPublished.Equals(toPublish) {
toPublish = cid.Undef
}
case <-quick.C:
case <-longer.C:
}
// Cleanup, publish, and close waiters.
// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
// idiom as these timers may not be running.
quick.Stop()
select {
case <-quick.C:
default:
}
longer.Stop()
select {
case <-longer.C:
default:
}
// 2. If we have a value to publish, publish it now.
if toPublish.Defined() {
for {
err := rp.pubfunc(rp.ctx, toPublish)
if err == nil {
break
}
// Keep retrying until we succeed or we abort.
// TODO(steb): We could try pulling new values
// off `update` but that's not critical (and
// complicates this code a bit). We'll pull off
// a new value on the next loop through.
select {
case <-time.After(rp.RetryTimeout):
case <-rp.ctx.Done():
return
}
}
lastPublished = toPublish
toPublish = cid.Undef
}
// 3. Trigger anything waiting in `WaitPub`.
if waiter != nil {
close(waiter)
}
}
}
package mfs
import (
"context"
"testing"
"time"
cid "gitlab.dms3.io/dms3/go-cid"
ci "gitlab.dms3.io/p2p/go-p2p-testing/ci"
)
func TestRepublisher(t *testing.T) {
if ci.IsRunning() {
t.Skip("don't run timing tests in CI")
}
ctx := context.TODO()
pub := make(chan struct{})
pf := func(ctx context.Context, c cid.Cid) error {
pub <- struct{}{}
return nil
}
testCid1, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH")
testCid2, _ := cid.Parse("QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVX")
tshort := time.Millisecond * 50
tlong := time.Second / 2
rp := NewRepublisher(ctx, pf, tshort, tlong)
go rp.Run(cid.Undef)
rp.Update(testCid1)
// should hit short timeout
select {
case <-time.After(tshort * 2):
t.Fatal("publish didn't happen in time")
case <-pub:
}
cctx, cancel := context.WithCancel(context.Background())
go func() {
for {
rp.Update(testCid2)
time.Sleep(time.Millisecond * 10)
select {
case <-cctx.Done():
return
default:
}
}
}()
select {
case <-pub:
t.Fatal("shouldn't have received publish yet!")
case <-time.After((tlong * 9) / 10):
}
select {
case <-pub:
case <-time.After(tlong / 2):
t.Fatal("waited too long for pub!")
}
cancel()
err := rp.Close()
if err != nil {
t.Fatal(err)
}
}
// Package mfs implements an in-memory model of a mutable DMS3 filesystem.
// TODO: Develop on this line (and move it to `doc.go`).
package mfs
import (
"context"
"errors"
"fmt"
"time"
dag "gitlab.dms3.io/dms3/go-merkledag"
ft "gitlab.dms3.io/dms3/go-unixfs"
ld "gitlab.dms3.io/dms3/go-ld-format"
logging "gitlab.dms3.io/dms3/go-log"
)
// TODO: Remove if not used.
var ErrNotExist = errors.New("no such rootfs")
var ErrClosed = errors.New("file closed")
var log = logging.Logger("mfs")
// TODO: Remove if not used.
var ErrIsDirectory = errors.New("error: is a directory")
// The information that an MFS `Directory` has about its children
// when updating one of its entries: when a child mutates it signals
// its parent directory to update its entry (under `Name`) with the
// new content (in `Node`).
type child struct {
Name string
Node ld.Node
}
// This interface represents the basic property of MFS directories of updating
// children entries with modified content. Implemented by both the MFS
// `Directory` and `Root` (which is basically a `Directory` with republishing
// support).
//
// TODO: What is `fullsync`? (unnamed `bool` argument)
// TODO: There are two types of persistence/flush that need to be
// distinguished here, one at the DAG level (when I store the modified
// nodes in the DAG service) and one in the UnixFS/MFS level (when I modify
// the entry/link of the directory that pointed to the modified node).
type parent interface {
// updateChildEntry is called by a child on its parent to signal that the
// entry stored under the child's name must point to the child's new
// content. The child sends its own information in the `child` structure.
// Because modifying a directory entry also modifies the directory itself,
// the parent in turn calls *its* parent's `updateChildEntry`, and so on
// until the update reaches the `Root`.
updateChildEntry(c child) error
}
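// The upward propagation described for `updateChildEntry` can be sketched
// with toy types. Everything below is hypothetical and only illustrates the
// call chain: `notifier`, `dir` and `root` stand in for `parent`,
// `Directory` and `Root`, and plain strings stand in for DAG nodes.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for the `parent` interface.
type notifier interface {
	updateChildEntry(name, content string) error
}

// dir is a toy directory: it records its entries and forwards changes upward.
type dir struct {
	name    string
	entries map[string]string
	parent  notifier
}

func (d *dir) updateChildEntry(name, content string) error {
	d.entries[name] = content
	// Changing an entry changes this directory, so notify our own parent.
	return d.parent.updateChildEntry(d.name, fmt.Sprintf("%v", d.entries))
}

// root terminates the chain; it only records the latest top-level content.
type root struct{ published string }

func (r *root) updateChildEntry(name, content string) error {
	r.published = content
	return nil
}

func main() {
	r := &root{}
	top := &dir{name: "top", entries: map[string]string{}, parent: r}
	sub := &dir{name: "sub", entries: map[string]string{}, parent: top}

	// An entry under top/sub changes; the update bubbles up to the root.
	if err := sub.updateChildEntry("file.txt", "v2"); err != nil {
		panic(err)
	}
	fmt.Println(r.published) // map[sub:map[file.txt:v2]]
}
```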
// NodeType identifies whether an FSNode is a file or a directory.
type NodeType int
const (
TFile NodeType = iota
TDir
)
// FSNode abstracts the `Directory` and `File` structures; it represents
// any child node in the MFS (i.e., all the nodes besides the `Root`). It
// is the counterpart of the `parent` interface which represents any
// parent node in the MFS (`Root` and `Directory`).
// (Not to be confused with the `unixfs.FSNode`.)
type FSNode interface {
GetNode() (ld.Node, error)
Flush() error
Type() NodeType
}
// IsDir reports whether the FSNode is a directory.
func IsDir(fsn FSNode) bool {
return fsn.Type() == TDir
}
// IsFile reports whether the FSNode is a file.
func IsFile(fsn FSNode) bool {
return fsn.Type() == TFile
}
// Root represents the root of a filesystem tree.
type Root struct {
// Root directory of the MFS layout.
dir *Directory
repub *Republisher
}
// NewRoot creates a new Root and starts up a republisher routine for it.
func NewRoot(parent context.Context, ds ld.DAGService, node *dag.ProtoNode, pf PubFunc) (*Root, error) {
var repub *Republisher
if pf != nil {
repub = NewRepublisher(parent, pf, time.Millisecond*300, time.Second*3)
// No need to take the lock here since we just created
// the `Republisher` and no one has access to it yet.
go repub.Run(node.Cid())
}
root := &Root{
repub: repub,
}
fsn, err := ft.FSNodeFromBytes(node.Data())
if err != nil {
log.Error("DMS3NS pointer was not unixfs node")
// TODO: DMS3NS pointer?
return nil, err
}
switch fsn.Type() {
case ft.TDirectory, ft.THAMTShard:
newDir, err := NewDirectory(parent, node.String(), node, root, ds)
if err != nil {
return nil, err
}
root.dir = newDir
case ft.TFile, ft.TMetadata, ft.TRaw:
return nil, fmt.Errorf("root can't be a file (unixfs type: %s)", fsn.Type())
// TODO: This special error reporting case doesn't seem worth it, we either
// have a UnixFS directory or we don't.
default:
return nil, fmt.Errorf("unrecognized unixfs type: %s", fsn.Type())
}
return root, nil
}
// GetDirectory returns the root directory.
func (kr *Root) GetDirectory() *Directory {
return kr.dir
}
// Flush signals that an update has occurred since the last publish,
// and updates the Root republisher.
// TODO: We are definitely abusing the "flush" terminology here.
func (kr *Root) Flush() error {
nd, err := kr.GetDirectory().GetNode()
if err != nil {
return err
}
if kr.repub != nil {
kr.repub.Update(nd.Cid())
}
return nil
}
// FlushMemFree flushes the root directory and then uncaches all of its links.
// This has the effect of clearing out potentially stale references and allows
// them to be garbage collected.
// CAUTION: Take care not to ever call this while holding a reference to any
// child directories. Those directories will be bad references and using them
// may have unintended racy side effects.
// A better-implemented MFS (one that does smarter internal caching and
// refcounting) shouldn't need this method.
// TODO: Review the motivation behind this method once the cache system is
// refactored.
func (kr *Root) FlushMemFree(ctx context.Context) error {
dir := kr.GetDirectory()
if err := dir.Flush(); err != nil {
return err
}
dir.lock.Lock()
defer dir.lock.Unlock()
for name := range dir.entriesCache {
delete(dir.entriesCache, name)
}
// TODO: Can't we just create new maps?
return nil
}
// updateChildEntry implements the `parent` interface, and signals
// to the publisher that there are changes ready to be published.
// This is the only thing that separates a `Root` from a `Directory`.
// TODO: Evaluate merging both.
// TODO: The `sync` argument isn't used here (we've already reached
// the top), document it and maybe make it an anonymous variable (if
// that's possible).
func (kr *Root) updateChildEntry(c child) error {
err := kr.GetDirectory().dagService.Add(context.TODO(), c.Node)
if err != nil {
return err
}
// TODO: Why are we not using the inner directory lock nor
// applying the same procedure as `Directory.updateChildEntry`?
if kr.repub != nil {
kr.repub.Update(c.Node.Cid())
}
return nil
}
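// Close sends the current root value to the republisher (if any) and then
// closes the republisher, waiting for the final publish.
func (kr *Root) Close() error {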
nd, err := kr.GetDirectory().GetNode()
if err != nil {
return err
}
if kr.repub != nil {
kr.repub.Update(nd.Cid())
return kr.repub.Close()
}
return nil
}