Commit cdd29c24 authored by Kevin Atkinson's avatar Kevin Atkinson

Report progress during 'pin add'.

License: MIT
Signed-off-by: default avatarKevin Atkinson <k@kevina.org>
parent 584c0956
......@@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"time"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
......@@ -33,6 +34,11 @@ type PinOutput struct {
Pins []*cid.Cid
}
type AddPinOutput struct {
Pins []*cid.Cid
Progress int `json:",omitempty"`
}
var addPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Pin objects to local storage.",
......@@ -44,8 +50,9 @@ var addPinCmd = &cmds.Command{
},
Options: []cmds.Option{
cmds.BoolOption("recursive", "r", "Recursively pin the object linked to by the specified object(s).").Default(true),
cmds.BoolOption("progress", "Show progress"),
},
Type: PinOutput{},
Type: AddPinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
......@@ -61,22 +68,88 @@ var addPinCmd = &cmds.Command{
res.SetError(err, cmds.ErrNormal)
return
}
showProgress, _, _ := req.Option("progress").Bool()
added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive)
if err != nil {
res.SetError(err, cmds.ErrNormal)
if !showProgress {
added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
res.SetOutput(&AddPinOutput{Pins: added})
return
}
res.SetOutput(&PinOutput{added})
v := new(dag.ProgressTracker)
ctx := v.DeriveContext(req.Context())
ch := make(chan []*cid.Cid)
go func() {
defer close(ch)
added, err := corerepo.Pin(n, ctx, req.Arguments(), recursive)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
ch <- added
}()
out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
defer close(out)
for {
select {
case val, ok := <-ch:
if !ok {
// error already set just return
return
}
if pv := v.Value(); pv != 0 {
out <- &AddPinOutput{Progress: v.Value()}
}
out <- &AddPinOutput{Pins: val}
return
case <-ticker.C:
out <- &AddPinOutput{Progress: v.Value()}
case <-ctx.Done():
res.SetError(ctx.Err(), cmds.ErrNormal)
return
}
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
added, ok := res.Output().(*PinOutput)
if !ok {
var added []*cid.Cid
switch out := res.Output().(type) {
case *AddPinOutput:
added = out.Pins
case <-chan interface{}:
progressLine := false
for r0 := range out {
r := r0.(*AddPinOutput)
if r.Pins != nil {
added = r.Pins
} else {
if progressLine {
fmt.Fprintf(res.Stderr(), "\r")
}
fmt.Fprintf(res.Stderr(), "Fetched/Processed %d nodes", r.Progress)
progressLine = true
}
}
if progressLine {
fmt.Fprintf(res.Stderr(), "\n")
}
if res.Error() != nil {
return nil, res.Error()
}
default:
return nil, u.ErrCast()
}
var pintype string
rec, found, _ := res.Request().Option("recursive").Bool()
if rec || !found {
......@@ -86,7 +159,7 @@ var addPinCmd = &cmds.Command{
}
buf := new(bytes.Buffer)
for _, k := range added.Pins {
for _, k := range added {
fmt.Fprintf(buf, "pinned %s %s\n", k, pintype)
}
return buf, nil
......
......@@ -139,8 +139,21 @@ func (n *dagService) Remove(nd node.Node) error {
}
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
v, _ := ctx.Value("progress").(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
if set.Visit(c) {
v.Increment()
return true
} else {
return false
}
}
return EnumerateChildrenAsync(ctx, serv, root, visit)
}
// FindLinks searches this nodes links for the given key,
......@@ -389,6 +402,27 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit
return nil
}
type ProgressTracker struct {
Total int
lk sync.Mutex
}
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
return context.WithValue(ctx, "progress", p)
}
func (p *ProgressTracker) Increment() {
p.lk.Lock()
defer p.lk.Unlock()
p.Total++
}
func (p *ProgressTracker) Value() int {
p.lk.Lock()
defer p.lk.Unlock()
return p.Total
}
// FetchGraphConcurrency is total number of concurrent fetches that
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 8
......
......@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"strings"
"sync"
"testing"
......@@ -547,3 +548,80 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
t.Fatal("this should have failed")
}
}
func TestProgressIndicator(t *testing.T) {
testProgressIndicator(t, 5)
}
func TestProgressIndicatorNoChildren(t *testing.T) {
testProgressIndicator(t, 0)
}
func testProgressIndicator(t *testing.T, depth int) {
ds := dstest.Mock()
top, numChildren := mkDag(ds, depth)
v := new(ProgressTracker)
ctx := v.DeriveContext(context.Background())
err := FetchGraph(ctx, top, ds)
if err != nil {
t.Fatal(err)
}
if v.Value() != numChildren+1 {
t.Errorf("wrong number of children reported in progress indicator, expected %d, got %d",
numChildren+1, v.Value())
}
}
func mkDag(ds DAGService, depth int) (*cid.Cid, int) {
totalChildren := 0
f := func() *ProtoNode {
p := new(ProtoNode)
buf := make([]byte, 16)
rand.Read(buf)
p.SetData(buf)
_, err := ds.Add(p)
if err != nil {
panic(err)
}
return p
}
for i := 0; i < depth; i++ {
thisf := f
f = func() *ProtoNode {
pn := mkNodeWithChildren(thisf, 10)
_, err := ds.Add(pn)
if err != nil {
panic(err)
}
totalChildren += 10
return pn
}
}
nd := f()
c, err := ds.Add(nd)
if err != nil {
panic(err)
}
return c, totalChildren
}
func mkNodeWithChildren(getChild func() *ProtoNode, width int) *ProtoNode {
cur := new(ProtoNode)
for i := 0; i < width; i++ {
c := getChild()
if err := cur.AddNodeLinkClean(fmt.Sprint(i), c); err != nil {
panic(err)
}
}
return cur
}
......@@ -10,6 +10,8 @@ test_description="Test ipfs pinning operations"
test_pins() {
EXTRA_ARGS=$1
test_expect_success "create some hashes" '
HASH_A=$(echo "A" | ipfs add -q --pin=false) &&
HASH_B=$(echo "B" | ipfs add -q --pin=false) &&
......@@ -30,8 +32,8 @@ test_pins() {
echo $HASH_G >> hashes
'
test_expect_success "pin those hashes via stdin" '
cat hashes | ipfs pin add
test_expect_success "'ipfs pin add $EXTRA_ARGS' via stdin" '
cat hashes | ipfs pin add $EXTRA_ARGS
'
test_expect_success "unpin those hashes" '
......@@ -39,15 +41,30 @@ test_pins() {
'
}
test_pin_dag() {
RANDOM_HASH=Qme8uX5n9hn15pw9p6WcVKoziyyC9LXv4LEgvsmKMULjnV
test_pins_error_reporting() {
EXTRA_ARGS=$1
test_expect_success "'ipfs pin add $EXTRA_ARGS' on non-existent hash should fail" '
test_must_fail ipfs pin add $EXTRA_ARGS $RANDOM_HASH 2> err &&
grep -q "not found" err
'
}
test_pin_dag_init() {
EXTRA_ARGS=$1
test_expect_success "'ipfs add $EXTRA_ARGS --pin=false' 1MB file" '
random 1048576 56 > afile &&
HASH=`ipfs add $EXTRA_ARGS --pin=false -q afile`
'
}
test_expect_success "'ipfs pin add' file" '
test_pin_dag() {
test_pin_dag_init $1
test_expect_success "'ipfs pin add --progress' file" '
ipfs pin add --recursive=true $HASH
'
......@@ -67,20 +84,45 @@ test_pin_dag() {
'
}
test_pin_progress() {
test_pin_dag_init
test_expect_success "'ipfs pin add --progress' file" '
ipfs pin add --progress $HASH 2> err
'
test_expect_success "pin progress reported correctly" '
cat err
grep -q " 5 nodes" err
'
}
test_init_ipfs
test_pins
test_pins --progress
test_pins_error_reporting
test_pins_error_reporting --progress
test_pin_dag
test_pin_dag --raw-leaves
test_pin_progress
test_launch_ipfs_daemon --offline
test_pins
test_pins --progress
test_pins_error_reporting
test_pins_error_reporting --progress
test_pin_dag
test_pin_dag --raw-leaves
test_pin_progress
test_kill_ipfs_daemon
test_done
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment