package commands

import (
	"errors"
	"fmt"
	"io"
	"path"
	"strings"

	cmds "github.com/jbenet/go-ipfs/commands"
	files "github.com/jbenet/go-ipfs/commands/files"
	core "github.com/jbenet/go-ipfs/core"
	importer "github.com/jbenet/go-ipfs/importer"
	"github.com/jbenet/go-ipfs/importer/chunk"
	dag "github.com/jbenet/go-ipfs/merkledag"
	pinning "github.com/jbenet/go-ipfs/pin"
	ft "github.com/jbenet/go-ipfs/unixfs"
	u "github.com/jbenet/go-ipfs/util"

	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
)

// ErrDepthLimitExceeded indicates that the max depth has been exceeded.
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

// how many bytes to read between progress update messages
const progressReaderIncrement = 1024 * 256

const progressOptionName = "progress"
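
// progressBarMinSize is the smallest total add size for which PostRun shows a
// progress bar. It is referenced below but not defined alongside the other
// constants here; the 8 MiB threshold is an assumed value.
const progressBarMinSize = 1024 * 1024 * 8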

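// AddedObject is the output format of the add command: entries carrying a
// Hash report a completed file or directory, while entries with only a Name
// and Bytes count report in-flight progress.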
type AddedObject struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
}

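// AddCmd is the 'ipfs add' command. Run streams AddedObject values over an
// output channel as content is imported; PostRun renders them as plain text
// or as a progress bar on the client side.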
var AddCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Add an object to ipfs.",
		ShortDescription: `
Adds contents of <path> to ipfs. Use -r to add directories.
Note that directories are added recursively to form the ipfs
MerkleDAG. A smarter partial add with a staging area (like git)
remains to be implemented.
`,
	},

	Arguments: []cmds.Argument{
		cmds.FileArg("path", true, true, "The path to a file to be added to IPFS").EnableRecursive().EnableStdin(),
	},
	Options: []cmds.Option{
		cmds.OptionRecursivePath, // a builtin option that allows recursive paths (-r, --recursive)
		cmds.BoolOption("quiet", "q", "Write minimal output"),
		cmds.BoolOption(progressOptionName, "p", "Stream progress data"),
	},
	PreRun: func(req cmds.Request) error {
		if quiet, _, _ := req.Option("quiet").Bool(); quiet {
			return nil
		}

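		// progress output is enabled by default for non-quiet runs so that
		// PostRun can draw a progress bar for the add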
		req.SetOption(progressOptionName, true)

		sizeFile, ok := req.Files().(files.SizeFile)
		if !ok {
			// we don't need to error, the progress bar just won't know how big the files are
			return nil
		}

		size, err := sizeFile.Size()
		if err != nil {
			// see comment above
			return nil
		}
		log.Debugf("Total size of file being added: %v\n", size)
		req.Values()["size"] = size

		return nil
	},
	Run: func(req cmds.Request, res cmds.Response) {
		n, err := req.Context().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		progress, _, _ := req.Option(progressOptionName).Bool()

		outChan := make(chan interface{})
		res.SetOutput((<-chan interface{})(outChan))

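		// add the files in a goroutine so that AddedObject results stream
		// over outChan while the request's files are still being consumed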
		go func() {
			defer close(outChan)

			for {
				file, err := req.Files().NextFile()
				if (err != nil && err != io.EOF) || file == nil {
					return
				}

				_, err = addFile(n, file, outChan, progress)
				if err != nil {
					return
				}
			}
		}()
	},
	PostRun: func(req cmds.Request, res cmds.Response) {
		outChan, ok := res.Output().(<-chan interface{})
		if !ok {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}
		res.SetOutput(nil)

		quiet, _, err := req.Option("quiet").Bool()
		if err != nil {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}

		size := int64(0)
		s, found := req.Values()["size"]
		if found {
			size = s.(int64)
		}
		showProgressBar := !quiet && size >= progressBarMinSize

		var bar *pb.ProgressBar
		var terminalWidth int
		if showProgressBar {
			bar = pb.New64(size).SetUnits(pb.U_BYTES)
			bar.ManualUpdate = true
			bar.Start()

			// the progress bar lib doesn't give us a way to get the width of the output,
			// so as a hack we just use a callback to measure the output, then get rid of it
			terminalWidth = 0
			bar.Callback = func(line string) {
				terminalWidth = len(line)
				bar.Callback = nil
				bar.Output = res.Stderr()
				log.Infof("terminal width: %v\n", terminalWidth)
			}
			bar.Update()
		}

		lastFile := ""
		var totalProgress, prevFiles, lastBytes int64

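		// totalProgress is the number of bytes already reflected in the bar;
		// prevFiles accumulates the sizes of completed files, and lastBytes
		// tracks progress within the file currently being added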
		for out := range outChan {
			output := out.(*AddedObject)
			if len(output.Hash) > 0 {
				if showProgressBar {
					// clear progress bar line before we print "added x" output
					fmt.Fprintf(res.Stderr(), "\r%s\r", strings.Repeat(" ", terminalWidth))
				}
				if quiet {
					fmt.Fprintf(res.Stdout(), "%s\n", output.Hash)
				} else {
					fmt.Fprintf(res.Stdout(), "added %s %s\n", output.Hash, output.Name)
				}

			} else {
				log.Debugf("add progress: %v %v\n", output.Name, output.Bytes)

				if !showProgressBar {
					continue
				}

				if len(lastFile) == 0 {
					lastFile = output.Name
				}
				if output.Name != lastFile || output.Bytes < lastBytes {
					prevFiles += lastBytes
					lastFile = output.Name
				}
				lastBytes = output.Bytes
				delta := prevFiles + lastBytes - totalProgress
				totalProgress = bar.Add64(delta)
			}

			if showProgressBar {
				bar.Update()
			}
		}
	},
	Type: AddedObject{},
}

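// add builds a merkledag node from each reader using the default chunker and
// then flushes the pin state so the newly added blocks are kept.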
func add(n *core.IpfsNode, readers []io.Reader) ([]*dag.Node, error) {
	mp, ok := n.Pinning.(pinning.ManualPinner)
	if !ok {
		return nil, errors.New("invalid pinner type! expected manual pinner")
	}

	dagnodes := make([]*dag.Node, 0)

	for _, reader := range readers {
		node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter)
		if err != nil {
			return nil, err
		}
		dagnodes = append(dagnodes, node)
	}

	err := n.Pinning.Flush()
	if err != nil {
		return nil, err
	}

	return dagnodes, nil
}

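// addNode adds the node and its children to the local DAG store and pins it
// recursively.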
func addNode(n *core.IpfsNode, node *dag.Node) error {
	err := n.DAG.AddRecursive(node) // add the file to the graph + local storage
	if err != nil {
		return err
	}

	err = n.Pinning.Pin(node, true) // ensure we keep it
	if err != nil {
		return err
	}

	return nil
}

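// addFile adds a single file to the node (delegating directories to addDir),
// optionally wrapping it in a progressReader so byte counts are reported, and
// sends an AddedObject for the resulting dag node over the output channel.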
func addFile(n *core.IpfsNode, file files.File, out chan interface{}, progress bool) (*dag.Node, error) {
	if file.IsDirectory() {
		return addDir(n, file, out, progress)
	}

	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
	if progress {
		reader = &progressReader{file: file, out: out}
	}

	dns, err := add(n, []io.Reader{reader})
	if err != nil {
		return nil, err
	}

	log.Infof("adding file: %s", file.FileName())
	if err := outputDagnode(out, file.FileName(), dns[len(dns)-1]); err != nil {
		return nil, err
	}
	return dns[len(dns)-1], nil // last dag node is the file.
}

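// addDir adds each entry of the directory via addFile, links the resulting
// nodes under a unixfs folder node, and then stores and pins that folder.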
func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress bool) (*dag.Node, error) {
	log.Infof("adding directory: %s", dir.FileName())

	tree := &dag.Node{Data: ft.FolderPBData()}

	for {
		file, err := dir.NextFile()
		if err != nil && err != io.EOF {
			return nil, err
		}
		if file == nil {
			break
		}

		node, err := addFile(n, file, out, progress)
		if err != nil {
			return nil, err
		}

		_, name := path.Split(file.FileName())

		err = tree.AddNodeLink(name, node)
		if err != nil {
			return nil, err
		}
	}

	err := outputDagnode(out, dir.FileName(), tree)
	if err != nil {
		return nil, err
	}

	err = addNode(n, tree)
	if err != nil {
		return nil, err
	}

	return tree, nil
}

// outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
	}

	return nil
}

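// progressReader wraps a file and sends an AddedObject progress update over
// the out channel roughly every progressReaderIncrement bytes and at EOF.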
type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &AddedObject{
			Name:  i.file.FileName(),
			Bytes: i.bytes,
		}
	}

	return n, err
}