Commit e290b54d authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

Merge pull request #351 from jbenet/mount-fixes

Mount, Daemon, and Signal Handler Fixes
parents 61c1e39a 4ac6db9a
......@@ -4,12 +4,16 @@ package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/gonuts/flag"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/commander"
core "github.com/jbenet/go-ipfs/core"
ipns "github.com/jbenet/go-ipfs/fuse/ipns"
mount "github.com/jbenet/go-ipfs/fuse/mount"
rofs "github.com/jbenet/go-ipfs/fuse/readonly"
)
......@@ -48,52 +52,65 @@ func mountCmd(c *commander.Command, inp []string) error {
if val, ok := c.Flag.Lookup("f").Value.Get().(string); ok && val != "" {
fsdir = val
}
fsdone := mountIpfs(cc.node, fsdir)
// get default mount points
nsdir := cc.node.Config.Mounts.IPNS
if val, ok := c.Flag.Lookup("n").Value.Get().(string); ok && val != "" {
nsdir = val
}
nsdone := mountIpns(cc.node, nsdir, fsdir)
// wait till mounts are done.
err1 := <-fsdone
err2 := <-nsdone
if err1 != nil {
return err1
}
return err2
return doMount(cc.node, fsdir, nsdir)
}
func mountIpfs(node *core.IpfsNode, fsdir string) <-chan error {
done := make(chan error)
fmt.Printf("mounting ipfs at %s\n", fsdir)
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
// this sync stuff is so that both can be mounted simultaneously.
var fsmount mount.Mount
var nsmount mount.Mount
var err1 error
var err2 error
done := make(chan struct{})
go func() {
err := rofs.Mount(node, fsdir)
done <- err
close(done)
fsmount, err1 = rofs.Mount(node, fsdir)
done <- struct{}{}
}()
return done
}
go func() {
nsmount, err2 = ipns.Mount(node, nsdir, fsdir)
done <- struct{}{}
}()
func mountIpns(node *core.IpfsNode, nsdir, fsdir string) <-chan error {
if nsdir == "" {
return nil
<-done
<-done
if err1 != nil || err2 != nil {
fsmount.Close()
nsmount.Close()
if err1 != nil {
return err1
} else {
return err2
}
}
done := make(chan error)
fmt.Printf("mounting ipns at %s\n", nsdir)
go func() {
err := ipns.Mount(node, nsdir, fsdir)
done <- err
close(done)
}()
fmt.Printf("mounted ipfs at %s\n", fsdir)
fmt.Printf("mounted ipns at %s\n", nsdir)
return done
// setup node state, so that it can be cancelled
node.Mounts.Ipfs = fsmount
node.Mounts.Ipns = nsmount
// wait until we kill
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
<-sigc
fmt.Println("unmounting...")
node.Close()
return nil
}
var platformFuseChecks = func() error {
......
......@@ -3,9 +3,6 @@ package main
import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
......@@ -74,40 +71,35 @@ func daemonFunc(req cmds.Request) (interface{}, error) {
ifpsHandler := &ipfsHandler{node}
mux.Handle("/ipfs/", ifpsHandler)
err = listenAndServe(mux, host)
err = listenAndServe(node, mux, host)
return nil, err
}
func listenAndServe(mux *http.ServeMux, host string) error {
func listenAndServe(node *core.IpfsNode, mux *http.ServeMux, host string) error {
fmt.Printf("API server listening on '%s'\n", host)
s := manners.NewServer()
done := make(chan struct{}, 1)
defer func() {
done <- struct{}{}
}()
// go wait until we kill it.
// go wait until the node dies
go func() {
sig := sigTerm()
select {
case <-node.Closed():
case <-done:
log.Info("daemon terminated at %s.", host)
case <-sig:
s.Shutdown <- true
log.Info("terminating daemon at %s...", host)
return
}
log.Info("terminating daemon at %s...", host)
s.Shutdown <- true
}()
if err := s.ListenAndServe(host, mux); err != nil {
return err
}
return nil
}
func sigTerm() chan os.Signal {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
return sigc
return nil
}
......@@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"runtime/pprof"
"syscall"
logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
......@@ -124,7 +125,8 @@ func main() {
}
func (i *cmdInvocation) Run() (output io.Reader, err error) {
handleInterrupt()
// setup our global interrupt handler.
i.setupInterruptHandler()
// check if user wants to debug. option OR env var.
debug, _, err := i.req.Option("debug").Bool()
......@@ -281,6 +283,7 @@ func callCommand(req cmds.Request, root *cmds.Command) (cmds.Response, error) {
// this sets up the function that will initialize the node
// this is so that we can construct the node lazily.
ctx := req.Context()
ctx.ConstructNode = func() (*core.IpfsNode, error) {
cfg, err := ctx.GetConfig()
if err != nil {
......@@ -297,6 +300,7 @@ func callCommand(req cmds.Request, root *cmds.Command) (cmds.Response, error) {
// this is gross, and should be changed when we extract out the exec Context.
node := req.Context().NodeWithoutConstructing()
if node != nil {
log.Info("Shutting down node...")
node.Close()
}
}
......@@ -441,14 +445,37 @@ func writeHeapProfileToFile() error {
}
// listen for and handle SIGTERM
func handleInterrupt() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
func (i *cmdInvocation) setupInterruptHandler() {
ctx := i.req.Context()
sig := allInterruptSignals()
go func() {
for _ = range c {
log.Info("Received interrupt signal, terminating...")
for {
// first time, try to shut down.
<-sig
log.Critical("Received interrupt signal, shutting down...")
n, err := ctx.GetNode()
if err == nil {
go n.Close()
select {
case <-n.Closed():
case <-sig:
log.Critical("Received another interrupt signal, terminating...")
}
}
os.Exit(0)
}
}()
}
func allInterruptSignals() chan os.Signal {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
return sigc
}
......@@ -11,6 +11,7 @@ import (
config "github.com/jbenet/go-ipfs/config"
core "github.com/jbenet/go-ipfs/core"
ipns "github.com/jbenet/go-ipfs/fuse/ipns"
mount "github.com/jbenet/go-ipfs/fuse/mount"
rofs "github.com/jbenet/go-ipfs/fuse/readonly"
)
......@@ -97,6 +98,16 @@ baz
return nil, err
}
// check if we already have live mounts.
// if the user said "Mount", then there must be something wrong.
// so, close them and try again.
if node.Mounts.Ipfs != nil {
node.Mounts.Ipfs.Unmount()
}
if node.Mounts.Ipns != nil {
node.Mounts.Ipns.Unmount()
}
// error if we aren't running node in online mode
if node.Network == nil {
return nil, errNotOnline
......@@ -113,7 +124,6 @@ baz
if !found {
fsdir = cfg.Mounts.IPFS // use default value
}
fsdone := mountIpfs(node, fsdir)
// get default mount points
nsdir, found, err := req.Option("n").String()
......@@ -124,30 +134,14 @@ baz
nsdir = cfg.Mounts.IPNS // NB: be sure to not redeclare!
}
nsdone := mountIpns(node, nsdir, fsdir)
fmtFuseErr := func(err error) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return cmds.ClientError(s)
}
return err
if err := doMount(node, fsdir, nsdir); err != nil {
return nil, err
}
// wait until mounts return an error (or timeout if successful)
select {
case err := <-fsdone:
return nil, fmtFuseErr(err)
case err := <-nsdone:
return nil, fmtFuseErr(err)
// mounted successfully, we timed out with no errors
case <-time.After(mountTimeout):
output := cfg.Mounts
return &output, nil
}
var output config.Mounts
output.IPFS = fsdir
output.IPNS = nsdir
return &output, nil
},
Type: &config.Mounts{},
Marshalers: cmds.MarshalerMap{
......@@ -160,33 +154,52 @@ baz
},
}
func mountIpfs(node *core.IpfsNode, fsdir string) <-chan error {
done := make(chan error)
log.Info("Mounting IPFS at ", fsdir)
func doMount(node *core.IpfsNode, fsdir, nsdir string) error {
fmtFuseErr := func(err error) error {
s := err.Error()
if strings.Contains(s, fuseNoDirectory) {
s = strings.Replace(s, `fusermount: "fusermount:`, "", -1)
s = strings.Replace(s, `\n", exit status 1`, "", -1)
return cmds.ClientError(s)
}
return err
}
go func() {
err := rofs.Mount(node, fsdir)
done <- err
close(done)
}()
// this sync stuff is so that both can be mounted simultaneously.
var fsmount mount.Mount
var nsmount mount.Mount
var err1 error
var err2 error
return done
}
done := make(chan struct{})
func mountIpns(node *core.IpfsNode, nsdir, fsdir string) <-chan error {
if nsdir == "" {
return nil
}
done := make(chan error)
log.Info("Mounting IPNS at ", nsdir)
go func() {
fsmount, err1 = rofs.Mount(node, fsdir)
done <- struct{}{}
}()
go func() {
err := ipns.Mount(node, nsdir, fsdir)
done <- err
close(done)
nsmount, err2 = ipns.Mount(node, nsdir, fsdir)
done <- struct{}{}
}()
return done
<-done
<-done
if err1 != nil || err2 != nil {
fsmount.Close()
nsmount.Close()
if err1 != nil {
return fmtFuseErr(err1)
} else {
return fmtFuseErr(err2)
}
}
// setup node state, so that it can be cancelled
node.Mounts.Ipfs = fsmount
node.Mounts.Ipns = nsmount
return nil
}
var platformFuseChecks = func() error {
......
......@@ -27,6 +27,7 @@ import (
routing "github.com/jbenet/go-ipfs/routing"
dht "github.com/jbenet/go-ipfs/routing/dht"
u "github.com/jbenet/go-ipfs/util"
mount "github.com/jbenet/go-ipfs/fuse/mount"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)
......@@ -74,11 +75,22 @@ type IpfsNode struct {
// the pinning manager
Pinning pin.Pinner
// current mount state, if any.
Mounts Mounts
ctxc.ContextCloser
onlineMode bool // alternatively, offline
}
// Mounts defines what the node's mount state is. This should
// perhaps be moved to the daemon or mount. It's here because
// it needs to be accessible across daemon requests.
type Mounts struct {
Ipfs mount.Mount
Ipns mount.Mount
}
// NewIpfsNode constructs a new IpfsNode based on the given config.
func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {
success := false // flip to true after all sub-system inits succeed
......
......@@ -2,37 +2,45 @@ package ipns
import (
"fmt"
"os"
"os/exec"
"os/signal"
"runtime"
"syscall"
"time"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
"github.com/jbenet/go-ipfs/core"
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
fs "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs"
core "github.com/jbenet/go-ipfs/core"
mount "github.com/jbenet/go-ipfs/fuse/mount"
)
// Mount mounts an IpfsNode instance at a particular path. It
// serves until the process receives exit signals (to Unmount).
func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) (mount.Mount, error) {
log.Infof("Mounting ipns at %s...", fpath)
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
// setup the Mount abstraction.
m := mount.New(ipfs.Context(), fpath)
go func() {
defer ipfs.Network.Close()
<-sigc
for {
err := Unmount(fpath)
if err == nil {
return
}
time.Sleep(time.Millisecond * 100)
}
}()
// go serve the mount
m.Mount(func(m mount.Mount) error {
return internalMount(ipfs, fpath, ipfspath)
}, internalUnmount)
select {
case <-m.Closed():
return nil, fmt.Errorf("failed to mount")
case <-time.After(time.Second):
// assume it worked...
}
// bind the mount (ContextCloser) to the node, so that when the node exits
// the fsclosers are automatically closed.
ipfs.AddCloserChild(m)
return m, nil
}
// mount attempts to mount at the provided FUSE mount point
func internalMount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
c, err := fuse.Mount(fpath)
if err != nil {
......@@ -45,8 +53,8 @@ func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
return err
}
err = fs.Serve(c, fsys)
if err != nil {
log.Infof("Mounted ipns at %s.", fpath)
if err := fs.Serve(c, fsys); err != nil {
return err
}
......@@ -58,10 +66,11 @@ func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
return nil
}
// Unmount attempts to unmount the provided FUSE mount point, forcibly
// unmount attempts to unmount the provided FUSE mount point, forcibly
// if necessary.
func Unmount(point string) error {
fmt.Printf("Unmounting %s...\n", point)
func internalUnmount(m mount.Mount) error {
point := m.MountPoint()
log.Infof("Unmounting ipns at %s...", point)
var cmd *exec.Cmd
switch runtime.GOOS {
......
// package mount provides a simple abstraction around a mount point
package mount
import (
"fmt"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
u "github.com/jbenet/go-ipfs/util"
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)
var log = u.Logger("mount")
// Mount represents a filesystem mount
type Mount interface {
// MountPoint is the path at which this mount is mounted
MountPoint() string
// Mount function sets up a mount + registers the unmount func
Mount(mount MountFunc, unmount UnmountFunc)
// Unmount calls Close.
Unmount() error
ctxc.ContextCloser
}
// UnmountFunc is a function used to Unmount a mount
type UnmountFunc func(Mount) error
// MountFunc is a function used to Mount a mount
type MountFunc func(Mount) error
// New constructs a new Mount instance. ctx is a context to wait upon,
// the mountpoint is the directory that the mount was mounted at, and unmount
// in an UnmountFunc to perform the unmounting logic.
func New(ctx context.Context, mountpoint string) Mount {
m := &mount{mpoint: mountpoint}
m.ContextCloser = ctxc.NewContextCloser(ctx, m.persistentUnmount)
return m
}
type mount struct {
ctxc.ContextCloser
unmount UnmountFunc
mpoint string
}
// umount is called after the mount is closed.
// TODO this is hacky, make it better.
func (m *mount) persistentUnmount() error {
// no unmount func.
if m.unmount == nil {
return nil
}
// ok try to unmount a whole bunch of times...
for i := 0; i < 34; i++ {
err := m.unmount(m)
if err == nil {
return nil
}
time.Sleep(time.Millisecond * 300)
}
// didnt work.
return fmt.Errorf("Unmount %s failed after 10 seconds of trying.")
}
func (m *mount) MountPoint() string {
return m.mpoint
}
func (m *mount) Unmount() error {
return m.Close()
}
func (m *mount) Mount(mount MountFunc, unmount UnmountFunc) {
m.Children().Add(1)
m.unmount = unmount
// go serve the mount
go func() {
if err := mount(m); err != nil {
log.Error("%s mount: %s", m.MountPoint(), err)
}
m.Children().Done()
m.Unmount()
}()
}
......@@ -9,9 +9,7 @@ import (
"io/ioutil"
"os"
"os/exec"
"os/signal"
"runtime"
"syscall"
"time"
fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
......@@ -19,6 +17,7 @@ import (
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
core "github.com/jbenet/go-ipfs/core"
mount "github.com/jbenet/go-ipfs/fuse/mount"
mdag "github.com/jbenet/go-ipfs/merkledag"
uio "github.com/jbenet/go-ipfs/unixfs/io"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
......@@ -162,47 +161,59 @@ func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
// Mount mounts an IpfsNode instance at a particular path. It
// serves until the process receives exit signals (to Unmount).
func Mount(ipfs *core.IpfsNode, fpath string) error {
func Mount(ipfs *core.IpfsNode, fpath string) (mount.Mount, error) {
log.Infof("Mounting ipfs at %s...", fpath)
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
// setup the Mount abstraction.
m := mount.New(ipfs.Context(), fpath)
go func() {
defer ipfs.Network.Close()
<-sigc
for {
err := Unmount(fpath)
if err == nil {
return
}
time.Sleep(time.Millisecond * 10)
}
}()
// go serve the mount
m.Mount(func(m mount.Mount) error {
return internalMount(ipfs, m)
}, internalUnmount)
select {
case <-m.Closed():
return nil, fmt.Errorf("failed to mount")
case <-time.After(time.Second):
// assume it worked...
}
// bind the mount (ContextCloser) to the node, so that when the node exits
// the fsclosers are automatically closed.
ipfs.AddCloserChild(m)
return m, nil
}
c, err := fuse.Mount(fpath)
// mount attempts to mount the provided FUSE mount point
func internalMount(ipfs *core.IpfsNode, m mount.Mount) error {
c, err := fuse.Mount(m.MountPoint())
if err != nil {
return err
}
defer c.Close()
err = fs.Serve(c, FileSystem{Ipfs: ipfs})
if err != nil {
fsys := FileSystem{Ipfs: ipfs}
log.Infof("Mounted ipfs at %s.", m.MountPoint())
if err := fs.Serve(c, fsys); err != nil {
return err
}
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
m.Unmount()
return err
}
return nil
}
// Unmount attempts to unmount the provided FUSE mount point, forcibly
// unmount attempts to unmount the provided FUSE mount point, forcibly
// if necessary.
func Unmount(point string) error {
log.Info("Unmounting %s...", point)
func internalUnmount(m mount.Mount) error {
point := m.MountPoint()
log.Infof("Unmounting ipfs at %s...", point)
var cmd *exec.Cmd
switch runtime.GOOS {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment