package commands

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path"
	"strings"

	cmds "github.com/jbenet/go-ipfs/commands"
	files "github.com/jbenet/go-ipfs/commands/files"
	core "github.com/jbenet/go-ipfs/core"
	importer "github.com/jbenet/go-ipfs/importer"
	"github.com/jbenet/go-ipfs/importer/chunk"
	dag "github.com/jbenet/go-ipfs/merkledag"
	pinning "github.com/jbenet/go-ipfs/pin"
	ft "github.com/jbenet/go-ipfs/unixfs"
	u "github.com/jbenet/go-ipfs/util"

	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb"
)

// ErrDepthLimitExceeded indicates that the max depth has been exceeded.
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

// only show the progress bar for inputs at least this large (8 MiB assumed)
const progressBarMinSize = 1024 * 1024 * 8

const progressOptionName = "progress"

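// AddedObject is written to the output channel for each file: progress
// updates carry Name and Bytes, final results carry Name and Hash.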
type AddedObject struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
}

var AddCmd = &cmds.Command{
	Helptext: cmds.HelpText{
		Tagline: "Add an object to ipfs.",
		ShortDescription: `
Adds contents of <path> to ipfs. Use -r to add directories.
Note that directories are added recursively, to form the ipfs
MerkleDAG. A smarter partial add with a staging area (like git)
remains to be implemented.
`,
	},

	Arguments: []cmds.Argument{
		cmds.FileArg("path", true, true, "The path to a file to be added to IPFS").EnableRecursive().EnableStdin(),
	},
	Options: []cmds.Option{
		cmds.OptionRecursivePath, // a builtin option that allows recursive paths (-r, --recursive)
		cmds.BoolOption("quiet", "q", "Write minimal output"),
		cmds.BoolOption(progressOptionName, "p", "Stream progress data"),
	},
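	// PreRun turns on progress output (unless --quiet is set) and records the
	// total input size, when known, for the progress bar in PostRun.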
	PreRun: func(req cmds.Request) error {
		if quiet, _, _ := req.Option("quiet").Bool(); quiet {
			return nil
		}

		req.SetOption(progressOptionName, true)

		sizeFile, ok := req.Files().(files.SizeFile)
		if !ok {
			// we don't need to error, the progress bar just won't know how big the files are
			return nil
		}

		size, err := sizeFile.Size()
		if err != nil {
			// see comment above
			return nil
		}
		log.Debugf("Total size of file being added: %v\n", size)
		req.Values()["size"] = size

		return nil
	},
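	// Run builds a DAG from each input file and streams AddedObject results
	// over the output channel.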
	Run: func(req cmds.Request, res cmds.Response) {
		n, err := req.Context().GetNode()
		if err != nil {
			res.SetError(err, cmds.ErrNormal)
			return
		}

		progress, _, _ := req.Option(progressOptionName).Bool()

		outChan := make(chan interface{})
		res.SetOutput((<-chan interface{})(outChan))

		go func() {
			defer close(outChan)

			for {
				file, err := req.Files().NextFile()
				if (err != nil && err != io.EOF) || file == nil {
					return
				}

				_, err = addFile(n, file, outChan, progress)
				if err != nil {
					return
				}
			}
		}()
	},
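	// PostRun consumes the output channel, printing the hash of each added
	// file and, for large inputs, drawing a progress bar on stderr.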
	PostRun: func(res cmds.Response) {
		outChan, ok := res.Output().(<-chan interface{})
		if !ok {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}
		res.SetOutput(nil)

		quiet, _, err := res.Request().Option("quiet").Bool()
		if err != nil {
			res.SetError(u.ErrCast(), cmds.ErrNormal)
			return
		}

		size := int64(0)
		s, found := res.Request().Values()["size"]
		if found {
			size = s.(int64)
		}
		showProgressBar := !quiet && size >= progressBarMinSize

		var bar *pb.ProgressBar
		var terminalWidth int
		if showProgressBar {
			bar = pb.New64(size).SetUnits(pb.U_BYTES)
			bar.ManualUpdate = true
			bar.Start()

			// the progress bar lib doesn't give us a way to get the width of the output,
			// so as a hack we just use a callback to measure the output, then get rid of it
			terminalWidth = 0
			bar.Callback = func(line string) {
				terminalWidth = len(line)
				bar.Callback = nil
				bar.Output = os.Stderr
				log.Infof("terminal width: %v\n", terminalWidth)
			}
			bar.Update()
		}

		lastFile := ""
		var totalProgress, prevFiles, lastBytes int64

		for out := range outChan {
			output := out.(*AddedObject)
			if len(output.Hash) > 0 {
				if showProgressBar {
					// clear progress bar line before we print "added x" output
					fmt.Fprintf(os.Stderr, "\r%s\r", strings.Repeat(" ", terminalWidth))
				}
				if quiet {
					fmt.Printf("%s\n", output.Hash)
				} else {
					fmt.Printf("added %s %s\n", output.Hash, output.Name)
				}

			} else {
				log.Debugf("add progress: %v %v\n", output.Name, output.Bytes)

				if !showProgressBar {
					continue
				}

				if len(lastFile) == 0 {
					lastFile = output.Name
				}
				if output.Name != lastFile || output.Bytes < lastBytes {
					prevFiles += lastBytes
					lastFile = output.Name
				}
				lastBytes = output.Bytes
				delta := prevFiles + lastBytes - totalProgress
				totalProgress = bar.Add64(delta)
			}

			if showProgressBar {
				bar.Update()
			}
		}
	},
	Type: AddedObject{},
}

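// add builds a dag node for each reader using the default chunker and flushes
// the pin state before returning the nodes.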
func add(n *core.IpfsNode, readers []io.Reader) ([]*dag.Node, error) {
	mp, ok := n.Pinning.(pinning.ManualPinner)
	if !ok {
		return nil, errors.New("invalid pinner type! expected manual pinner")
	}

	dagnodes := make([]*dag.Node, 0)

	for _, reader := range readers {
		node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter)
		if err != nil {
			return nil, err
		}
		dagnodes = append(dagnodes, node)
	}

	err := n.Pinning.Flush()
	if err != nil {
		return nil, err
	}

	return dagnodes, nil
}

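// addNode adds the node to the DAG and pins it recursively.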
func addNode(n *core.IpfsNode, node *dag.Node) error {
	err := n.DAG.AddRecursive(node) // add the file to the graph + local storage
	if err != nil {
		return err
	}

	err = n.Pinning.Pin(node, true) // ensure we keep it
	if err != nil {
		return err
	}

	return nil
}

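// addFile adds a single file (or directory) to the DAG, optionally wrapping it
// in a progressReader, and reports the result over the output channel.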
func addFile(n *core.IpfsNode, file files.File, out chan interface{}, progress bool) (*dag.Node, error) {
	if file.IsDirectory() {
		return addDir(n, file, out, progress)
	}

	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
	if progress {
		reader = &progressReader{file: file, out: out}
	}

	dns, err := add(n, []io.Reader{reader})
	if err != nil {
		return nil, err
	}

	log.Infof("adding file: %s", file.FileName())
	if err := outputDagnode(out, file.FileName(), dns[len(dns)-1]); err != nil {
		return nil, err
	}
	return dns[len(dns)-1], nil // last dag node is the file.
}

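// addDir adds every entry of a directory, links them under a new folder node,
// and adds that node to the DAG.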
func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress bool) (*dag.Node, error) {
	log.Infof("adding directory: %s", dir.FileName())

	tree := &dag.Node{Data: ft.FolderPBData()}

	for {
		file, err := dir.NextFile()
		if err != nil && err != io.EOF {
			return nil, err
		}
		if file == nil {
			break
		}

		node, err := addFile(n, file, out, progress)
		if err != nil {
			return nil, err
		}

		_, name := path.Split(file.FileName())

		err = tree.AddNodeLink(name, node)
		if err != nil {
			return nil, err
		}
	}

	err := outputDagnode(out, dir.FileName(), tree)
	if err != nil {
		return nil, err
	}

	err = addNode(n, tree)
	if err != nil {
		return nil, err
	}

	return tree, nil
}

// outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
	}

	return nil
}

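// progressReader wraps a file and sends an AddedObject progress update over
// the output channel for roughly every progressReaderIncrement bytes read.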
type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &AddedObject{
			Name:  i.file.FileName(),
			Bytes: i.bytes,
		}
	}

	return n, err
}