add.go 7.93 KB
Newer Older
1 2 3 4 5
package commands

import (
	"fmt"
	"io"
6
	"path"
7
	"strings"
8

Jeromy's avatar
Jeromy committed
9 10
	"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"

11 12 13 14 15 16 17
	cmds "github.com/ipfs/go-ipfs/commands"
	files "github.com/ipfs/go-ipfs/commands/files"
	core "github.com/ipfs/go-ipfs/core"
	coreunix "github.com/ipfs/go-ipfs/core/coreunix"
	importer "github.com/ipfs/go-ipfs/importer"
	"github.com/ipfs/go-ipfs/importer/chunk"
	dag "github.com/ipfs/go-ipfs/merkledag"
18
	pin "github.com/ipfs/go-ipfs/pin"
19 20
	ft "github.com/ipfs/go-ipfs/unixfs"
	u "github.com/ipfs/go-ipfs/util"
21 22 23 24 25
)

// Error indicating the max depth has been exceded.
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

26 27 28
// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

29 30 31 32
const (
	progressOptionName = "progress"
	wrapOptionName     = "wrap-with-directory"
)
33

34
type AddedObject struct {
35 36 37
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
38 39
}

40
var AddCmd = &cmds.Command{
41 42 43 44 45 46 47 48 49 50
	Helptext: cmds.HelpText{
		Tagline: "Add an object to ipfs.",
		ShortDescription: `
Adds contents of <path> to ipfs. Use -r to add directories.
Note that directories are added recursively, to form the ipfs
MerkleDAG. A smarter partial add with a staging area (like git)
remains to be implemented.
`,
	},

51
	Arguments: []cmds.Argument{
52
		cmds.FileArg("path", true, true, "The path to a file to be added to IPFS").EnableRecursive().EnableStdin(),
53
	},
54 55
	Options: []cmds.Option{
		cmds.OptionRecursivePath, // a builtin option that allows recursive paths (-r, --recursive)
56
		cmds.BoolOption("quiet", "q", "Write minimal output"),
57
		cmds.BoolOption(progressOptionName, "p", "Stream progress data"),
58
		cmds.BoolOption(wrapOptionName, "w", "Wrap files with a directory object"),
Jeromy's avatar
Jeromy committed
59
		cmds.BoolOption("t", "trickle", "Use trickle-dag format for dag generation"),
60 61
	},
	PreRun: func(req cmds.Request) error {
62 63 64 65
		if quiet, _, _ := req.Option("quiet").Bool(); quiet {
			return nil
		}

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
		req.SetOption(progressOptionName, true)

		sizeFile, ok := req.Files().(files.SizeFile)
		if !ok {
			// we don't need to error, the progress bar just won't know how big the files are
			return nil
		}

		size, err := sizeFile.Size()
		if err != nil {
			// see comment above
			return nil
		}
		log.Debugf("Total size of file being added: %v\n", size)
		req.Values()["size"] = size

		return nil
83
	},
84
	Run: func(req cmds.Request, res cmds.Response) {
85 86
		n, err := req.Context().GetNode()
		if err != nil {
87 88
			res.SetError(err, cmds.ErrNormal)
			return
89
		}
90

91
		progress, _, _ := req.Option(progressOptionName).Bool()
92
		wrap, _, _ := req.Option(wrapOptionName).Bool()
93

94
		outChan := make(chan interface{}, 8)
95
		res.SetOutput((<-chan interface{})(outChan))
96

97 98
		go func() {
			defer close(outChan)
99

100 101
			for {
				file, err := req.Files().NextFile()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102 103 104 105 106
				if err != nil && err != io.EOF {
					res.SetError(err, cmds.ErrNormal)
					return
				}
				if file == nil { // done
107 108
					return
				}
109

Jeromy's avatar
Jeromy committed
110 111
				rootnd, err := addFile(n, file, outChan, progress, wrap)
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
					res.SetError(err, cmds.ErrNormal)
Jeromy's avatar
Jeromy committed
113 114 115
					return
				}

116
				rnk, err := rootnd.Key()
Jeromy's avatar
Jeromy committed
117
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118
					res.SetError(err, cmds.ErrNormal)
Jeromy's avatar
Jeromy committed
119 120 121
					return
				}

122 123 124 125
				mp := n.Pinning.GetManual()
				mp.RemovePinWithMode(rnk, pin.Indirect)
				mp.PinWithMode(rnk, pin.Recursive)

Jeromy's avatar
Jeromy committed
126
				err = n.Pinning.Flush()
127
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128
					res.SetError(err, cmds.ErrNormal)
129 130 131 132
					return
				}
			}
		}()
133
	},
134
	PostRun: func(req cmds.Request, res cmds.Response) {
135 136 137
		if res.Error() != nil {
			return
		}
138 139 140 141 142
		outChan, ok := res.Output().(<-chan interface{})
		if !ok {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}
143
		res.SetOutput(nil)
144

145
		quiet, _, err := req.Option("quiet").Bool()
146 147 148 149
		if err != nil {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}
150 151

		size := int64(0)
152
		s, found := req.Values()["size"]
153 154 155
		if found {
			size = s.(int64)
		}
156
		showProgressBar := !quiet && size >= progressBarMinSize
157 158 159 160 161 162 163 164 165 166 167 168 169 170

		var bar *pb.ProgressBar
		var terminalWidth int
		if showProgressBar {
			bar = pb.New64(size).SetUnits(pb.U_BYTES)
			bar.ManualUpdate = true
			bar.Start()

			// the progress bar lib doesn't give us a way to get the width of the output,
			// so as a hack we just use a callback to measure the output, then git rid of it
			terminalWidth = 0
			bar.Callback = func(line string) {
				terminalWidth = len(line)
				bar.Callback = nil
171
				bar.Output = res.Stderr()
172 173 174 175 176
				log.Infof("terminal width: %v\n", terminalWidth)
			}
			bar.Update()
		}

177 178
		lastFile := ""
		var totalProgress, prevFiles, lastBytes int64
179

180 181 182
		for out := range outChan {
			output := out.(*AddedObject)
			if len(output.Hash) > 0 {
183
				if showProgressBar {
184
					// clear progress bar line before we print "added x" output
185
					fmt.Fprintf(res.Stderr(), "\r%s\r", strings.Repeat(" ", terminalWidth))
186 187
				}
				if quiet {
188
					fmt.Fprintf(res.Stdout(), "%s\n", output.Hash)
189
				} else {
190
					fmt.Fprintf(res.Stdout(), "added %s %s\n", output.Hash, output.Name)
191
				}
192

193 194
			} else {
				log.Debugf("add progress: %v %v\n", output.Name, output.Bytes)
195

196 197
				if !showProgressBar {
					continue
198 199
				}

200 201 202 203 204 205
				if len(lastFile) == 0 {
					lastFile = output.Name
				}
				if output.Name != lastFile || output.Bytes < lastBytes {
					prevFiles += lastBytes
					lastFile = output.Name
206
				}
207 208 209
				lastBytes = output.Bytes
				delta := prevFiles + lastBytes - totalProgress
				totalProgress = bar.Add64(delta)
210
			}
211

212 213 214 215
			if showProgressBar {
				bar.Update()
			}
		}
216
	},
217
	Type: AddedObject{},
218 219
}

220
func add(n *core.IpfsNode, reader io.Reader) (*dag.Node, error) {
221 222 223 224 225 226
	node, err := importer.BuildDagFromReader(
		reader,
		n.DAG,
		chunk.DefaultSplitter,
		importer.PinIndirectCB(n.Pinning.GetManual()),
	)
227 228
	if err != nil {
		return nil, err
229
	}
230

231
	return node, nil
232
}
233

234
func addFile(n *core.IpfsNode, file files.File, out chan interface{}, progress bool, wrap bool) (*dag.Node, error) {
235
	if file.IsDirectory() {
236 237 238 239 240 241 242 243
		return addDir(n, file, out, progress)
	}

	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
	if progress {
		reader = &progressReader{file: file, out: out}
244 245
	}

246 247 248 249 250 251 252 253 254 255 256 257
	if wrap {
		p, dagnode, err := coreunix.AddWrapped(n, reader, path.Base(file.FileName()))
		if err != nil {
			return nil, err
		}
		out <- &AddedObject{
			Hash: p,
			Name: file.FileName(),
		}
		return dagnode, nil
	}

258
	dagnode, err := add(n, reader)
259 260 261 262 263
	if err != nil {
		return nil, err
	}

	log.Infof("adding file: %s", file.FileName())
264
	if err := outputDagnode(out, file.FileName(), dagnode); err != nil {
265 266
		return nil, err
	}
267
	return dagnode, nil
268 269
}

270
func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress bool) (*dag.Node, error) {
271 272 273 274 275 276 277 278 279 280 281 282 283
	log.Infof("adding directory: %s", dir.FileName())

	tree := &dag.Node{Data: ft.FolderPBData()}

	for {
		file, err := dir.NextFile()
		if err != nil && err != io.EOF {
			return nil, err
		}
		if file == nil {
			break
		}

284
		node, err := addFile(n, file, out, progress, false)
285 286 287 288
		if err != nil {
			return nil, err
		}

289
		_, name := path.Split(file.FileName())
290 291 292 293 294 295 296

		err = tree.AddNodeLink(name, node)
		if err != nil {
			return nil, err
		}
	}

297
	err := outputDagnode(out, dir.FileName(), tree)
298 299 300 301
	if err != nil {
		return nil, err
	}

302
	k, err := n.DAG.Add(tree)
303 304 305
	if err != nil {
		return nil, err
	}
306

307 308
	n.Pinning.GetManual().PinWithMode(k, pin.Indirect)

309
	return tree, nil
310 311
}

312 313
// outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
314 315 316 317 318
	o, err := getOutput(dn)
	if err != nil {
		return err
	}

319 320 321 322
	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
	}
323

324
	return nil
325
}
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347

type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &AddedObject{
			Name:  i.file.FileName(),
			Bytes: i.bytes,
		}
	}

	return n, err
}