package commands

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"strings"

	cmds "github.com/jbenet/go-ipfs/commands"
	files "github.com/jbenet/go-ipfs/commands/files"
	core "github.com/jbenet/go-ipfs/core"
	importer "github.com/jbenet/go-ipfs/importer"
	"github.com/jbenet/go-ipfs/importer/chunk"
	dag "github.com/jbenet/go-ipfs/merkledag"
	pinning "github.com/jbenet/go-ipfs/pin"
	ft "github.com/jbenet/go-ipfs/unixfs"
	u "github.com/jbenet/go-ipfs/util"

	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
)

// ErrDepthLimitExceeded indicates that the max depth has been exceeded.
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

const progressOptionName = "progress"

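// AddedObject is the output sent for each added object: progress updates carry
// Name and Bytes, and the final entry for a file carries Name and Hash.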
type AddedObject struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
}

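// AddCmd is the 'ipfs add' command. It imports files into the merkledag,
// pins the result, and streams AddedObject values back to the client.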
var AddCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Add an object to ipfs.",
		ShortDescription: `
Adds contents of <path> to ipfs. Use -r to add directories.
Note that directories are added recursively, to form the ipfs
MerkleDAG. A smarter partial add with a staging area (like git)
remains to be implemented.
`,
	},

	Arguments: []cmds.Argument{
		cmds.FileArg("path", true, true, "The path to a file to be added to IPFS").EnableRecursive().EnableStdin(),
	},
	Options: []cmds.Option{
		cmds.OptionRecursivePath, // a builtin option that allows recursive paths (-r, --recursive)
		cmds.BoolOption("quiet", "q", "Write minimal output"),
		cmds.BoolOption(progressOptionName, "p", "Stream progress data"),
	},
	PreRun: func(req cmds.Request) error {
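		// progress events are always enabled internally; PostRun consumes them
		// to drive the progress bar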
		req.SetOption(progressOptionName, true)

		sizeFile, ok := req.Files().(files.SizeFile)
		if !ok {
			// we don't need to error, the progress bar just won't know how big the files are
			return nil
		}

		size, err := sizeFile.Size()
		if err != nil {
			// see comment above
			return nil
		}
		log.Debugf("Total size of file being added: %v\n", size)
		req.Values()["size"] = size

		return nil
	},
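	// Run builds the DAG for each input file in a goroutine and streams
	// AddedObject values (progress updates and final hashes) over the output channel.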
	Run: func(req cmds.Request, res cmds.Response) {
		n, err := req.Context().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		progress, _, _ := req.Option(progressOptionName).Bool()

		outChan := make(chan interface{})
		res.SetOutput((<-chan interface{})(outChan))

		go func() {
			defer close(outChan)

			for {
				file, err := req.Files().NextFile()
				if (err != nil && err != io.EOF) || file == nil {
					return
				}

				_, err = addFile(n, file, outChan, progress)
				if err != nil {
					return
				}
			}
		}()
	},
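	// PostRun runs client-side: it drains the output channel, drawing the
	// progress bar for large inputs and printing one "added <hash> <name>"
	// line per object (or just the hash with --quiet).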
	PostRun: func(res cmds.Response) {
		outChan, ok := res.Output().(<-chan interface{})
		if !ok {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}
		res.SetOutput(nil)

		quiet, _, err := res.Request().Option("quiet").Bool()
		if err != nil {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}

		size := int64(0)
		s, found := res.Request().Values()["size"]
		if found {
			size = s.(int64)
		}
		showProgressBar := size >= progressBarMinSize

		var bar *pb.ProgressBar
		var terminalWidth int
		if showProgressBar {
			bar = pb.New64(size).SetUnits(pb.U_BYTES)
			bar.ManualUpdate = true
			bar.Start()

			// the progress bar lib doesn't give us a way to get the width of the output,
			// so as a hack we just use a callback to measure the output, then get rid of it
			terminalWidth = 0
			bar.Callback = func(line string) {
				terminalWidth = len(line)
				bar.Callback = nil
				bar.Output = os.Stderr
				log.Infof("terminal width: %v\n", terminalWidth)
			}
			bar.Update()
		}

		lastFile := ""
		var totalProgress, prevFiles, lastBytes int64

		for out := range outChan {
			output := out.(*AddedObject)
			if len(output.Hash) > 0 {
				if showProgressBar {
					// clear progress bar line before we print "added x" output
					fmt.Fprintf(os.Stderr, "\r%s\r", strings.Repeat(" ", terminalWidth))
				}
				if quiet {
					fmt.Printf("%s\n", output.Hash)
				} else {
					fmt.Printf("added %s %s\n", output.Hash, output.Name)
				}

			} else {
				log.Debugf("add progress: %v %v\n", output.Name, output.Bytes)

				if !showProgressBar {
					continue
				}

				// output.Bytes is cumulative within a single file; when the file
				// changes (or its count resets), fold the previous file's bytes into
				// prevFiles so the bar's total only ever moves forward
				if len(lastFile) == 0 {
					lastFile = output.Name
				}
				if output.Name != lastFile || output.Bytes < lastBytes {
					prevFiles += lastBytes
					lastFile = output.Name
				}
				lastBytes = output.Bytes
				delta := prevFiles + lastBytes - totalProgress
				totalProgress = bar.Add64(delta)
			}

			if showProgressBar {
				bar.Update()
			}
		}
	},
	Type: AddedObject{},
}

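// add builds a DAG from each reader using the default chunker, then flushes
// the pin state. It returns one root node per reader.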
func add(n *core.IpfsNode, readers []io.Reader) ([]*dag.Node, error) {
	mp, ok := n.Pinning.(pinning.ManualPinner)
	if !ok {
		return nil, errors.New("invalid pinner type! expected manual pinner")
	}

	dagnodes := make([]*dag.Node, 0)

	for _, reader := range readers {
		node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter)
		if err != nil {
			return nil, err
		}
		dagnodes = append(dagnodes, node)
	}

	err := n.Pinning.Flush()
	if err != nil {
		return nil, err
	}

	return dagnodes, nil
}

func addNode(n *core.IpfsNode, node *dag.Node) error {
	err := n.DAG.AddRecursive(node) // add the file to the graph + local storage
	if err != nil {
		return err
	}

	err = n.Pinning.Pin(node, true) // ensure we keep it
	if err != nil {
		return err
	}

	return nil
}

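// addFile adds a single file (or delegates to addDir for directories) and
// sends the resulting hash over the output channel.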
func addFile(n *core.IpfsNode, file files.File, out chan interface{}, progress bool) (*dag.Node, error) {
	if file.IsDirectory() {
		return addDir(n, file, out, progress)
	}

	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
	if progress {
		reader = &progressReader{file: file, out: out}
	}

	dns, err := add(n, []io.Reader{reader})
	if err != nil {
		return nil, err
	}

	log.Infof("adding file: %s", file.FileName())
	if err := outputDagnode(out, file.FileName(), dns[len(dns)-1]); err != nil {
		return nil, err
	}
	return dns[len(dns)-1], nil // last dag node is the file.
}

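// addDir recursively adds the contents of a directory, links them under a new
// unixfs folder node, and adds and pins that folder node.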
func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress bool) (*dag.Node, error) {
	log.Infof("adding directory: %s", dir.FileName())

	tree := &dag.Node{Data: ft.FolderPBData()}

	for {
		file, err := dir.NextFile()
		if err != nil && err != io.EOF {
			return nil, err
		}
		if file == nil {
			break
		}

		node, err := addFile(n, file, out, progress)
		if err != nil {
			return nil, err
		}

		_, name := path.Split(file.FileName())

		err = tree.AddNodeLink(name, node)
		if err != nil {
			return nil, err
		}
	}

	err := outputDagnode(out, dir.FileName(), tree)
	if err != nil {
		return nil, err
	}

	err = addNode(n, tree)
	if err != nil {
		return nil, err
	}

	return tree, nil
}

// outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
	}

	return nil
}

// progressReader wraps a file and emits an AddedObject progress update on the
// output channel every progressReaderIncrement bytes read (and at EOF).
type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &AddedObject{
			Name:  i.file.FileName(),
			Bytes: i.bytes,
		}
	}

	return n, err
}