add.go 12.8 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
camelmasa's avatar
camelmasa committed
10
	"path/filepath"
11
	"strconv"
Jeromy's avatar
Jeromy committed
12

Jeromy's avatar
Jeromy committed
13
	core "github.com/ipfs/go-ipfs/core"
14
	"github.com/ipfs/go-ipfs/pin"
15 16 17 18 19 20
	unixfs "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs"
	balanced "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs/importer/balanced"
	ihelper "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs/importer/helpers"
	trickle "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs/importer/trickle"
	dag "gx/ipfs/QmRiQCJZ91B7VNmLvA6sxzDuBJGSojS3uXHHVuNr3iueNZ/go-merkledag"

Steven Allen's avatar
Steven Allen committed
21
	logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
22 23 24 25 26 27 28
	files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files"
	ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format"
	posinfo "gx/ipfs/QmXD4grfThQ4LwVoEEfe4dgR7ukmbV9TppM5Q4SPowp7hU/go-ipfs-posinfo"
	chunker "gx/ipfs/QmXzBbJo2sLf3uwjNTeoWYiJV7CjAhkiA4twtLvwJSSNdK/go-ipfs-chunker"
	cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
	bstore "gx/ipfs/QmcmpX42gtDv1fz24kau4wjS9hfwWj5VexWBKgGnWzsyag/go-ipfs-blockstore"
	mfs "gx/ipfs/QmdghKsSDa2AD1kC4qYRnVYWqZecdSBRZjeXRdhMYYhafj/go-mfs"
29 30
)

Jeromy's avatar
Jeromy committed
31
var log = logging.Logger("coreunix")
32

33 34 35
// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

36 37
var liveCacheSize = uint64(256 << 10)

38 39 40 41 42 43 44 45
type Link struct {
	Name, Hash string
	Size       uint64
}

type Object struct {
	Hash  string
	Links []Link
46
	Size  string
47 48 49 50 51 52
}

type AddedObject struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
53
	Size  string `json:",omitempty"`
54 55
}

56
// NewAdder Returns a new Adder used for a file add operation.
57
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) {
58
	return &Adder{
59 60 61 62 63 64 65 66 67 68
		ctx:        ctx,
		pinning:    p,
		blockstore: bs,
		dagService: ds,
		Progress:   false,
		Hidden:     true,
		Pin:        true,
		Trickle:    false,
		Wrap:       false,
		Chunker:    "",
Jeromy's avatar
Jeromy committed
69
	}, nil
70 71
}

72
// Adder holds the switches passed to the `add` command.
73
type Adder struct {
74 75 76
	ctx        context.Context
	pinning    pin.Pinner
	blockstore bstore.GCBlockstore
77
	dagService ipld.DAGService
78 79 80 81 82
	Out        chan interface{}
	Progress   bool
	Hidden     bool
	Pin        bool
	Trickle    bool
83
	RawLeaves  bool
84 85
	Silent     bool
	Wrap       bool
86
	Name       string
87
	NoCopy     bool
88
	Chunker    string
89
	root       ipld.Node
90
	mroot      *mfs.Root
91
	unlocker   bstore.Unlocker
Jeromy's avatar
Jeromy committed
92
	tempRoot   *cid.Cid
93
	CidBuilder cid.Builder
94
	liveNodes  uint64
95 96 97 98 99 100 101
}

func (adder *Adder) mfsRoot() (*mfs.Root, error) {
	if adder.mroot != nil {
		return adder.mroot, nil
	}
	rnode := unixfs.EmptyDirNode()
102
	rnode.SetCidBuilder(adder.CidBuilder)
103 104 105 106 107 108
	mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
	if err != nil {
		return nil, err
	}
	adder.mroot = mr
	return adder.mroot, nil
109 110
}

111
// SetMfsRoot sets `r` as the root for Adder.
Jeromy's avatar
Jeromy committed
112
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
113
	adder.mroot = r
Jeromy's avatar
Jeromy committed
114 115
}

116
// Constructs a node from reader's data, and adds it. Doesn't pin.
117
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
118
	chnk, err := chunker.FromString(reader, adder.Chunker)
119 120 121
	if err != nil {
		return nil, err
	}
122

123
	params := ihelper.DagBuilderParams{
124 125 126 127
		Dagserv:    adder.dagService,
		RawLeaves:  adder.RawLeaves,
		Maxlinks:   ihelper.DefaultLinksPerBlock,
		NoCopy:     adder.NoCopy,
128
		CidBuilder: adder.CidBuilder,
129
	}
130

Jeromy's avatar
Jeromy committed
131
	if adder.Trickle {
132
		return trickle.Layout(params.New(chnk))
133 134
	}

135
	return balanced.Layout(params.New(chnk))
136 137
}

138
// RootNode returns the root node of the Added.
139
func (adder *Adder) RootNode() (ipld.Node, error) {
Jeromy's avatar
Jeromy committed
140
	// for memoizing
Jeromy's avatar
Jeromy committed
141 142
	if adder.root != nil {
		return adder.root, nil
Jeromy's avatar
Jeromy committed
143
	}
144

145 146 147 148
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
149
	root, err := mr.GetDirectory().GetNode()
Jeromy's avatar
Jeromy committed
150 151 152
	if err != nil {
		return nil, err
	}
153

Jeromy's avatar
Jeromy committed
154
	// if not wrapping, AND one root file, use that hash as root.
155 156
	if !adder.Wrap && len(root.Links()) == 1 {
		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
Jeromy's avatar
Jeromy committed
157 158
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
159
		}
160

Jeromy's avatar
Jeromy committed
161
		root = nd
Jeromy's avatar
Jeromy committed
162
	}
Jeromy's avatar
Jeromy committed
163

Jeromy's avatar
Jeromy committed
164
	adder.root = root
Jeromy's avatar
Jeromy committed
165
	return root, err
166 167
}

168 169
// Recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
Jeromy's avatar
Jeromy committed
170 171
func (adder *Adder) PinRoot() error {
	root, err := adder.RootNode()
172 173 174
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
175
	if !adder.Pin {
176 177
		return nil
	}
178

179 180 181
	rnk := root.Cid()

	err = adder.dagService.Add(adder.ctx, root)
182 183 184 185
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
186
	if adder.tempRoot != nil {
187
		err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
Jeromy's avatar
Jeromy committed
188 189 190
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
191
		adder.tempRoot = rnk
Jeromy's avatar
Jeromy committed
192 193
	}

194 195
	adder.pinning.PinWithMode(rnk, pin.Recursive)
	return adder.pinning.Flush()
196 197
}

198
// Finalize flushes the mfs root directory and returns the mfs root node.
199
func (adder *Adder) Finalize() (ipld.Node, error) {
200 201 202 203
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
204
	var root mfs.FSNode
205 206
	rootdir := mr.GetDirectory()
	root = rootdir
Stephen Whitmore's avatar
Stephen Whitmore committed
207

208
	err = root.Flush()
Jeromy's avatar
Jeromy committed
209 210 211 212 213
	if err != nil {
		return nil, err
	}

	var name string
Jeromy's avatar
Jeromy committed
214
	if !adder.Wrap {
215
		children, err := rootdir.ListNames(adder.ctx)
Jeromy's avatar
Jeromy committed
216 217 218
		if err != nil {
			return nil, err
		}
Quantomic's avatar
Quantomic committed
219 220 221 222 223

		if len(children) == 0 {
			return nil, fmt.Errorf("expected at least one child dir, got none")
		}

224
		// Replace root with the first child
Jeromy's avatar
Jeromy committed
225
		name = children[0]
226
		root, err = rootdir.Child(name)
Jeromy's avatar
Jeromy committed
227 228 229 230 231
		if err != nil {
			return nil, err
		}
	}

Jeromy's avatar
Jeromy committed
232
	err = adder.outputDirs(name, root)
Jeromy's avatar
Jeromy committed
233 234 235 236
	if err != nil {
		return nil, err
	}

237
	err = mr.Close()
238 239
	if err != nil {
		return nil, err
Stephen Whitmore's avatar
Stephen Whitmore committed
240 241
	}

Stephen Whitmore's avatar
Stephen Whitmore committed
242
	return root.GetNode()
Jeromy's avatar
Jeromy committed
243 244
}

Jeromy's avatar
Jeromy committed
245 246 247
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
Jeromy's avatar
Jeromy committed
248
		return nil
Jeromy's avatar
Jeromy committed
249
	case *mfs.Directory:
Jeromy's avatar
Jeromy committed
250
		names, err := fsn.ListNames(adder.ctx)
