Commit d67e1e50 authored by Matt Bell's avatar Matt Bell Committed by Juan Batiz-Benet

daemon: Replaced daemon package with lock functions

parent 7efa174f
package daemon
import (
"encoding/json"
"fmt"
"io"
"os"
"path"
"sync"
core "github.com/jbenet/go-ipfs/core"
"github.com/jbenet/go-ipfs/core/commands"
u "github.com/jbenet/go-ipfs/util"
lock "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/camlistore/lock"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
)
var log = u.Logger("daemon")
// LockFile is the filename of the daemon lock, relative to config dir
const LockFile = "daemon.lock"
// DaemonListener listens to an initialized IPFS node and can send it commands instead of
// starting up a new set of connections
type DaemonListener struct {
node *core.IpfsNode
list manet.Listener
closed bool
wg sync.WaitGroup
lk io.Closer
}
//Command accepts user input and can be sent to the running IPFS node
type Command struct {
Command string
Args []string
Opts map[string]interface{}
}
func NewDaemonListener(ipfsnode *core.IpfsNode, addr ma.Multiaddr, confdir string) (*DaemonListener, error) {
var err error
confdir, err = u.TildeExpansion(confdir)
if err != nil {
return nil, err
}
lk, err := daemonLock(confdir)
if err != nil {
return nil, err
}
ofi, err := os.Create(confdir + "/rpcaddress")
if err != nil {
log.Warningf("Could not create rpcaddress file: %s", err)
return nil, err
}
_, err = ofi.Write([]byte(addr.String()))
if err != nil {
log.Warningf("Could not write to rpcaddress file: %s", err)
return nil, err
}
ofi.Close()
list, err := manet.Listen(addr)
if err != nil {
return nil, err
}
log.Info("New daemon listener initialized.")
return &DaemonListener{
node: ipfsnode,
list: list,
lk: lk,
}, nil
}
func NewCommand() *Command {
return &Command{
Opts: make(map[string]interface{}),
}
}
func (dl *DaemonListener) Listen() {
if dl.closed {
panic("attempting to listen on a closed daemon Listener")
}
// add ourselves to workgroup. and remove ourselves when done.
dl.wg.Add(1)
defer dl.wg.Done()
log.Info("daemon listening")
for {
conn, err := dl.list.Accept()
if err != nil {
if !dl.closed {
log.Warning("DaemonListener Accept: %v", err)
}
return
}
go dl.handleConnection(conn)
}
func Lock(confdir string) (io.Closer, error) {
return lock.Lock(path.Join(confdir, LockFile))
}
func (dl *DaemonListener) handleConnection(conn manet.Conn) {
defer conn.Close()
dec := json.NewDecoder(conn)
func Locked(confdir string) bool {
if lk, err := Lock(confdir); err != nil {
return true
var command Command
err := dec.Decode(&command)
if err != nil {
fmt.Fprintln(conn, err)
return
} else {
lk.Close()
return false
}
log.Debug("Got command: %v", command)
switch command.Command {
case "add":
err = commands.Add(dl.node, command.Args, command.Opts, conn)
case "cat":
err = commands.Cat(dl.node, command.Args, command.Opts, conn)
case "ls":
err = commands.Ls(dl.node, command.Args, command.Opts, conn)
case "pin":
err = commands.Pin(dl.node, command.Args, command.Opts, conn)
case "publish":
err = commands.Publish(dl.node, command.Args, command.Opts, conn)
case "resolve":
err = commands.Resolve(dl.node, command.Args, command.Opts, conn)
case "diag":
err = commands.Diag(dl.node, command.Args, command.Opts, conn)
case "blockGet":
err = commands.BlockGet(dl.node, command.Args, command.Opts, conn)
case "blockPut":
err = commands.BlockPut(dl.node, command.Args, command.Opts, conn)
case "log":
err = commands.Log(dl.node, command.Args, command.Opts, conn)
case "unpin":
err = commands.Unpin(dl.node, command.Args, command.Opts, conn)
case "updateApply":
command.Opts["onDaemon"] = true
err = commands.UpdateApply(dl.node, command.Args, command.Opts, conn)
default:
err = fmt.Errorf("Invalid Command: '%s'", command.Command)
}
if err != nil {
log.Errorf("%s: %s", command.Command, err)
fmt.Fprintln(conn, err)
}
}
func (dl *DaemonListener) Close() error {
dl.closed = true
err := dl.list.Close()
dl.wg.Wait() // wait till done before releasing lock.
dl.lk.Close()
return err
}
func daemonLock(confdir string) (io.Closer, error) {
return lock.Lock(path.Join(confdir, LockFile))
}
package daemon
import (
"bufio"
"encoding/json"
"errors"
"io"
"os"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr/net"
u "github.com/jbenet/go-ipfs/util"
)
// ErrDaemonNotRunning is returned when attempting to retrieve the daemon's
// address and the daemon is not actually running.
var ErrDaemonNotRunning = errors.New("daemon not running")
func getDaemonAddr(confdir string) (string, error) {
var err error
confdir, err = u.TildeExpansion(confdir)
if err != nil {
return "", err
}
fi, err := os.Open(confdir + "/rpcaddress")
if err != nil {
log.Debug("getDaemonAddr failed: %s", err)
if err == os.ErrNotExist {
return "", ErrDaemonNotRunning
}
return "", err
}
read := bufio.NewReader(fi)
// TODO: operating system agostic line delim
line, err := read.ReadBytes('\n')
if err != nil && err != io.EOF {
return "", err
}
return string(line), nil
}
// SendCommand attempts to run the command over a currently-running daemon.
// If there is no running daemon, returns ErrDaemonNotRunning. This is done
// over network RPC API. The address of the daemon is retrieved from the config
// directory, where live daemons write their addresses to special files.
func SendCommand(command *Command, confdir string) error {
server := os.Getenv("IPFS_ADDRESS_RPC")
if server == "" {
//check if daemon is running
log.Info("Checking if daemon is running...")
if !serverIsRunning(confdir) {
return ErrDaemonNotRunning
}
log.Info("Daemon is running!")
var err error
server, err = getDaemonAddr(confdir)
if err != nil {
return err
}
}
return serverComm(server, command)
}
func serverIsRunning(confdir string) bool {
var err error
confdir, err = u.TildeExpansion(confdir)
if err != nil {
log.Errorf("Tilde Expansion Failed: %s", err)
return false
}
lk, err := daemonLock(confdir)
if err == nil {
lk.Close()
return false
}
return true
}
func serverComm(server string, command *Command) error {
log.Info("Daemon address: %s", server)
maddr, err := ma.NewMultiaddr(server)
if err != nil {
return err
}
conn, err := manet.Dial(maddr)
if err != nil {
return err
}
enc := json.NewEncoder(conn)
err = enc.Encode(command)
if err != nil {
return err
}
io.Copy(os.Stdout, conn)
return nil
}
package daemon
import (
"encoding/base64"
"os"
"testing"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
config "github.com/jbenet/go-ipfs/config"
core "github.com/jbenet/go-ipfs/core"
ci "github.com/jbenet/go-ipfs/crypto"
peer "github.com/jbenet/go-ipfs/peer"
)
func TestInitializeDaemonListener(t *testing.T) {
priv, pub, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
t.Fatal(err)
}
prbytes, err := priv.Bytes()
if err != nil {
t.Fatal(err)
}
ident, _ := peer.IDFromPubKey(pub)
privKey := base64.StdEncoding.EncodeToString(prbytes)
pID := ident.Pretty()
id := config.Identity{
PeerID: pID,
PrivKey: privKey,
}
nodeConfigs := []*config.Config{
&config.Config{
Identity: id,
Datastore: config.Datastore{
Type: "memory",
},
Addresses: config.Addresses{
Swarm: "/ip4/0.0.0.0/tcp/4001",
API: "/ip4/127.0.0.1/tcp/8000",
},
},
&config.Config{
Identity: id,
Datastore: config.Datastore{
Type: "leveldb",
Path: ".test/datastore",
},
Addresses: config.Addresses{
Swarm: "/ip4/0.0.0.0/tcp/4001",
API: "/ip4/127.0.0.1/tcp/8000",
},
},
}
var tempConfigDir = ".test"
err = os.MkdirAll(tempConfigDir, os.ModeDir|0777)
if err != nil {
t.Fatalf("error making temp config dir: %v", err)
}
for _, c := range nodeConfigs {
node, _ := core.NewIpfsNode(c, false)
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1327")
if err != nil {
t.Fatal(err)
}
dl, initErr := NewDaemonListener(node, addr, tempConfigDir)
if initErr != nil {
t.Fatal(initErr)
}
closeErr := dl.Close()
if closeErr != nil {
t.Fatal(closeErr)
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment