Commit e61c5975 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet

implement initial ipns filesystem interface as well as plumbing command for publishing

parent 375a38c5
package main
import (
"os"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/gonuts/flag"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/commander"
"github.com/jbenet/go-ipfs/core/commands"
"github.com/jbenet/go-ipfs/daemon"
u "github.com/jbenet/go-ipfs/util"
)
var cmdIpfsPub = &commander.Command{
UsageLine: "publish",
Short: "Publish an object to ipns under your key.",
Long: `ipfs publish <path> - Publish object to ipns.
`,
Run: pubCmd,
Flag: *flag.NewFlagSet("ipfs-publish", flag.ExitOnError),
}
func init() {
cmdIpfsPub.Flag.String("k", "", "Specify key to use for publishing.")
}
func pubCmd(c *commander.Command, inp []string) error {
u.Debug = true
if len(inp) < 1 {
u.POut(c.Long)
return nil
}
conf, err := getConfigDir(c.Parent)
if err != nil {
return err
}
cmd := daemon.NewCommand()
cmd.Command = "publish"
cmd.Args = inp
cmd.Opts["k"] = c.Flag.Lookup("k").Value.Get()
err = daemon.SendCommand(cmd, conf)
if err != nil {
u.DOut("Executing command locally.\n")
// Do locally
conf, err := getConfigDir(c.Parent)
if err != nil {
return err
}
n, err := localNode(conf, true)
if err != nil {
return err
}
return commands.Publish(n, cmd.Args, cmd.Opts, os.Stdout)
}
return nil
}
package commands
import (
"errors"
"fmt"
"io"
"github.com/jbenet/go-ipfs/core"
u "github.com/jbenet/go-ipfs/util"
"github.com/op/go-logging"
nsys "github.com/jbenet/go-ipfs/namesys"
)
var log = logging.MustGetLogger("commands")
func Publish(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.Writer) error {
log.Debug("Begin Publish")
if n.Identity == nil {
return errors.New("Identity not loaded!")
}
k := n.Identity.PrivKey
val := u.Key(args[0])
pub := nsys.NewPublisher(n.DAG, n.Routing)
err := pub.Publish(k, val)
if err != nil {
return err
}
hash, err := k.GetPublic().Hash()
if err != nil {
return err
}
fmt.Fprintf(out, "Published %s to %s\n", val, u.Key(hash).Pretty())
return nil
}
package ipns
import (
"fmt"
"io/ioutil"
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
"syscall"
"time"
"bazil.org/fuse"
"bazil.org/fuse/fs"
"code.google.com/p/goprotobuf/proto"
"github.com/jbenet/go-ipfs/core"
ci "github.com/jbenet/go-ipfs/crypto"
mdag "github.com/jbenet/go-ipfs/merkledag"
u "github.com/jbenet/go-ipfs/util"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("ipns")
// FileSystem is the readonly Ipfs Fuse Filesystem.
type FileSystem struct {
Ipfs *core.IpfsNode
RootNode *Root
}
// NewFileSystem constructs new fs using given core.IpfsNode instance.
func NewIpns(ipfs *core.IpfsNode, ipfspath string) (*FileSystem, error) {
root, err := CreateRoot(ipfs, []ci.PrivKey{ipfs.Identity.PrivKey}, ipfspath)
if err != nil {
return nil, err
}
return &FileSystem{Ipfs: ipfs, RootNode: root}, nil
}
func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, error) {
root := new(Root)
root.LocalDirs = make(map[string]*Node)
root.Ipfs = n
abspath, err := filepath.Abs(ipfsroot)
if err != nil {
return nil, err
}
root.IpfsRoot = abspath
root.Keys = keys
if len(keys) == 0 {
log.Warning("No keys given for ipns root creation")
} else {
k := keys[0]
pub := k.GetPublic()
hash, err := pub.Hash()
if err != nil {
log.Error("Read Root Error: %s", err)
return nil, err
}
root.LocalLink = &Link{u.Key(hash).Pretty()}
}
return root, nil
}
// Root constructs the Root of the filesystem, a Root object.
func (f FileSystem) Root() (fs.Node, fuse.Error) {
return f.RootNode, nil
}
// Root is the root object of the filesystem tree.
type Root struct {
Ipfs *core.IpfsNode
Keys []ci.PrivKey
// Used for symlinking into ipfs
IpfsRoot string
LocalDirs map[string]*Node
LocalLink *Link
}
// Attr returns file attributes.
func (*Root) Attr() fuse.Attr {
return fuse.Attr{Mode: os.ModeDir | 0111} // -rw+x
}
// Lookup performs a lookup under this node.
func (s *Root) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
log.Debug("ipns: Root Lookup: '%s'", name)
switch name {
case "mach_kernel", ".hidden", "._.":
// Just quiet some log noise on OS X.
return nil, fuse.ENOENT
}
if name == "local" {
if s.LocalLink == nil {
return nil, fuse.ENOENT
}
return s.LocalLink, nil
}
nd, ok := s.LocalDirs[name]
if ok {
return nd, nil
}
log.Debug("ipns: Falling back to resolution.")
resolved, err := s.Ipfs.Namesys.Resolve(name)
if err != nil {
log.Error("ipns: namesys resolve error: %s", err)
return nil, fuse.ENOENT
}
return &Link{s.IpfsRoot + "/" + resolved}, nil
}
// ReadDir reads a particular directory. Disallowed for root.
func (r *Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
u.DOut("Read Root.\n")
listing := []fuse.Dirent{
fuse.Dirent{
Name: "local",
Type: fuse.DT_Link,
},
}
for _, k := range r.Keys {
pub := k.GetPublic()
hash, err := pub.Hash()
if err != nil {
log.Error("Read Root Error: %s", err)
continue
}
ent := fuse.Dirent{
Name: u.Key(hash).Pretty(),
Type: fuse.DT_Dir,
}
listing = append(listing, ent)
}
return listing, nil
}
// Node is the core object representing a filesystem tree node.
type Node struct {
nsRoot *Node
Ipfs *core.IpfsNode
Nd *mdag.Node
fd *mdag.DagReader
cached *mdag.PBData
}
func (s *Node) loadData() error {
s.cached = new(mdag.PBData)
return proto.Unmarshal(s.Nd.Data, s.cached)
}
// Attr returns the attributes of a given node.
func (s *Node) Attr() fuse.Attr {
u.DOut("Node attr.\n")
if s.cached == nil {
s.loadData()
}
switch s.cached.GetType() {
case mdag.PBData_Directory:
u.DOut("this is a directory.\n")
return fuse.Attr{Mode: os.ModeDir | 0555}
case mdag.PBData_File, mdag.PBData_Raw:
u.DOut("this is a file.\n")
size, _ := s.Nd.Size()
return fuse.Attr{
Mode: 0444,
Size: uint64(size),
Blocks: uint64(len(s.Nd.Links)),
}
default:
u.PErr("Invalid data type.")
return fuse.Attr{}
}
}
// Lookup performs a lookup under this node.
func (s *Node) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
u.DOut("Lookup '%s'\n", name)
nd, err := s.Ipfs.Resolver.ResolveLinks(s.Nd, []string{name})
if err != nil {
// todo: make this error more versatile.
return nil, fuse.ENOENT
}
return &Node{Ipfs: s.Ipfs, Nd: nd}, nil
}
// ReadDir reads the link structure as directory entries
func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
u.DOut("Node ReadDir\n")
entries := make([]fuse.Dirent, len(s.Nd.Links))
for i, link := range s.Nd.Links {
n := link.Name
if len(n) == 0 {
n = link.Hash.B58String()
}
entries[i] = fuse.Dirent{Name: n, Type: fuse.DT_File}
}
if len(entries) > 0 {
return entries, nil
}
return nil, fuse.ENOENT
}
// ReadAll reads the object data as file data
func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
log.Debug("ipns: ReadAll Node")
r, err := mdag.NewDagReader(s.Nd, s.Ipfs.DAG)
if err != nil {
return nil, err
}
// this is a terrible function... 'ReadAll'?
// what if i have a 6TB file? GG RAM.
return ioutil.ReadAll(r)
}
// Mount mounts an IpfsNode instance at a particular path. It
// serves until the process receives exit signals (to Unmount).
func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT)
go func() {
<-sigc
for {
err := Unmount(fpath)
if err == nil {
return
}
time.Sleep(time.Millisecond * 10)
}
ipfs.Network.Close()
}()
c, err := fuse.Mount(fpath)
if err != nil {
return err
}
defer c.Close()
fsys, err := NewIpns(ipfs, ipfspath)
if err != nil {
return err
}
err = fs.Serve(c, fsys)
if err != nil {
return err
}
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
return err
}
return nil
}
// Unmount attempts to unmount the provided FUSE mount point, forcibly
// if necessary.
func Unmount(point string) error {
fmt.Printf("Unmounting %s...\n", point)
var cmd *exec.Cmd
switch runtime.GOOS {
case "darwin":
cmd = exec.Command("diskutil", "umount", "force", point)
case "linux":
cmd = exec.Command("fusermount", "-u", point)
default:
return fmt.Errorf("unmount: unimplemented")
}
errc := make(chan error, 1)
go func() {
if err := exec.Command("umount", point).Run(); err == nil {
errc <- err
}
// retry to unmount with the fallback cmd
errc <- cmd.Run()
}()
select {
case <-time.After(1 * time.Second):
return fmt.Errorf("umount timeout")
case err := <-errc:
return err
}
}
type Link struct {
Target string
}
func (l *Link) Attr() fuse.Attr {
log.Debug("Link attr.")
return fuse.Attr{
Mode: os.ModeSymlink | 0555,
}
}
func (l *Link) Readlink(req *fuse.ReadlinkRequest, intr fs.Intr) (string, fuse.Error) {
return l.Target, nil
}
......@@ -106,6 +106,7 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
log.Warning("Running query with no peers!")
return nil, nil
}
// setup concurrency rate limiting
for i := 0; i < r.query.concurrency; i++ {
r.rateLimit <- struct{}{}
......
......@@ -47,14 +47,13 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
ll := startNewRPC("GET")
defer ll.EndAndPrint()
log.Debug("Get Value [%s]", key.Pretty())
// If we have it local, dont bother doing an RPC!
// NOTE: this might not be what we want to do...
val, err := dht.getLocal(key)
if err == nil {
ll.Success = true
log.Debug("Got value locally!")
return val, nil
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment