add.go 12.4 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
10
	"strconv"
Jeromy's avatar
Jeromy committed
11

Jeromy's avatar
Jeromy committed
12
	core "github.com/ipfs/go-ipfs/core"
13 14 15
	balanced "github.com/ipfs/go-ipfs/importer/balanced"
	ihelper "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
Jeromy's avatar
Jeromy committed
16
	dag "github.com/ipfs/go-ipfs/merkledag"
Jeromy's avatar
Jeromy committed
17
	mfs "github.com/ipfs/go-ipfs/mfs"
18
	"github.com/ipfs/go-ipfs/pin"
Jeromy's avatar
Jeromy committed
19 20
	unixfs "github.com/ipfs/go-ipfs/unixfs"

21 22
	posinfo "gx/ipfs/QmUWsXLvYYDAaoAt9TPZpFX4ffHHMg46AHrz1ZLTN5ABbe/go-ipfs-posinfo"
	ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
Steven Allen's avatar
Steven Allen committed
23
	chunker "gx/ipfs/QmXnzH7wowyLZy8XJxxaQCVTgLMcDXdMBznmsrmQWCyiQV/go-ipfs-chunker"
24
	cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
Steven Allen's avatar
Steven Allen committed
25
	logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
26
	files "gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit/files"
Steven Allen's avatar
Steven Allen committed
27
	bstore "gx/ipfs/QmdpuJBPBZ6sLPj9BQpn3Rpi38BT2cF1QMiUfyzNWeySW4/go-ipfs-blockstore"
28 29
)

Jeromy's avatar
Jeromy committed
30
// log is the package-level logger, registered under the name "coreunix".
var log = logging.Logger("coreunix")
31

32 33 34
// progressReaderIncrement is the number of bytes of read progress that must
// accumulate before another progress update message is sent.
const progressReaderIncrement = 256 << 10

35 36
// liveCacheSize is the threshold compared against Adder.liveNodes in addFile:
// once that counter reaches it, the MFS root is flushed and the counter is
// reset.
var liveCacheSize uint64 = 256 * 1024

37 38 39 40 41 42 43 44
// Link describes one outgoing link of an Object.
type Link struct {
	// Name is the name of the link.
	Name string
	// Hash is the string form of the link target's hash.
	Hash string
	// Size is the size recorded on the link.
	Size uint64
}

type Object struct {
	Hash  string
	Links []Link
45
	Size  string
46 47 48 49 50 51
}

// AddedObject is the message sent over an Adder's output channel. It is used
// both to announce an added node (Name, Hash, Size) and to report read
// progress (Name, Bytes).
type AddedObject struct {
	// Name is the path or file name the message refers to.
	Name string
	// Hash is the string form of the added node's CID; omitted when empty.
	Hash string `json:",omitempty"`
	// Bytes is the number of bytes read so far; omitted when zero.
	Bytes int64 `json:",omitempty"`
	// Size is the node's size as a base-10 string; omitted when empty.
	Size string `json:",omitempty"`
}

55
// NewAdder Returns a new Adder used for a file add operation.
56
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) {
57
	return &Adder{
58 59 60 61 62 63 64 65 66 67
		ctx:        ctx,
		pinning:    p,
		blockstore: bs,
		dagService: ds,
		Progress:   false,
		Hidden:     true,
		Pin:        true,
		Trickle:    false,
		Wrap:       false,
		Chunker:    "",
Jeromy's avatar
Jeromy committed
68
	}, nil
69 70
}

71
// Adder holds the switches passed to the `add` command.
72
type Adder struct {
73 74 75
	ctx        context.Context
	pinning    pin.Pinner
	blockstore bstore.GCBlockstore
76
	dagService ipld.DAGService
77 78 79 80 81
	Out        chan interface{}
	Progress   bool
	Hidden     bool
	Pin        bool
	Trickle    bool
82
	RawLeaves  bool
83 84
	Silent     bool
	Wrap       bool
85
	NoCopy     bool
86
	Chunker    string
87
	root       ipld.Node
88
	mroot      *mfs.Root
89
	unlocker   bstore.Unlocker
Jeromy's avatar
Jeromy committed
90
	tempRoot   *cid.Cid
91
	Prefix     *cid.Prefix
92
	liveNodes  uint64
93 94 95 96 97 98 99 100 101 102 103 104 105 106
}

func (adder *Adder) mfsRoot() (*mfs.Root, error) {
	if adder.mroot != nil {
		return adder.mroot, nil
	}
	rnode := unixfs.EmptyDirNode()
	rnode.SetPrefix(adder.Prefix)
	mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
	if err != nil {
		return nil, err
	}
	adder.mroot = mr
	return adder.mroot, nil
107 108
}

109
// SetMfsRoot sets `r` as the root for Adder.
Jeromy's avatar
Jeromy committed
110
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
111
	adder.mroot = r
Jeromy's avatar
Jeromy committed
112 113
}

114
// Constructs a node from reader's data, and adds it. Doesn't pin.
115
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
116
	chnk, err := chunker.FromString(reader, adder.Chunker)
117 118 119
	if err != nil {
		return nil, err
	}
120

121 122 123 124
	params := ihelper.DagBuilderParams{
		Dagserv:   adder.dagService,
		RawLeaves: adder.RawLeaves,
		Maxlinks:  ihelper.DefaultLinksPerBlock,
125
		NoCopy:    adder.NoCopy,
126
		Prefix:    adder.Prefix,
127
	}
128

Jeromy's avatar
Jeromy committed
129
	if adder.Trickle {
130
		return trickle.Layout(params.New(chnk))
131 132
	}

133
	return balanced.Layout(params.New(chnk))
134 135
}

136
// RootNode returns the root node of the Added.
137
func (adder *Adder) RootNode() (ipld.Node, error) {
Jeromy's avatar
Jeromy committed
138
	// for memoizing
Jeromy's avatar
Jeromy committed
139 140
	if adder.root != nil {
		return adder.root, nil
Jeromy's avatar
Jeromy committed
141
	}
142

143 144 145 146 147
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
	root, err := mr.GetValue().GetNode()
Jeromy's avatar
Jeromy committed
148 149 150
	if err != nil {
		return nil, err
	}
151

Jeromy's avatar
Jeromy committed
152
	// if not wrapping, AND one root file, use that hash as root.
153 154
	if !adder.Wrap && len(root.Links()) == 1 {
		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
Jeromy's avatar
Jeromy committed
155 156
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
157
		}
158

Jeromy's avatar
Jeromy committed
159
		root = nd
Jeromy's avatar
Jeromy committed
160
	}
Jeromy's avatar
Jeromy committed
161

Jeromy's avatar
Jeromy committed
162
	adder.root = root
Jeromy's avatar
Jeromy committed
163
	return root, err
164 165
}

166 167
// Recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
Jeromy's avatar
Jeromy committed
168 169
func (adder *Adder) PinRoot() error {
	root, err := adder.RootNode()
170 171 172
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
173
	if !adder.Pin {
174 175
		return nil
	}
176

177 178 179
	rnk := root.Cid()

	err = adder.dagService.Add(adder.ctx, root)
180 181 182 183
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
184
	if adder.tempRoot != nil {
185
		err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
Jeromy's avatar
Jeromy committed
186 187 188
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
189
		adder.tempRoot = rnk
Jeromy's avatar
Jeromy committed
190 191
	}

192 193
	adder.pinning.PinWithMode(rnk, pin.Recursive)
	return adder.pinning.Flush()
194 195
}

196
// Finalize flushes the mfs root directory and returns the mfs root node.
197
func (adder *Adder) Finalize() (ipld.Node, error) {
198 199 200 201 202
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
	root := mr.GetValue()
Stephen Whitmore's avatar
Stephen Whitmore committed
203

204
	err = root.Flush()
Jeromy's avatar
Jeromy committed
205 206 207 208 209
	if err != nil {
		return nil, err
	}

	var name string
Jeromy's avatar
Jeromy committed
210
	if !adder.Wrap {
Jeromy's avatar
Jeromy committed
211
		children, err := root.(*mfs.Directory).ListNames(adder.ctx)
Jeromy's avatar
Jeromy committed
212 213 214
		if err != nil {
			return nil, err
		}
Quantomic's avatar
Quantomic committed
215 216 217 218 219

		if len(children) == 0 {
			return nil, fmt.Errorf("expected at least one child dir, got none")
		}

Jeromy's avatar
Jeromy committed
220
		name = children[0]
Stephen Whitmore's avatar
Stephen Whitmore committed
221

222 223 224 225 226 227
		mr, err := adder.mfsRoot()
		if err != nil {
			return nil, err
		}

		dir, ok := mr.GetValue().(*mfs.Directory)
228 229 230 231 232
		if !ok {
			return nil, fmt.Errorf("root is not a directory")
		}

		root, err = dir.Child(name)
Jeromy's avatar
Jeromy committed
233 234 235 236 237
		if err != nil {
			return nil, err
		}
	}

Jeromy's avatar
Jeromy committed
238
	err = adder.outputDirs(name, root)
Jeromy's avatar
Jeromy committed
239 240 241 242
	if err != nil {
		return nil, err
	}

243
	err = mr.Close()
244 245
	if err != nil {
		return nil, err
Stephen Whitmore's avatar
Stephen Whitmore committed
246 247
	}

Stephen Whitmore's avatar
Stephen Whitmore committed
248
	return root.GetNode()
Jeromy's avatar
Jeromy committed
249 250
}

Jeromy's avatar
Jeromy committed
251 252 253
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
Jeromy's avatar
Jeromy committed
254
		return nil
Jeromy's avatar
Jeromy committed
255
	case *mfs.Directory:
Jeromy's avatar
Jeromy committed
256
		names, err := fsn.ListNames(adder.ctx)
257 258 259 260 261
		if err != nil {
			return err
		}

		for _, name := range names {
Jeromy's avatar
Jeromy committed
262 263 264 265 266 267 268 269 270 271
			child, err := fsn.Child(name)
			if err != nil {
				return err
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}
272 273

			fsn.Uncache(name)
Jeromy's avatar
Jeromy committed
274
		}
Jeromy's avatar
Jeromy committed
275
		nd, err := fsn.GetNode()
Jeromy's avatar
Jeromy committed
276 277
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
278
		}
Jeromy's avatar
Jeromy committed
279

Jeromy's avatar
Jeromy committed
280 281 282 283
		return outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
284 285
}

286 287 288
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
// If you want to pin it, use NewAdder() and Adder.PinRoot().
289
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
Lars Gierth's avatar
Lars Gierth committed
290 291 292
	return AddWithContext(n.Context(), n, r)
}

293
// AddWithContext does the same as Add, but with a custom context.
Lars Gierth's avatar
Lars Gierth committed
294
func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
295
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
296

297
	fileAdder, err := NewAdder(ctx, n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
298 299 300
	if err != nil {
		return "", err
	}
301

302
	node, err := fileAdder.add(r)
303 304 305
	if err != nil {
		return "", err
	}
306

Jeromy's avatar
Jeromy committed
307
	return node.Cid().String(), nil
308
}
309 310 311

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
312
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
313

314
	stat, err := os.Lstat(root)
315 316 317
	if err != nil {
		return "", err
	}
318

Jeromy's avatar
Jeromy committed
319
	f, err := files.NewSerialFile(root, root, false, stat)
320 321 322
	if err != nil {
		return "", err
	}
323
	defer f.Close()
324

325
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
326 327 328
	if err != nil {
		return "", err
	}
329

Jeromy's avatar
Jeromy committed
330
	err = fileAdder.addFile(f)
331 332 333
	if err != nil {
		return "", err
	}
334

335
	nd, err := fileAdder.Finalize()
Jeromy's avatar
Jeromy committed
336 337 338 339
	if err != nil {
		return "", err
	}

Jeromy's avatar
Jeromy committed
340
	return nd.String(), nil
341 342
}

343 344
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
345 346
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
347
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, ipld.Node, error) {
348
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
349
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
350 351 352
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
353
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
354

355
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
356

Jeromy's avatar
Jeromy committed
357
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
358 359 360 361
	if err != nil {
		return "", nil, err
	}

362
	dagnode, err := fileAdder.Finalize()
363 364 365
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
366

Jeromy's avatar
Jeromy committed
367 368
	c := dagnode.Cid()
	return gopath.Join(c.String(), filename), dagnode, nil
369 370
}

371
func (adder *Adder) addNode(node ipld.Node, path string) error {
372 373
	// patch it into the root
	if path == "" {
Jeromy's avatar
Jeromy committed
374
		path = node.Cid().String()
375
	}
376

377 378 379 380
	if pi, ok := node.(*posinfo.FilestoreNode); ok {
		node = pi.Node
	}

381 382 383 384
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
385 386
	dir := gopath.Dir(path)
	if dir != "." {
387 388 389 390 391 392
		opts := mfs.MkdirOpts{
			Mkparents: true,
			Flush:     false,
			Prefix:    adder.Prefix,
		}
		if err := mfs.Mkdir(mr, dir, opts); err != nil {
Jeromy's avatar
Jeromy committed
393 394 395 396
			return err
		}
	}

397
	if err := mfs.PutNode(mr, path, node); err != nil {
398 399
		return err
	}
400

Jeromy's avatar
Jeromy committed
401
	if !adder.Silent {
402
		return outputDagnode(adder.Out, path, node)
Jeromy's avatar
Jeromy committed
403 404
	}
	return nil
405 406
}

407
// AddFile adds the given file while respecting the adder.
Jeromy's avatar
Jeromy committed
408
func (adder *Adder) AddFile(file files.File) error {
409 410 411
	if adder.Pin {
		adder.unlocker = adder.blockstore.PinLock()
	}
412
	defer func() {
413 414 415
		if adder.unlocker != nil {
			adder.unlocker.Unlock()
		}
416
	}()
Jeromy's avatar
Jeromy committed
417

Jeromy's avatar
Jeromy committed
418
	return adder.addFile(file)
Jeromy's avatar
Jeromy committed
419 420 421 422 423 424 425 426
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

427 428 429 430 431 432
	if adder.liveNodes >= liveCacheSize {
		// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
433
		if err := mr.FlushMemFree(adder.ctx); err != nil {
434 435
			return err
		}
Jeromy's avatar
Jeromy committed
436

437 438 439 440
		adder.liveNodes = 0
	}
	adder.liveNodes++

441
	if file.IsDirectory() {
Jeromy's avatar
Jeromy committed
442
		return adder.addDir(file)
443 444
	}

445 446 447 448
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
449
			return err
450
		}
451

452
		dagnode := dag.NodeWithData(sdata)
453
		dagnode.SetPrefix(adder.Prefix)
454
		err = adder.dagService.Add(adder.ctx, dagnode)
455
		if err != nil {
Jeromy's avatar
Jeromy committed
456
			return err
457 458
		}

Jeromy's avatar
Jeromy committed
459
		return adder.addNode(dagnode, s.FileName())
460 461 462 463 464 465
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
466
	if adder.Progress {
467 468 469 470 471 472
		rdr := &progressReader{file: file, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
473 474
	}

Jeromy's avatar
Jeromy committed
475
	dagnode, err := adder.add(reader)
476
	if err != nil {
Jeromy's avatar
Jeromy committed
477
		return err
478 479 480
	}

	// patch it into the root
Jeromy's avatar
Jeromy committed
481
	return adder.addNode(dagnode, file.FileName())
482 483
}

Jeromy's avatar
Jeromy committed
484
func (adder *Adder) addDir(dir files.File) error {
485
	log.Infof("adding directory: %s", dir.FileName())
486

487 488 489 490
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
491 492 493 494 495
	err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
		Mkparents: true,
		Flush:     false,
		Prefix:    adder.Prefix,
	})
Jeromy's avatar
Jeromy committed
496 497 498 499
	if err != nil {
		return err
	}

500 501
	for {
		file, err := dir.NextFile()
502
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
503
			return err
504 505 506
		}
		if file == nil {
			break
507 508
		}

509 510 511
		// Skip hidden files when adding recursively, unless Hidden is enabled.
		if files.IsHidden(file) && !adder.Hidden {
			log.Infof("%s is hidden, skipping", file.FileName())
512
			continue
513 514 515
		}
		err = adder.addFile(file)
		if err != nil {
Jeromy's avatar
Jeromy committed
516
			return err
517 518 519
		}
	}

Jeromy's avatar
Jeromy committed
520
	return nil
521
}
522

Jeromy's avatar
Jeromy committed
523
func (adder *Adder) maybePauseForGC() error {
524
	if adder.unlocker != nil && adder.blockstore.GCRequested() {
Jeromy's avatar
Jeromy committed
525 526 527 528 529
		err := adder.PinRoot()
		if err != nil {
			return err
		}

530
		adder.unlocker.Unlock()
531
		adder.unlocker = adder.blockstore.PinLock()
Jeromy's avatar
Jeromy committed
532 533 534 535
	}
	return nil
}

536
// outputDagnode sends dagnode info over the output channel
537
func outputDagnode(out chan interface{}, name string, dn ipld.Node) error {
538 539 540 541 542 543 544 545 546 547 548 549
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
550
		Size: o.Size,
551 552 553 554 555 556
	}

	return nil
}

// from core/commands/object.go
557
func getOutput(dagnode ipld.Node) (*Object, error) {
Jeromy's avatar
Jeromy committed
558
	c := dagnode.Cid()
559 560 561 562
	s, err := dagnode.Size()
	if err != nil {
		return nil, err
	}
563 564

	output := &Object{
Jeromy's avatar
Jeromy committed
565
		Hash:  c.String(),
566
		Size:  strconv.FormatUint(s, 10),
567
		Links: make([]Link, len(dagnode.Links())),
568 569
	}

570
	for i, link := range dagnode.Links() {
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600
		output.Links[i] = Link{
			Name: link.Name,
			Size: link.Size,
		}
	}

	return output, nil
}

// progressReader wraps a files.File and reports read progress on out as
// AddedObject values.
type progressReader struct {
	file         files.File       // underlying file whose Read calls are forwarded
	out          chan interface{} // receives *AddedObject progress updates
	bytes        int64            // total number of bytes read so far
	lastProgress int64            // value of bytes when the last update was sent
}

// Read reads from the wrapped file and keeps a running total of the bytes
// read. A progress update (file name and total bytes) is sent on the output
// channel whenever at least progressReaderIncrement bytes have been read since
// the previous update, and once more when the file reports io.EOF.
//
// A nil output channel disables the updates; sending on it would otherwise
// block forever. The byte count and error from the underlying Read are
// returned unchanged.
func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		if i.out != nil {
			i.out <- &AddedObject{
				Name:  i.file.FileName(),
				Bytes: i.bytes,
			}
		}
	}

	return n, err
}
601 602 603 604 605

// progressReader2 pairs a progressReader with a files.FileInfo, so the
// progress-reporting reader also exposes the FileInfo methods. addFile uses
// it when the file being added implements files.FileInfo.
type progressReader2 struct {
	*progressReader
	files.FileInfo
}