251 252 253 254 255
		if err != nil {
			return err
		}

		for _, name := range names {
Jeromy's avatar
Jeromy committed
256 257 258 259 260 261 262 263 264 265
			child, err := fsn.Child(name)
			if err != nil {
				return err
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}
266 267

			fsn.Uncache(name)
Jeromy's avatar
Jeromy committed
268
		}
Jeromy's avatar
Jeromy committed
269
		nd, err := fsn.GetNode()
Jeromy's avatar
Jeromy committed
270 271
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
272
		}
Jeromy's avatar
Jeromy committed
273

Jeromy's avatar
Jeromy committed
274 275 276 277
		return outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
278 279
}

280 281 282
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
// If you want to pin it, use NewAdder() and Adder.PinRoot().
283
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
Lars Gierth's avatar
Lars Gierth committed
284 285 286
	return AddWithContext(n.Context(), n, r)
}

287
// AddWithContext does the same as Add, but with a custom context.
Lars Gierth's avatar
Lars Gierth committed
288
func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
289
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
290

291
	fileAdder, err := NewAdder(ctx, n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
292 293 294
	if err != nil {
		return "", err
	}
295

296
	node, err := fileAdder.add(r)
297 298 299
	if err != nil {
		return "", err
	}
300

Jeromy's avatar
Jeromy committed
301
	return node.Cid().String(), nil
302
}
303 304 305

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
306
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
307

308
	stat, err := os.Lstat(root)
309 310 311
	if err != nil {
		return "", err
	}
312

camelmasa's avatar
camelmasa committed
313
	f, err := files.NewSerialFile(filepath.Base(root), root, false, stat)
314 315 316
	if err != nil {
		return "", err
	}
317
	defer f.Close()
318

319
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
320 321 322
	if err != nil {
		return "", err
	}
323

Jeromy's avatar
Jeromy committed
324
	err = fileAdder.addFile(f)
325 326 327
	if err != nil {
		return "", err
	}
328

329
	nd, err := fileAdder.Finalize()
Jeromy's avatar
Jeromy committed
330 331 332 333
	if err != nil {
		return "", err
	}

Jeromy's avatar
Jeromy committed
334
	return nd.String(), nil
335 336
}

337 338
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
339 340
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
341
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, ipld.Node, error) {
342
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
343
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
344 345 346
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
347
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
348

349
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
350

Jeromy's avatar
Jeromy committed
351
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
352 353 354 355
	if err != nil {
		return "", nil, err
	}

356
	dagnode, err := fileAdder.Finalize()
357 358 359
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
360

Jeromy's avatar
Jeromy committed
361 362
	c := dagnode.Cid()
	return gopath.Join(c.String(), filename), dagnode, nil
363 364
}

365
func (adder *Adder) addNode(node ipld.Node, path string) error {
366 367
	// patch it into the root
	if path == "" {
Jeromy's avatar
Jeromy committed
368
		path = node.Cid().String()
369
	}
370

371 372 373 374
	if pi, ok := node.(*posinfo.FilestoreNode); ok {
		node = pi.Node
	}

375 376 377 378
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
379 380
	dir := gopath.Dir(path)
	if dir != "." {
381
		opts := mfs.MkdirOpts{
382 383 384
			Mkparents:  true,
			Flush:      false,
			CidBuilder: adder.CidBuilder,
385 386
		}
		if err := mfs.Mkdir(mr, dir, opts); err != nil {
Jeromy's avatar
Jeromy committed
387 388 389 390
			return err
		}
	}

391
	if err := mfs.PutNode(mr, path, node); err != nil {
392 393
		return err
	}
394

Jeromy's avatar
Jeromy committed
395
	if !adder.Silent {
396
		return outputDagnode(adder.Out, path, node)
Jeromy's avatar
Jeromy committed
397 398
	}
	return nil
399 400
}

401
// AddFile adds the given file while respecting the adder.
Jeromy's avatar
Jeromy committed
402
func (adder *Adder) AddFile(file files.File) error {
403 404 405
	if adder.Pin {
		adder.unlocker = adder.blockstore.PinLock()
	}
406
	defer func() {
407 408 409
		if adder.unlocker != nil {
			adder.unlocker.Unlock()
		}
410
	}()
Jeromy's avatar
Jeromy committed
411

Jeromy's avatar
Jeromy committed
412
	return adder.addFile(file)
Jeromy's avatar
Jeromy committed
413 414 415 416 417 418 419 420
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

421 422 423 424 425 426
	if adder.liveNodes >= liveCacheSize {
		// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
427
		if err := mr.FlushMemFree(adder.ctx); err != nil {
428 429
			return err
		}
Jeromy's avatar
Jeromy committed
430

431 432 433 434
		adder.liveNodes = 0
	}
	adder.liveNodes++

435
	if file.IsDirectory() {
Jeromy's avatar
Jeromy committed
436
		return adder.addDir(file)
437 438
	}

439 440 441 442
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
443
			return err
444
		}
445

446
		dagnode := dag.NodeWithData(sdata)
447
		dagnode.SetCidBuilder(adder.CidBuilder)
448
		err = adder.dagService.Add(adder.ctx, dagnode)
449
		if err != nil {
Jeromy's avatar
Jeromy committed
450
			return err
451 452
		}

Jeromy's avatar
Jeromy committed
453
		return adder.addNode(dagnode, s.FileName())
454 455 456 457 458 459
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
460
	if adder.Progress {
461 462 463 464 465 466
		rdr := &progressReader{file: file, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
467 468
	}

Jeromy's avatar
Jeromy committed
469
	dagnode, err := adder.add(reader)
470
	if err != nil {
Jeromy's avatar
Jeromy committed
471
		return err
472 473
	}

474
	addFileName := file.FileName()
475
	if (file.FullPath() == "/dev/stdin" || file.FullPath() == "") && adder.Name != "" {
476 477
		addFileName = adder.Name
		adder.Name = ""
478
	}
479
	// patch it into the root
480
	return adder.addNode(dagnode, addFileName)
481 482
}

Jeromy's avatar
Jeromy committed
483
func (adder *Adder) addDir(dir files.File) error {
484
	log.Infof("adding directory: %s", dir.FileName())
485

486 487 488 489
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
490
	err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
491 492 493
		Mkparents:  true,
		Flush:      false,
		CidBuilder: adder.CidBuilder,
494
	})
Jeromy's avatar
Jeromy committed
495 496 497 498
	if err != nil {
		return err
	}

499 500
	for {
		file, err := dir.NextFile()
501
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
502
			return err
503 504 505
		}
		if file == nil {
			break
506 507
		}

508 509 510
		// Skip hidden files when adding recursively, unless Hidden is enabled.
		if files.IsHidden(file) && !adder.Hidden {
			log.Infof("%s is hidden, skipping", file.FileName())
511
			continue
512 513 514
		}
		err = adder.addFile(file)
		if err != nil {
Jeromy's avatar
Jeromy committed
515
			return err
516 517 518
		}
	}

Jeromy's avatar
Jeromy committed
519
	return nil
520
}
521

Jeromy's avatar
Jeromy committed
522
func (adder *Adder) maybePauseForGC() error {
523
	if adder.unlocker != nil && adder.blockstore.GCRequested() {
Jeromy's avatar
Jeromy committed
524 525 526 527 528
		err := adder.PinRoot()
		if err != nil {
			return err
		}

529
		adder.unlocker.Unlock()
530
		adder.unlocker = adder.blockstore.PinLock()
Jeromy's avatar
Jeromy committed
531 532 533 534
	}
	return nil
}

535
// outputDagnode sends dagnode info over the output channel
536
func outputDagnode(out chan interface{}, name string, dn ipld.Node) error {
537 538 539 540 541 542 543 544 545 546 547 548
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
549
		Size: o.Size,
550 551 552 553 554 555
	}

	return nil
}

// from core/commands/object.go
556
func getOutput(dagnode ipld.Node) (*Object, error) {
Jeromy's avatar
Jeromy committed
557
	c := dagnode.Cid()
558 559 560 561
	s, err := dagnode.Size()
	if err != nil {
		return nil, err
	}
562 563

	output := &Object{
Jeromy's avatar
Jeromy committed
564
		Hash:  c.String(),
565
		Size:  strconv.FormatUint(s, 10),
566
		Links: make([]Link, len(dagnode.Links())),
567 568
	}

569
	for i, link := range dagnode.Links() {
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599
		output.Links[i] = Link{
			Name: link.Name,
			Size: link.Size,
		}
	}

	return output, nil
}

type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &AddedObject{
			Name:  i.file.FileName(),
			Bytes: i.bytes,
		}
	}

	return n, err
}
600 601 602 603 604

type progressReader2 struct {
	*progressReader
	files.FileInfo
}