Commit c3ea164f authored by Matt Bell's avatar Matt Bell

core/commands: Added progress bars for 'add'

squash! core/commands: Added progress bars for 'add'

Use vendored progress bar lib in 'add'
parent c73c4ae5
......@@ -5,7 +5,9 @@ import (
"errors"
"fmt"
"io"
"os"
"path"
"strings"
cmds "github.com/jbenet/go-ipfs/commands"
files "github.com/jbenet/go-ipfs/commands/files"
......@@ -16,14 +18,22 @@ import (
pinning "github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
u "github.com/jbenet/go-ipfs/util"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
)
// Error indicating the max depth has been exceded.
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")
// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256
const progressOptionName = "progress"
type AddedObject struct {
Name string
Hash string
Name string
Hash string `json:",omitempty"`
Bytes int64 `json:",omitempty"`
}
var AddCmd = &cmds.Command{
......@@ -43,6 +53,26 @@ remains to be implemented.
Options: []cmds.Option{
cmds.OptionRecursivePath, // a builtin option that allows recursive paths (-r, --recursive)
cmds.BoolOption("quiet", "q", "Write minimal output"),
cmds.BoolOption(progressOptionName, "p", "Stream progress data"),
},
PreRun: func(req cmds.Request) error {
req.SetOption(progressOptionName, true)
sizeFile, ok := req.Files().(files.SizeFile)
if !ok {
// we don't need to error, the progress bar just won't know how big the files are
return nil
}
size, err := sizeFile.Size()
if err != nil {
// see comment above
return nil
}
log.Debugf("Total size of file being added: %v\n", size)
req.Values()["size"] = size
return nil
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.Context().GetNode()
......@@ -51,6 +81,8 @@ remains to be implemented.
return
}
progress, _, _ := req.Option(progressOptionName).Bool()
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))
......@@ -63,13 +95,87 @@ remains to be implemented.
return
}
_, err = addFile(n, file, outChan)
_, err = addFile(n, file, outChan, progress)
if err != nil {
return
}
}
}()
},
PostRun: func(res cmds.Response) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
res.SetError(u.ErrCast(), cmds.ErrNormal)
return
}
wrapperChan := make(chan interface{})
res.SetOutput((<-chan interface{})(wrapperChan))
size := int64(0)
s, found := res.Request().Values()["size"]
if found {
size = s.(int64)
}
showProgressBar := size >= progressBarMinSize
var bar *pb.ProgressBar
var terminalWidth int
if showProgressBar {
bar = pb.New64(size).SetUnits(pb.U_BYTES)
bar.ManualUpdate = true
bar.Start()
// the progress bar lib doesn't give us a way to get the width of the output,
// so as a hack we just use a callback to measure the output, then git rid of it
terminalWidth = 0
bar.Callback = func(line string) {
terminalWidth = len(line)
bar.Callback = nil
bar.Output = os.Stderr
log.Infof("terminal width: %v\n", terminalWidth)
}
bar.Update()
}
go func() {
lastFile := ""
var totalProgress, prevFiles, lastBytes int64
for out := range outChan {
output := out.(*AddedObject)
if len(output.Hash) > 0 {
if showProgressBar {
// clear progress bar line before we print "added x" output
fmt.Fprintf(os.Stderr, "\r%s\r", strings.Repeat(" ", terminalWidth))
}
wrapperChan <- output
} else {
log.Debugf("add progress: %v %v\n", output.Name, output.Bytes)
if !showProgressBar {
continue
}
if len(lastFile) == 0 {
lastFile = output.Name
}
if output.Name != lastFile || output.Bytes < lastBytes {
prevFiles += lastBytes
lastFile = output.Name
}
lastBytes = output.Bytes
delta := prevFiles + lastBytes - totalProgress
totalProgress = bar.Add64(delta)
bar.Update()
}
}
close(wrapperChan)
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
......@@ -144,12 +250,19 @@ func addNode(n *core.IpfsNode, node *dag.Node) error {
return nil
}
func addFile(n *core.IpfsNode, file files.File, out chan interface{}) (*dag.Node, error) {
func addFile(n *core.IpfsNode, file files.File, out chan interface{}, progress bool) (*dag.Node, error) {
if file.IsDirectory() {
return addDir(n, file, out)
return addDir(n, file, out, progress)
}
// if the progress flag was specified, wrap the file so that we can send
// progress updates to the client (over the output channel)
var reader io.Reader = file
if progress {
reader = &progressReader{file: file, out: out}
}
dns, err := add(n, []io.Reader{file})
dns, err := add(n, []io.Reader{reader})
if err != nil {
return nil, err
}
......@@ -161,7 +274,7 @@ func addFile(n *core.IpfsNode, file files.File, out chan interface{}) (*dag.Node
return dns[len(dns)-1], nil // last dag node is the file.
}
func addDir(n *core.IpfsNode, dir files.File, out chan interface{}) (*dag.Node, error) {
func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress bool) (*dag.Node, error) {
log.Infof("adding directory: %s", dir.FileName())
tree := &dag.Node{Data: ft.FolderPBData()}
......@@ -175,7 +288,7 @@ func addDir(n *core.IpfsNode, dir files.File, out chan interface{}) (*dag.Node,
break
}
node, err := addFile(n, file, out)
node, err := addFile(n, file, out, progress)
if err != nil {
return nil, err
}
......@@ -215,3 +328,25 @@ func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
return nil
}
type progressReader struct {
file files.File
out chan interface{}
bytes int64
lastProgress int64
}
func (i *progressReader) Read(p []byte) (int, error) {
n, err := i.file.Read(p)
i.bytes += int64(n)
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
i.lastProgress = i.bytes
i.out <- &AddedObject{
Name: i.file.FileName(),
Bytes: i.bytes,
}
}
return n, err
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment