add.go 13.2 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
10
	"strconv"
Jeromy's avatar
Jeromy committed
11

Jeromy's avatar
Jeromy committed
12
	bs "github.com/ipfs/go-ipfs/blocks/blockstore"
13 14
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
15
	core "github.com/ipfs/go-ipfs/core"
16
	"github.com/ipfs/go-ipfs/exchange/offline"
17
	balanced "github.com/ipfs/go-ipfs/importer/balanced"
18
	"github.com/ipfs/go-ipfs/importer/chunk"
19 20
	ihelper "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
Jeromy's avatar
Jeromy committed
21
	dag "github.com/ipfs/go-ipfs/merkledag"
Jeromy's avatar
Jeromy committed
22
	mfs "github.com/ipfs/go-ipfs/mfs"
23
	"github.com/ipfs/go-ipfs/pin"
24
	posinfo "github.com/ipfs/go-ipfs/thirdparty/posinfo"
Jeromy's avatar
Jeromy committed
25 26
	unixfs "github.com/ipfs/go-ipfs/unixfs"

Steven Allen's avatar
Steven Allen committed
27 28
	ds "gx/ipfs/QmPpegoMqhAEqjncrzArm7KVWAkCm78rqL2DPuNjhPrshg/go-datastore"
	syncds "gx/ipfs/QmPpegoMqhAEqjncrzArm7KVWAkCm78rqL2DPuNjhPrshg/go-datastore/sync"
Jeromy's avatar
Jeromy committed
29
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
Steven Allen's avatar
Steven Allen committed
30
	cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
31
	files "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files"
Steven Allen's avatar
Steven Allen committed
32
	node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
33 34
)

Jeromy's avatar
Jeromy committed
35
var log = logging.Logger("coreunix")
36

37 38 39
// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

40 41
var liveCacheSize = uint64(256 << 10)

42 43 44 45 46 47 48 49
type Link struct {
	Name, Hash string
	Size       uint64
}

type Object struct {
	Hash  string
	Links []Link
50
	Size  string
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
}

type hiddenFileError struct {
	fileName string
}

func (e *hiddenFileError) Error() string {
	return fmt.Sprintf("%s is a hidden file", e.fileName)
}

type ignoreFileError struct {
	fileName string
}

func (e *ignoreFileError) Error() string {
	return fmt.Sprintf("%s is an ignored file", e.fileName)
}

type AddedObject struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
73
	Size  string `json:",omitempty"`
74 75
}

76
// NewAdder Returns a new Adder used for a file add operation.
77
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds node.DAGService) (*Adder, error) {
78
	return &Adder{
79 80 81 82 83 84 85 86 87 88
		ctx:        ctx,
		pinning:    p,
		blockstore: bs,
		dagService: ds,
		Progress:   false,
		Hidden:     true,
		Pin:        true,
		Trickle:    false,
		Wrap:       false,
		Chunker:    "",
Jeromy's avatar
Jeromy committed
89
	}, nil
90 91
}

92
// Adder holds the switches passed to the `add` command.
93
type Adder struct {
94 95 96
	ctx        context.Context
	pinning    pin.Pinner
	blockstore bstore.GCBlockstore
97
	dagService node.DAGService
98 99 100 101 102
	Out        chan interface{}
	Progress   bool
	Hidden     bool
	Pin        bool
	Trickle    bool
103
	RawLeaves  bool
104 105
	Silent     bool
	Wrap       bool
106
	NoCopy     bool
107
	Chunker    string
Jeromy's avatar
Jeromy committed
108
	root       node.Node
109
	mroot      *mfs.Root
110
	unlocker   bs.Unlocker
Jeromy's avatar
Jeromy committed
111
	tempRoot   *cid.Cid
112
	Prefix     *cid.Prefix
113
	liveNodes  uint64
114 115 116 117 118 119 120 121 122 123 124 125 126 127
}

func (adder *Adder) mfsRoot() (*mfs.Root, error) {
	if adder.mroot != nil {
		return adder.mroot, nil
	}
	rnode := unixfs.EmptyDirNode()
	rnode.SetPrefix(adder.Prefix)
	mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
	if err != nil {
		return nil, err
	}
	adder.mroot = mr
	return adder.mroot, nil
128 129
}

130
// SetMfsRoot sets `r` as the root for Adder.
Jeromy's avatar
Jeromy committed
131
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
132
	adder.mroot = r
Jeromy's avatar
Jeromy committed
133 134
}

135
// Constructs a node from reader's data, and adds it. Doesn't pin.
136
func (adder *Adder) add(reader io.Reader) (node.Node, error) {
Jeromy's avatar
Jeromy committed
137
	chnk, err := chunk.FromString(reader, adder.Chunker)
138 139 140
	if err != nil {
		return nil, err
	}
141

142 143 144 145
	params := ihelper.DagBuilderParams{
		Dagserv:   adder.dagService,
		RawLeaves: adder.RawLeaves,
		Maxlinks:  ihelper.DefaultLinksPerBlock,
146
		NoCopy:    adder.NoCopy,
147
		Prefix:    adder.Prefix,
148
	}
149

Jeromy's avatar
Jeromy committed
150
	if adder.Trickle {
151 152 153 154
		return trickle.TrickleLayout(params.New(chnk))
	}

	return balanced.BalancedLayout(params.New(chnk))
155 156
}

157
// RootNode returns the root node of the Added.
Jeromy's avatar
Jeromy committed
158
func (adder *Adder) RootNode() (node.Node, error) {
Jeromy's avatar
Jeromy committed
159
	// for memoizing
Jeromy's avatar
Jeromy committed
160 161
	if adder.root != nil {
		return adder.root, nil
Jeromy's avatar
Jeromy committed
162
	}
163

164 165 166 167 168
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
	root, err := mr.GetValue().GetNode()
Jeromy's avatar
Jeromy committed
169 170 171
	if err != nil {
		return nil, err
	}
172

Jeromy's avatar
Jeromy committed
173
	// if not wrapping, AND one root file, use that hash as root.
174 175
	if !adder.Wrap && len(root.Links()) == 1 {
		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
Jeromy's avatar
Jeromy committed
176 177
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
178
		}
179

Jeromy's avatar
Jeromy committed
180
		root = nd
Jeromy's avatar
Jeromy committed
181
	}
Jeromy's avatar
Jeromy committed
182

Jeromy's avatar
Jeromy committed
183
	adder.root = root
Jeromy's avatar
Jeromy committed
184
	return root, err
185 186
}

187 188
// Recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
Jeromy's avatar
Jeromy committed
189 190
func (adder *Adder) PinRoot() error {
	root, err := adder.RootNode()
191 192 193
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
194
	if !adder.Pin {
195 196
		return nil
	}
197

198 199 200
	rnk := root.Cid()

	err = adder.dagService.Add(adder.ctx, root)
201 202 203 204
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
205
	if adder.tempRoot != nil {
206
		err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
Jeromy's avatar
Jeromy committed
207 208 209
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
210
		adder.tempRoot = rnk
Jeromy's avatar
Jeromy committed
211 212
	}

213 214
	adder.pinning.PinWithMode(rnk, pin.Recursive)
	return adder.pinning.Flush()
215 216
}

217
// Finalize flushes the mfs root directory and returns the mfs root node.
Jeromy's avatar
Jeromy committed
218
func (adder *Adder) Finalize() (node.Node, error) {
219 220 221 222 223
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
	root := mr.GetValue()
Stephen Whitmore's avatar
Stephen Whitmore committed
224

225
	err = root.Flush()
Jeromy's avatar
Jeromy committed
226 227 228 229 230
	if err != nil {
		return nil, err
	}

	var name string
Jeromy's avatar
Jeromy committed
231
	if !adder.Wrap {
Jeromy's avatar
Jeromy committed
232
		children, err := root.(*mfs.Directory).ListNames(adder.ctx)
Jeromy's avatar
Jeromy committed
233 234 235
		if err != nil {
			return nil, err
		}
Quantomic's avatar
Quantomic committed
236 237 238 239 240

		if len(children) == 0 {
			return nil, fmt.Errorf("expected at least one child dir, got none")
		}

Jeromy's avatar
Jeromy committed
241
		name = children[0]
Stephen Whitmore's avatar
Stephen Whitmore committed
242

243 244 245 246 247 248
		mr, err := adder.mfsRoot()
		if err != nil {
			return nil, err
		}

		dir, ok := mr.GetValue().(*mfs.Directory)
249 250 251 252 253
		if !ok {
			return nil, fmt.Errorf("root is not a directory")
		}

		root, err = dir.Child(name)
Jeromy's avatar
Jeromy committed
254 255 256 257 258
		if err != nil {
			return nil, err
		}
	}

Jeromy's avatar
Jeromy committed
259
	err = adder.outputDirs(name, root)
Jeromy's avatar
Jeromy committed
260 261 262 263
	if err != nil {
		return nil, err
	}

264
	err = mr.Close()
265 266
	if err != nil {
		return nil, err
Stephen Whitmore's avatar
Stephen Whitmore committed
267 268
	}

Stephen Whitmore's avatar
Stephen Whitmore committed
269
	return root.GetNode()
Jeromy's avatar
Jeromy committed
270 271
}

Jeromy's avatar
Jeromy committed
272 273 274
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
Jeromy's avatar
Jeromy committed
275
		return nil
Jeromy's avatar
Jeromy committed
276
	case *mfs.Directory:
Jeromy's avatar
Jeromy committed
277
		names, err := fsn.ListNames(adder.ctx)
278 279 280 281 282
		if err != nil {
			return err
		}

		for _, name := range names {
Jeromy's avatar
Jeromy committed
283 284 285 286 287 288 289 290 291 292
			child, err := fsn.Child(name)
			if err != nil {
				return err
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}
293 294

			fsn.Uncache(name)
Jeromy's avatar
Jeromy committed
295
		}
Jeromy's avatar
Jeromy committed
296
		nd, err := fsn.GetNode()
Jeromy's avatar
Jeromy committed
297 298
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
299
		}
Jeromy's avatar
Jeromy committed
300

Jeromy's avatar
Jeromy committed
301 302 303 304
		return outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
305 306
}

307 308 309
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
// If you want to pin it, use NewAdder() and Adder.PinRoot().
310
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
Lars Gierth's avatar
Lars Gierth committed
311 312 313
	return AddWithContext(n.Context(), n, r)
}

314
// AddWithContext does the same as Add, but with a custom context.
Lars Gierth's avatar
Lars Gierth committed
315
func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
316
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
317

318
	fileAdder, err := NewAdder(ctx, n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
319 320 321
	if err != nil {
		return "", err
	}
322

323
	node, err := fileAdder.add(r)
324 325 326
	if err != nil {
		return "", err
	}
327

Jeromy's avatar
Jeromy committed
328
	return node.Cid().String(), nil
329
}
330 331 332

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
333
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
334

335
	stat, err := os.Lstat(root)
336 337 338
	if err != nil {
		return "", err
	}
339

Jeromy's avatar
Jeromy committed
340
	f, err := files.NewSerialFile(root, root, false, stat)
341 342 343
	if err != nil {
		return "", err
	}
344
	defer f.Close()
345

346
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
347 348 349
	if err != nil {
		return "", err
	}
350

Jeromy's avatar
Jeromy committed
351
	err = fileAdder.addFile(f)
352 353 354
	if err != nil {
		return "", err
	}
355

356
	nd, err := fileAdder.Finalize()
Jeromy's avatar
Jeromy committed
357 358 359 360
	if err != nil {
		return "", err
	}

Jeromy's avatar
Jeromy committed
361
	return nd.String(), nil
362 363
}

364 365
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
366 367
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
Jeromy's avatar
Jeromy committed
368
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, node.Node, error) {
369
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
370
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
371 372 373
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
374
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
375

376
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
377

Jeromy's avatar
Jeromy committed
378
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
379 380 381 382
	if err != nil {
		return "", nil, err
	}

383
	dagnode, err := fileAdder.Finalize()
384 385 386
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
387

Jeromy's avatar
Jeromy committed
388 389
	c := dagnode.Cid()
	return gopath.Join(c.String(), filename), dagnode, nil
390 391
}

392
func (adder *Adder) addNode(node node.Node, path string) error {
393 394
	// patch it into the root
	if path == "" {
Jeromy's avatar
Jeromy committed
395
		path = node.Cid().String()
396
	}
397

398 399 400 401
	if pi, ok := node.(*posinfo.FilestoreNode); ok {
		node = pi.Node
	}

402 403 404 405
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
406 407
	dir := gopath.Dir(path)
	if dir != "." {
408 409 410 411 412 413
		opts := mfs.MkdirOpts{
			Mkparents: true,
			Flush:     false,
			Prefix:    adder.Prefix,
		}
		if err := mfs.Mkdir(mr, dir, opts); err != nil {
Jeromy's avatar
Jeromy committed
414 415 416 417
			return err
		}
	}

418
	if err := mfs.PutNode(mr, path, node); err != nil {
419 420
		return err
	}
421

Jeromy's avatar
Jeromy committed
422
	if !adder.Silent {
423
		return outputDagnode(adder.Out, path, node)
Jeromy's avatar
Jeromy committed
424 425
	}
	return nil
426 427
}

428
// AddFile adds the given file while respecting the adder.
Jeromy's avatar
Jeromy committed
429
func (adder *Adder) AddFile(file files.File) error {
430 431 432
	if adder.Pin {
		adder.unlocker = adder.blockstore.PinLock()
	}
433
	defer func() {
434 435 436
		if adder.unlocker != nil {
			adder.unlocker.Unlock()
		}
437
	}()
Jeromy's avatar
Jeromy committed
438

Jeromy's avatar
Jeromy committed
439
	return adder.addFile(file)
Jeromy's avatar
Jeromy committed
440 441 442 443 444 445 446 447
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

448 449 450 451 452 453
	if adder.liveNodes >= liveCacheSize {
		// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
454
		if err := mr.FlushMemFree(adder.ctx); err != nil {
455 456
			return err
		}
Jeromy's avatar
Jeromy committed
457

458 459 460 461
		adder.liveNodes = 0
	}
	adder.liveNodes++

462
	if file.IsDirectory() {
Jeromy's avatar
Jeromy committed
463
		return adder.addDir(file)
464 465
	}

466 467 468 469
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
470
			return err
471
		}
472

473
		dagnode := dag.NodeWithData(sdata)
474
		dagnode.SetPrefix(adder.Prefix)
475
		err = adder.dagService.Add(adder.ctx, dagnode)
476
		if err != nil {
Jeromy's avatar
Jeromy committed
477
			return err
478 479
		}

Jeromy's avatar
Jeromy committed
480
		return adder.addNode(dagnode, s.FileName())
481 482 483 484 485 486
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
487
	if adder.Progress {
488 489 490 491 492 493
		rdr := &progressReader{file: file, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
494 495
	}

Jeromy's avatar
Jeromy committed
496
	dagnode, err := adder.add(reader)
497
	if err != nil {
Jeromy's avatar
Jeromy committed
498
		return err
499 500 501
	}

	// patch it into the root
Jeromy's avatar
Jeromy committed
502
	return adder.addNode(dagnode, file.FileName())
503 504
}

Jeromy's avatar
Jeromy committed
505
func (adder *Adder) addDir(dir files.File) error {
506
	log.Infof("adding directory: %s", dir.FileName())
507

508 509 510 511
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
512 513 514 515 516
	err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
		Mkparents: true,
		Flush:     false,
		Prefix:    adder.Prefix,
	})
Jeromy's avatar
Jeromy committed
517 518 519 520
	if err != nil {
		return err
	}

521 522
	for {
		file, err := dir.NextFile()
523
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
524
			return err
525 526 527
		}
		if file == nil {
			break
528 529
		}

530 531 532
		// Skip hidden files when adding recursively, unless Hidden is enabled.
		if files.IsHidden(file) && !adder.Hidden {
			log.Infof("%s is hidden, skipping", file.FileName())
533
			continue
534 535 536
		}
		err = adder.addFile(file)
		if err != nil {
Jeromy's avatar
Jeromy committed
537
			return err
538 539 540
		}
	}

Jeromy's avatar
Jeromy committed
541
	return nil
542
}
543

Jeromy's avatar
Jeromy committed
544
func (adder *Adder) maybePauseForGC() error {
545
	if adder.unlocker != nil && adder.blockstore.GCRequested() {
Jeromy's avatar
Jeromy committed
546 547 548 549 550
		err := adder.PinRoot()
		if err != nil {
			return err
		}

551
		adder.unlocker.Unlock()
552
		adder.unlocker = adder.blockstore.PinLock()
Jeromy's avatar
Jeromy committed
553 554 555 556
	}
	return nil
}

557
// outputDagnode sends dagnode info over the output channel
558
func outputDagnode(out chan interface{}, name string, dn node.Node) error {
559 560 561 562 563 564 565 566 567 568 569 570
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
571
		Size: o.Size,
572 573 574 575 576
	}

	return nil
}

577
// NewMemoryDagService builds and returns a new mem-datastore.
578
func NewMemoryDagService() node.DAGService {
579 580 581
	// build mem-datastore for editor's intermediary nodes
	bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
	bsrv := bserv.New(bs, offline.Exchange(bs))
582
	return dag.NewDAGService(bsrv)
583 584 585
}

// from core/commands/object.go
586
func getOutput(dagnode node.Node) (*Object, error) {
Jeromy's avatar
Jeromy committed
587
	c := dagnode.Cid()
588 589 590 591
	s, err := dagnode.Size()
	if err != nil {
		return nil, err
	}
592 593

	output := &Object{
Jeromy's avatar
Jeromy committed
594
		Hash:  c.String(),
595
		Size:  strconv.FormatUint(s, 10),
596
		Links: make([]Link, len(dagnode.Links())),
597 598
	}

599
	for i, link := range dagnode.Links() {
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
		output.Links[i] = Link{
			Name: link.Name,
			Size: link.Size,
		}
	}

	return output, nil
}

type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &AddedObject{
			Name:  i.file.FileName(),
			Bytes: i.bytes,
		}
	}

	return n, err
}
630 631 632 633 634

type progressReader2 struct {
	*progressReader
	files.FileInfo
}