Commit 40a49c83 authored by Stephen Whitmore's avatar Stephen Whitmore

Mounts detect unmounts and track mount state.

This lets FUSE mounts to track whether they are active or not by
tracking when fs.Serve terminates.

License: MIT
Signed-off-by: default avatarStephen Whitmore <noffle@ipfs.io>
parent 5a0b8023
......@@ -21,6 +21,7 @@ import (
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/core/corerouting"
nodeMount "github.com/ipfs/go-ipfs/fuse/node"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
util "github.com/ipfs/go-ipfs/util"
conn "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/net/conn"
......@@ -494,7 +495,7 @@ func mountFuse(req cmds.Request) error {
return fmt.Errorf("mountFuse: ConstructNode() failed: %s", err)
}
err = commands.Mount(node, fsdir, nsdir)
err = nodeMount.Mount(node, fsdir, nsdir)
if err != nil {
return err
}
......
......@@ -4,10 +4,7 @@
package commands
import (
"errors"
cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
)
var MountCmd = &cmds.Command{
......@@ -23,7 +20,3 @@ For the latest instructions, please check the project's repository:
`,
},
}
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
return errors.New("not compiled in")
}
......@@ -7,32 +7,12 @@ import (
"fmt"
"io"
"strings"
"time"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
rofs "github.com/ipfs/go-ipfs/fuse/readonly"
nodeMount "github.com/ipfs/go-ipfs/fuse/node"
config "github.com/ipfs/go-ipfs/repo/config"
)
// amount of time to wait for mount errors
// TODO is this non-deterministic?
const mountTimeout = time.Second
// fuseNoDirectory used to check the returning fuse error
const fuseNoDirectory = "fusermount: failed to access mountpoint"
// fuseExitStatus1 used to check the returning fuse error
const fuseExitStatus1 = "fusermount: exit status 1"
// platformFuseChecks can get overridden by arch-specific files
// to run fuse checks (like checking the OSXFUSE version)
var platformFuseChecks = func(*core.IpfsNode) error {
return nil
}
var MountCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Mounts IPFS to the filesystem (read-only).",
......@@ -134,7 +114,7 @@ baz
nsdir = cfg.Mounts.IPNS // NB: be sure to not redeclare!
}
err = Mount(node, fsdir, nsdir)
err = nodeMount.Mount(node, fsdir, nsdir)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
......@@ -155,90 +135,3 @@ baz
},
},
}
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// check if we already have live mounts.
// if the user said "Mount", then there must be something wrong.
// so, close them and try again.
if node.Mounts.Ipfs != nil {
node.Mounts.Ipfs.Unmount()
}
if node.Mounts.Ipns != nil {
node.Mounts.Ipns.Unmount()
}
if err := platformFuseChecks(node); err != nil {
return err
}
var err error
if err = doMount(node, fsdir, nsdir); err != nil {
return err
}
return nil
}
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
fmtFuseErr := func(err error, mountpoint string) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return cmds.ClientError(s)
}
if s == fuseExitStatus1 {
s = fmt.Sprintf("fuse failed to access mountpoint %s", mountpoint)
return cmds.ClientError(s)
}
return err
}
// this sync stuff is so that both can be mounted simultaneously.
var fsmount mount.Mount
var nsmount mount.Mount
var err1 error
var err2 error
done := make(chan struct{})
go func() {
fsmount, err1 = rofs.Mount(node, fsdir)
done <- struct{}{}
}()
go func() {
nsmount, err2 = ipns.Mount(node, nsdir, fsdir)
done <- struct{}{}
}()
<-done
<-done
if err1 != nil {
log.Errorf("error mounting: %s", err1)
}
if err2 != nil {
log.Errorf("error mounting: %s", err2)
}
if err1 != nil || err2 != nil {
if fsmount != nil {
fsmount.Unmount()
}
if nsmount != nil {
nsmount.Unmount()
}
if err1 != nil {
return fmtFuseErr(err1, fsdir)
}
return fmtFuseErr(err2, nsdir)
}
// setup node state, so that it can be cancelled
node.Mounts.Ipfs = fsmount
node.Mounts.Ipns = nsmount
return nil
}
......@@ -4,7 +4,6 @@ import (
"errors"
cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core"
)
var MountCmd = &cmds.Command{
......@@ -17,9 +16,3 @@ var MountCmd = &cmds.Command{
res.SetError(errors.New("Mount isn't compatible with Windows yet"), cmds.ErrNormal)
},
}
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// TODO
// currently a no-op, but we don't want to return an error
return nil
}
......@@ -335,10 +335,10 @@ func (n *IpfsNode) teardown() error {
closers = append(closers, n.Exchange)
}
if n.Mounts.Ipfs != nil {
if n.Mounts.Ipfs != nil && !n.Mounts.Ipfs.IsActive() {
closers = append(closers, mount.Closer(n.Mounts.Ipfs))
}
if n.Mounts.Ipns != nil {
if n.Mounts.Ipns != nil && !n.Mounts.Ipns.IsActive() {
closers = append(closers, mount.Closer(n.Mounts.Ipns))
}
......
......@@ -15,12 +15,11 @@ import (
racedet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
core "github.com/ipfs/go-ipfs/core"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
//mfs "github.com/ipfs/go-ipfs/mfs"
namesys "github.com/ipfs/go-ipfs/namesys"
offroute "github.com/ipfs/go-ipfs/routing/offline"
u "github.com/ipfs/go-ipfs/util"
ci "github.com/ipfs/go-ipfs/util/testutil/ci"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
func maybeSkipFuseTests(t *testing.T) {
......@@ -437,111 +436,6 @@ func TestFSThrash(t *testing.T) {
}
}
/*
func TestFastRepublish(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// make timeout noticeable.
osrt := shortRepublishTimeout
shortRepublishTimeout = time.Millisecond * 100
olrt := longRepublishTimeout
longRepublishTimeout = time.Second
node, mnt := setupIpnsTest(t, nil)
h, err := node.PrivateKey.GetPublic().Hash()
if err != nil {
t.Fatal(err)
}
pubkeyPath := "/ipns/" + u.Key(h).String()
// set them back
defer func() {
shortRepublishTimeout = osrt
longRepublishTimeout = olrt
mnt.Close()
}()
closed := make(chan struct{})
dataA := []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
dataB := []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
fname := mnt.Dir + "/local/file"
// get first resolved hash
log.Debug("publishing first hash")
writeFileData(t, dataA, fname) // random
<-time.After(shortRepublishTimeout * 2)
log.Debug("resolving first hash")
resolvedHash, err := node.Namesys.Resolve(context.Background(), pubkeyPath)
if err != nil {
t.Fatal("resolve err:", pubkeyPath, err)
}
// constantly keep writing to the file
go func(timeout time.Duration) {
for {
select {
case <-closed:
return
case <-time.After(timeout * 8 / 10):
writeFileData(t, dataB, fname)
}
}
}(shortRepublishTimeout)
hasPublished := func() bool {
res, err := node.Namesys.Resolve(context.Background(), pubkeyPath)
if err != nil {
t.Fatalf("resolve err: %v", err)
}
return res != resolvedHash
}
// test things
// at this point, should not have written dataA and not have written dataB
rbuf, err := ioutil.ReadFile(fname)
if err != nil || !bytes.Equal(rbuf, dataA) {
t.Fatalf("Data inconsistent! %v %v", err, string(rbuf))
}
if hasPublished() {
t.Fatal("published (wrote)")
}
<-time.After(shortRepublishTimeout * 11 / 10)
// at this point, should have written written dataB, but not published it
rbuf, err = ioutil.ReadFile(fname)
if err != nil || !bytes.Equal(rbuf, dataB) {
t.Fatalf("Data inconsistent! %v %v", err, string(rbuf))
}
if hasPublished() {
t.Fatal("published (wrote)")
}
<-time.After(longRepublishTimeout * 11 / 10)
// at this point, should have written written dataB, and published it
rbuf, err = ioutil.ReadFile(fname)
if err != nil || !bytes.Equal(rbuf, dataB) {
t.Fatalf("Data inconsistent! %v %v", err, string(rbuf))
}
if !hasPublished() {
t.Fatal("not published")
}
close(closed)
}
*/
// Test writing a medium sized file one byte at a time
func TestMultiWrite(t *testing.T) {
......
......@@ -4,7 +4,9 @@
package mount
import (
"errors"
"fmt"
"sync"
"time"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
......@@ -12,12 +14,16 @@ import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
var ErrNotMounted = errors.New("not mounted")
// mount implements go-ipfs/fuse/mount
type mount struct {
mpoint string
filesys fs.FS
fuseConn *fuse.Conn
// closeErr error
active bool
activeLock *sync.RWMutex
proc goprocess.Process
}
......@@ -39,10 +45,12 @@ func NewMount(p goprocess.Process, fsys fs.FS, mountpoint string, allow_other bo
}
m := &mount{
mpoint: mountpoint,
fuseConn: conn,
filesys: fsys,
proc: goprocess.WithParent(p), // link it to parent.
mpoint: mountpoint,
fuseConn: conn,
filesys: fsys,
active: false,
activeLock: &sync.RWMutex{},
proc: goprocess.WithParent(p), // link it to parent.
}
m.proc.SetTeardown(m.unmount)
......@@ -60,11 +68,14 @@ func (m *mount) mount() error {
errs := make(chan error, 1)
go func() {
// fs.Serve blocks until the filesystem is unmounted.
err := fs.Serve(m.fuseConn, m.filesys)
log.Debugf("Mounting %s -- fs.Serve returned (%s)", err)
log.Debugf("%s is unmounted", m.MountPoint())
if err != nil {
log.Debugf("fs.Serve returned (%s)", err)
errs <- err
}
m.setActive(false)
}()
// wait for the mount process to be done, or timed out.
......@@ -81,6 +92,8 @@ func (m *mount) mount() error {
return err
}
m.setActive(true)
log.Infof("Mounted %s", m.MountPoint())
return nil
}
......@@ -95,6 +108,7 @@ func (m *mount) unmount() error {
// try unmounting with fuse lib
err := fuse.Unmount(m.MountPoint())
if err == nil {
m.setActive(false)
return nil
}
log.Warningf("fuse unmount err: %s", err)
......@@ -102,11 +116,10 @@ func (m *mount) unmount() error {
// try closing the fuseConn
err = m.fuseConn.Close()
if err == nil {
m.setActive(false)
return nil
}
if err != nil {
log.Warningf("fuse conn error: %s", err)
}
log.Warningf("fuse conn error: %s", err)
// try mount.ForceUnmountManyTimes
if err := ForceUnmountManyTimes(m, 10); err != nil {
......@@ -114,6 +127,7 @@ func (m *mount) unmount() error {
}
log.Infof("Seemingly unmounted %s", m.MountPoint())
m.setActive(false)
return nil
}
......@@ -126,6 +140,23 @@ func (m *mount) MountPoint() string {
}
func (m *mount) Unmount() error {
if !m.IsActive() {
return ErrNotMounted
}
// call Process Close(), which calls unmount() exactly once.
return m.proc.Close()
}
func (m *mount) IsActive() bool {
m.activeLock.RLock()
defer m.activeLock.RUnlock()
return m.active
}
func (m *mount) setActive(a bool) {
m.activeLock.Lock()
m.active = a
m.activeLock.Unlock()
}
......@@ -25,6 +25,9 @@ type Mount interface {
// Unmounts the mount
Unmount() error
// Checks if the mount is still active.
IsActive() bool
// Process returns the mount's Process to be able to link it
// to other processes. Unmount upon closing.
Process() goprocess.Process
......
// +build !nofuse
package commands
package node
import (
"bytes"
......
// +build linux darwin freebsd
// +build nofuse
package node
import (
"errors"
core "github.com/ipfs/go-ipfs/core"
)
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
return errors.New("not compiled in")
}
// +build !nofuse
package node
import (
"io/ioutil"
"os"
"os/exec"
"testing"
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
core "github.com/ipfs/go-ipfs/core"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
namesys "github.com/ipfs/go-ipfs/namesys"
offroute "github.com/ipfs/go-ipfs/routing/offline"
ci "github.com/ipfs/go-ipfs/util/testutil/ci"
)
func maybeSkipFuseTests(t *testing.T) {
if ci.NoFuse() {
t.Skip("Skipping FUSE tests")
}
}
func mkdir(t *testing.T, path string) {
err := os.Mkdir(path, os.ModeDir|os.ModePerm)
if err != nil {
t.Fatal(err)
}
}
// Test externally unmounting, then trying to unmount in code
func TestExternalUnmount(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// TODO: needed?
maybeSkipFuseTests(t)
node, err := core.NewNode(context.Background(), nil)
if err != nil {
t.Fatal(err)
}
err = node.LoadPrivateKey()
if err != nil {
t.Fatal(err)
}
node.Routing = offroute.NewOfflineRouter(node.Repo.Datastore(), node.PrivateKey)
node.Namesys = namesys.NewNameSystem(node.Routing, node.Repo.Datastore(), 0)
err = ipns.InitializeKeyspace(node, node.PrivateKey)
if err != nil {
t.Fatal(err)
}
// get the test dir paths (/tmp/fusetestXXXX)
dir, err := ioutil.TempDir("", "fusetest")
if err != nil {
t.Fatal(err)
}
ipfsDir := dir + "/ipfs"
ipnsDir := dir + "/ipns"
mkdir(t, ipfsDir)
mkdir(t, ipnsDir)
err = Mount(node, ipfsDir, ipnsDir)
if err != nil {
t.Fatal(err)
}
// Run shell command to externally unmount the directory
cmd := "fusermount"
args := []string{"-u", ipnsDir}
if err := exec.Command(cmd, args...).Run(); err != nil {
t.Fatal(err)
}
// TODO(noffle): it takes a moment for the goroutine that's running fs.Serve to be notified and do its cleanup.
time.Sleep(time.Millisecond * 100)
// Attempt to unmount IPNS; check that it was already unmounted.
err = node.Mounts.Ipns.Unmount()
if err != mount.ErrNotMounted {
t.Fatal("Unmount should have failed")
}
// Attempt to unmount IPFS; it should unmount successfully.
err = node.Mounts.Ipfs.Unmount()
if err != nil {
t.Fatal(err)
}
}
// +build linux darwin freebsd
// +build !nofuse
package node
import (
"errors"
"fmt"
"strings"
"time"
core "github.com/ipfs/go-ipfs/core"
ipns "github.com/ipfs/go-ipfs/fuse/ipns"
mount "github.com/ipfs/go-ipfs/fuse/mount"
rofs "github.com/ipfs/go-ipfs/fuse/readonly"
logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
)
var log = logging.Logger("node")
// amount of time to wait for mount errors
// TODO is this non-deterministic?
const mountTimeout = time.Second
// fuseNoDirectory used to check the returning fuse error
const fuseNoDirectory = "fusermount: failed to access mountpoint"
// fuseExitStatus1 used to check the returning fuse error
const fuseExitStatus1 = "fusermount: exit status 1"
// platformFuseChecks can get overridden by arch-specific files
// to run fuse checks (like checking the OSXFUSE version)
var platformFuseChecks = func(*core.IpfsNode) error {
return nil
}
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// check if we already have live mounts.
// if the user said "Mount", then there must be something wrong.
// so, close them and try again.
if node.Mounts.Ipfs != nil && node.Mounts.Ipfs.IsActive() {
node.Mounts.Ipfs.Unmount()
}
if node.Mounts.Ipns != nil && node.Mounts.Ipns.IsActive() {
node.Mounts.Ipns.Unmount()
}
if err := platformFuseChecks(node); err != nil {
return err
}
var err error
if err = doMount(node, fsdir, nsdir); err != nil {
return err
}
return nil
}
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
fmtFuseErr := func(err error, mountpoint string) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return errors.New(s)
}
if s == fuseExitStatus1 {
s = fmt.Sprintf("fuse failed to access mountpoint %s", mountpoint)
return errors.New(s)
}
return err
}
// this sync stuff is so that both can be mounted simultaneously.
var fsmount mount.Mount
var nsmount mount.Mount
var err1 error
var err2 error
done := make(chan struct{})
go func() {
fsmount, err1 = rofs.Mount(node, fsdir)
done <- struct{}{}
}()
go func() {
nsmount, err2 = ipns.Mount(node, nsdir, fsdir)
done <- struct{}{}
}()
<-done
<-done
if err1 != nil {
log.Errorf("error mounting: %s", err1)
}
if err2 != nil {
log.Errorf("error mounting: %s", err2)
}
if err1 != nil || err2 != nil {
if fsmount != nil {
fsmount.Unmount()
}
if nsmount != nil {
nsmount.Unmount()
}
if err1 != nil {
return fmtFuseErr(err1, fsdir)
}
return fmtFuseErr(err2, nsdir)
}
// setup node state, so that it can be cancelled
node.Mounts.Ipfs = fsmount
node.Mounts.Ipns = nsmount
return nil
}
package node
import (
"github.com/ipfs/go-ipfs/core"
)
func Mount(node *core.IpfsNode, fsdir, nsdir string) error {
// TODO
// currently a no-op, but we don't want to return an error
return nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment