diff --git a/cmd/ipfs/publish.go b/cmd/ipfs/publish.go new file mode 100644 index 0000000000000000000000000000000000000000..649767614da2db347deeafd14b78d0fa8186fd85 --- /dev/null +++ b/cmd/ipfs/publish.go @@ -0,0 +1,59 @@ +package main + +import ( + "os" + + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/gonuts/flag" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/commander" + "github.com/jbenet/go-ipfs/core/commands" + "github.com/jbenet/go-ipfs/daemon" + u "github.com/jbenet/go-ipfs/util" +) + +var cmdIpfsPub = &commander.Command{ + UsageLine: "publish", + Short: "Publish an object to ipns under your key.", + Long: `ipfs publish <path> - Publish object to ipns. + +`, + Run: pubCmd, + Flag: *flag.NewFlagSet("ipfs-publish", flag.ExitOnError), +} + +func init() { + cmdIpfsPub.Flag.String("k", "", "Specify key to use for publishing.") +} + +func pubCmd(c *commander.Command, inp []string) error { + u.Debug = true + if len(inp) < 1 { + u.POut(c.Long) + return nil + } + + conf, err := getConfigDir(c.Parent) + if err != nil { + return err + } + + cmd := daemon.NewCommand() + cmd.Command = "publish" + cmd.Args = inp + cmd.Opts["k"] = c.Flag.Lookup("k").Value.Get() + err = daemon.SendCommand(cmd, conf) + if err != nil { + u.DOut("Executing command locally.\n") + // Do locally + conf, err := getConfigDir(c.Parent) + if err != nil { + return err + } + n, err := localNode(conf, true) + if err != nil { + return err + } + + return commands.Publish(n, cmd.Args, cmd.Opts, os.Stdout) + } + return nil +} diff --git a/core/commands/publish.go b/core/commands/publish.go new file mode 100644 index 0000000000000000000000000000000000000000..8fae4235d95e49e1f06ccf031aa12833832f87f7 --- /dev/null +++ b/core/commands/publish.go @@ -0,0 +1,39 @@ +package commands + +import ( + "errors" + "fmt" + "io" + + "github.com/jbenet/go-ipfs/core" + u "github.com/jbenet/go-ipfs/util" + "github.com/op/go-logging" + + nsys "github.com/jbenet/go-ipfs/namesys" +) + +var log = logging.MustGetLogger("commands") + +func Publish(n *core.IpfsNode, args []string, opts map[string]interface{}, out io.Writer) error { + log.Debug("Begin Publish") + if n.Identity == nil { + return errors.New("Identity not loaded!") + } + + k := n.Identity.PrivKey + val := u.Key(args[0]) + + pub := nsys.NewPublisher(n.DAG, n.Routing) + err := pub.Publish(k, val) + if err != nil { + return err + } + + hash, err := k.GetPublic().Hash() + if err != nil { + return err + } + fmt.Fprintf(out, "Published %s to %s\n", val, u.Key(hash).Pretty()) + + return nil +} diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go new file mode 100644 index 0000000000000000000000000000000000000000..2713538aef91ad843ff50c8ce039d5211c7bd8d3 --- /dev/null +++ b/fuse/ipns/ipns_unix.go @@ -0,0 +1,316 @@ +package ipns + +import ( + "fmt" + "io/ioutil" + "os" + "os/exec" + "os/signal" + "path/filepath" + "runtime" + "syscall" + "time" + + "bazil.org/fuse" + "bazil.org/fuse/fs" + "code.google.com/p/goprotobuf/proto" + "github.com/jbenet/go-ipfs/core" + ci "github.com/jbenet/go-ipfs/crypto" + mdag "github.com/jbenet/go-ipfs/merkledag" + u "github.com/jbenet/go-ipfs/util" + "github.com/op/go-logging" +) + +var log = logging.MustGetLogger("ipns") + +// FileSystem is the readonly Ipfs Fuse Filesystem. +type FileSystem struct { + Ipfs *core.IpfsNode + RootNode *Root +} + +// NewFileSystem constructs new fs using given core.IpfsNode instance. +func NewIpns(ipfs *core.IpfsNode, ipfspath string) (*FileSystem, error) { + root, err := CreateRoot(ipfs, []ci.PrivKey{ipfs.Identity.PrivKey}, ipfspath) + if err != nil { + return nil, err + } + return &FileSystem{Ipfs: ipfs, RootNode: root}, nil +} + +func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, error) { + root := new(Root) + root.LocalDirs = make(map[string]*Node) + root.Ipfs = n + abspath, err := filepath.Abs(ipfsroot) + if err != nil { + return nil, err + } + root.IpfsRoot = abspath + + root.Keys = keys + + if len(keys) == 0 { + log.Warning("No keys given for ipns root creation") + } else { + k := keys[0] + pub := k.GetPublic() + hash, err := pub.Hash() + if err != nil { + log.Error("Read Root Error: %s", err) + return nil, err + } + root.LocalLink = &Link{u.Key(hash).Pretty()} + } + + return root, nil +} + +// Root constructs the Root of the filesystem, a Root object. +func (f FileSystem) Root() (fs.Node, fuse.Error) { + return f.RootNode, nil +} + +// Root is the root object of the filesystem tree. +type Root struct { + Ipfs *core.IpfsNode + Keys []ci.PrivKey + + // Used for symlinking into ipfs + IpfsRoot string + LocalDirs map[string]*Node + + LocalLink *Link +} + +// Attr returns file attributes. +func (*Root) Attr() fuse.Attr { + return fuse.Attr{Mode: os.ModeDir | 0111} // -rw+x +} + +// Lookup performs a lookup under this node. +func (s *Root) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) { + log.Debug("ipns: Root Lookup: '%s'", name) + switch name { + case "mach_kernel", ".hidden", "._.": + // Just quiet some log noise on OS X. + return nil, fuse.ENOENT + } + + if name == "local" { + if s.LocalLink == nil { + return nil, fuse.ENOENT + } + return s.LocalLink, nil + } + + nd, ok := s.LocalDirs[name] + if ok { + return nd, nil + } + + log.Debug("ipns: Falling back to resolution.") + resolved, err := s.Ipfs.Namesys.Resolve(name) + if err != nil { + log.Error("ipns: namesys resolve error: %s", err) + return nil, fuse.ENOENT + } + + return &Link{s.IpfsRoot + "/" + resolved}, nil +} + +// ReadDir reads a particular directory. Disallowed for root. +func (r *Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) { + u.DOut("Read Root.\n") + listing := []fuse.Dirent{ + fuse.Dirent{ + Name: "local", + Type: fuse.DT_Link, + }, + } + for _, k := range r.Keys { + pub := k.GetPublic() + hash, err := pub.Hash() + if err != nil { + log.Error("Read Root Error: %s", err) + continue + } + ent := fuse.Dirent{ + Name: u.Key(hash).Pretty(), + Type: fuse.DT_Dir, + } + listing = append(listing, ent) + } + return listing, nil +} + +// Node is the core object representing a filesystem tree node. +type Node struct { + nsRoot *Node + Ipfs *core.IpfsNode + Nd *mdag.Node + fd *mdag.DagReader + cached *mdag.PBData +} + +func (s *Node) loadData() error { + s.cached = new(mdag.PBData) + return proto.Unmarshal(s.Nd.Data, s.cached) +} + +// Attr returns the attributes of a given node. +func (s *Node) Attr() fuse.Attr { + u.DOut("Node attr.\n") + if s.cached == nil { + s.loadData() + } + switch s.cached.GetType() { + case mdag.PBData_Directory: + u.DOut("this is a directory.\n") + return fuse.Attr{Mode: os.ModeDir | 0555} + case mdag.PBData_File, mdag.PBData_Raw: + u.DOut("this is a file.\n") + size, _ := s.Nd.Size() + return fuse.Attr{ + Mode: 0444, + Size: uint64(size), + Blocks: uint64(len(s.Nd.Links)), + } + default: + u.PErr("Invalid data type.") + return fuse.Attr{} + } +} + +// Lookup performs a lookup under this node. +func (s *Node) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) { + u.DOut("Lookup '%s'\n", name) + nd, err := s.Ipfs.Resolver.ResolveLinks(s.Nd, []string{name}) + if err != nil { + // todo: make this error more versatile. + return nil, fuse.ENOENT + } + + return &Node{Ipfs: s.Ipfs, Nd: nd}, nil +} + +// ReadDir reads the link structure as directory entries +func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) { + u.DOut("Node ReadDir\n") + entries := make([]fuse.Dirent, len(s.Nd.Links)) + for i, link := range s.Nd.Links { + n := link.Name + if len(n) == 0 { + n = link.Hash.B58String() + } + entries[i] = fuse.Dirent{Name: n, Type: fuse.DT_File} + } + + if len(entries) > 0 { + return entries, nil + } + return nil, fuse.ENOENT +} + +// ReadAll reads the object data as file data +func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) { + log.Debug("ipns: ReadAll Node") + r, err := mdag.NewDagReader(s.Nd, s.Ipfs.DAG) + if err != nil { + return nil, err + } + // this is a terrible function... 'ReadAll'? + // what if i have a 6TB file? GG RAM. + return ioutil.ReadAll(r) +} + +// Mount mounts an IpfsNode instance at a particular path. It +// serves until the process receives exit signals (to Unmount). +func Mount(ipfs *core.IpfsNode, fpath string, ipfspath string) error { + + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, + syscall.SIGTERM, syscall.SIGQUIT) + + go func() { + <-sigc + for { + err := Unmount(fpath) + if err == nil { + return + } + time.Sleep(time.Millisecond * 10) + } + ipfs.Network.Close() + }() + + c, err := fuse.Mount(fpath) + if err != nil { + return err + } + defer c.Close() + + fsys, err := NewIpns(ipfs, ipfspath) + if err != nil { + return err + } + + err = fs.Serve(c, fsys) + if err != nil { + return err + } + + // check if the mount process has an error to report + <-c.Ready + if err := c.MountError; err != nil { + return err + } + return nil +} + +// Unmount attempts to unmount the provided FUSE mount point, forcibly +// if necessary. +func Unmount(point string) error { + fmt.Printf("Unmounting %s...\n", point) + + var cmd *exec.Cmd + switch runtime.GOOS { + case "darwin": + cmd = exec.Command("diskutil", "umount", "force", point) + case "linux": + cmd = exec.Command("fusermount", "-u", point) + default: + return fmt.Errorf("unmount: unimplemented") + } + + errc := make(chan error, 1) + go func() { + if err := exec.Command("umount", point).Run(); err == nil { + errc <- err + } + // retry to unmount with the fallback cmd + errc <- cmd.Run() + }() + + select { + case <-time.After(1 * time.Second): + return fmt.Errorf("umount timeout") + case err := <-errc: + return err + } +} + +type Link struct { + Target string +} + +func (l *Link) Attr() fuse.Attr { + log.Debug("Link attr.") + return fuse.Attr{ + Mode: os.ModeSymlink | 0555, + } +} + +func (l *Link) Readlink(req *fuse.ReadlinkRequest, intr fs.Intr) (string, fuse.Error) { + return l.Target, nil +} diff --git a/routing/dht/query.go b/routing/dht/query.go index cc709d7e9b1880953843de2b0a2a308b8689451e..a62646f05b23f40218c02ee71236678125ac9bcb 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -106,6 +106,7 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { log.Warning("Running query with no peers!") return nil, nil } + // setup concurrency rate limiting for i := 0; i < r.query.concurrency; i++ { r.rateLimit <- struct{}{} diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 6a2a0cdcb6bfcb5a264b681c253e09486ae5db05..a62f4c01beb8eab6c66086dfea114333b87ef0e4 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -47,14 +47,13 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error // If the search does not succeed, a multiaddr string of a closer peer is // returned along with util.ErrSearchIncomplete func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { - ll := startNewRPC("GET") - defer ll.EndAndPrint() + log.Debug("Get Value [%s]", key.Pretty()) // If we have it local, dont bother doing an RPC! // NOTE: this might not be what we want to do... val, err := dht.getLocal(key) if err == nil { - ll.Success = true + log.Debug("Got value locally!") return val, nil }