add.go 12.7 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
camelmasa's avatar
camelmasa committed
10
	"path/filepath"
11
	"strconv"
Jeromy's avatar
Jeromy committed
12

Jeromy's avatar
Jeromy committed
13
	core "github.com/ipfs/go-ipfs/core"
14
	"github.com/ipfs/go-ipfs/pin"
15 16 17 18 19 20
	unixfs "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs"
	balanced "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs/importer/balanced"
	ihelper "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs/importer/helpers"
	trickle "gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs/importer/trickle"
	dag "gx/ipfs/QmRiQCJZ91B7VNmLvA6sxzDuBJGSojS3uXHHVuNr3iueNZ/go-merkledag"

Steven Allen's avatar
Steven Allen committed
21
	logging "gx/ipfs/QmRREK2CAZ5Re2Bd9zZFG6FeYDppUWt5cMgsoUEp3ktgSr/go-log"
22 23 24 25 26 27 28
	files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files"
	ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format"
	posinfo "gx/ipfs/QmXD4grfThQ4LwVoEEfe4dgR7ukmbV9TppM5Q4SPowp7hU/go-ipfs-posinfo"
	chunker "gx/ipfs/QmXzBbJo2sLf3uwjNTeoWYiJV7CjAhkiA4twtLvwJSSNdK/go-ipfs-chunker"
	cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
	bstore "gx/ipfs/QmcmpX42gtDv1fz24kau4wjS9hfwWj5VexWBKgGnWzsyag/go-ipfs-blockstore"
	mfs "gx/ipfs/QmdghKsSDa2AD1kC4qYRnVYWqZecdSBRZjeXRdhMYYhafj/go-mfs"
29
	"strings"
30 31
)

Jeromy's avatar
Jeromy committed
32
// log is the package-level logger, registered under the name "coreunix".
var log = logging.Logger("coreunix")
33

34 35 36
// progressReaderIncrement is the number of bytes (256 KiB) a progressReader
// lets through before it sends the next progress update message.
const progressReaderIncrement = 1024 * 256

37 38
// liveCacheSize is the threshold compared against Adder.liveNodes in addFile:
// once the counter reaches it, the mfs root is flushed with FlushMemFree and
// the counter is reset to zero.
var liveCacheSize = uint64(256 << 10)

39 40 41 42 43 44 45 46
// Link is the output representation of one link of an Object.
// getOutput fills in Name and Size for every link of the node it describes.
type Link struct {
	Name string
	Hash string
	Size uint64
}

type Object struct {
	Hash  string
	Links []Link
47
	Size  string
48 49 50 51 52 53
}

// AddedObject is the message sent over an Adder's output channel.
//
// outputDagnode sends it with Name, Hash and Size set; progressReader sends
// it with Name and Bytes set. All fields except Name are omitted from JSON
// when empty.
type AddedObject struct {
	// Name is the path or file name the message refers to.
	Name string

	// Hash is the string form of the added node's cid.
	Hash string `json:",omitempty"`

	// Bytes is the number of bytes read so far (progress messages).
	Bytes int64 `json:",omitempty"`

	// Size is the added node's size as a base-10 string.
	Size string `json:",omitempty"`
}

57
// NewAdder Returns a new Adder used for a file add operation.
58
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) {
59
	return &Adder{
60 61 62 63 64 65 66 67 68 69
		ctx:        ctx,
		pinning:    p,
		blockstore: bs,
		dagService: ds,
		Progress:   false,
		Hidden:     true,
		Pin:        true,
		Trickle:    false,
		Wrap:       false,
		Chunker:    "",
Jeromy's avatar
Jeromy committed
70
	}, nil
71 72
}

73
// Adder holds the switches passed to the `add` command.
74
type Adder struct {
75 76 77
	ctx        context.Context
	pinning    pin.Pinner
	blockstore bstore.GCBlockstore
78
	dagService ipld.DAGService
79 80 81 82 83
	Out        chan interface{}
	Progress   bool
	Hidden     bool
	Pin        bool
	Trickle    bool
84
	RawLeaves  bool
85 86
	Silent     bool
	Wrap       bool
87
	WpName     string
88
	NoCopy     bool
89
	Chunker    string
90
	root       ipld.Node
91
	mroot      *mfs.Root
92
	unlocker   bstore.Unlocker
Jeromy's avatar
Jeromy committed
93
	tempRoot   *cid.Cid
94
	CidBuilder cid.Builder
95
	liveNodes  uint64
96 97 98 99 100 101 102
}

func (adder *Adder) mfsRoot() (*mfs.Root, error) {
	if adder.mroot != nil {
		return adder.mroot, nil
	}
	rnode := unixfs.EmptyDirNode()
103
	rnode.SetCidBuilder(adder.CidBuilder)
104 105 106 107 108 109
	mr, err := mfs.NewRoot(adder.ctx, adder.dagService, rnode, nil)
	if err != nil {
		return nil, err
	}
	adder.mroot = mr
	return adder.mroot, nil
110 111
}

112
// SetMfsRoot sets `r` as the root for Adder.
Jeromy's avatar
Jeromy committed
113
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
114
	adder.mroot = r
Jeromy's avatar
Jeromy committed
115 116
}

117
// Constructs a node from reader's data, and adds it. Doesn't pin.
118
func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
119
	chnk, err := chunker.FromString(reader, adder.Chunker)
120 121 122
	if err != nil {
		return nil, err
	}
123

124
	params := ihelper.DagBuilderParams{
125 126 127 128
		Dagserv:    adder.dagService,
		RawLeaves:  adder.RawLeaves,
		Maxlinks:   ihelper.DefaultLinksPerBlock,
		NoCopy:     adder.NoCopy,
129
		CidBuilder: adder.CidBuilder,
130
	}
131

Jeromy's avatar
Jeromy committed
132
	if adder.Trickle {
133
		return trickle.Layout(params.New(chnk))
134 135
	}

136
	return balanced.Layout(params.New(chnk))
137 138
}

139
// RootNode returns the root node of the Added.
140
func (adder *Adder) RootNode() (ipld.Node, error) {
Jeromy's avatar
Jeromy committed
141
	// for memoizing
Jeromy's avatar
Jeromy committed
142 143
	if adder.root != nil {
		return adder.root, nil
Jeromy's avatar
Jeromy committed
144
	}
145

146 147 148 149
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
150
	root, err := mr.GetDirectory().GetNode()
Jeromy's avatar
Jeromy committed
151 152 153
	if err != nil {
		return nil, err
	}
154

Jeromy's avatar
Jeromy committed
155
	// if not wrapping, AND one root file, use that hash as root.
156 157
	if !adder.Wrap && len(root.Links()) == 1 {
		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
Jeromy's avatar
Jeromy committed
158 159
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
160
		}
161

Jeromy's avatar
Jeromy committed
162
		root = nd
Jeromy's avatar
Jeromy committed
163
	}
Jeromy's avatar
Jeromy committed
164

Jeromy's avatar
Jeromy committed
165
	adder.root = root
Jeromy's avatar
Jeromy committed
166
	return root, err
167 168
}

169 170
// Recursively pins the root node of Adder and
// writes the pin state to the backing datastore.
Jeromy's avatar
Jeromy committed
171 172
func (adder *Adder) PinRoot() error {
	root, err := adder.RootNode()
173 174 175
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
176
	if !adder.Pin {
177 178
		return nil
	}
179

180 181 182
	rnk := root.Cid()

	err = adder.dagService.Add(adder.ctx, root)
183 184 185 186
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
187
	if adder.tempRoot != nil {
188
		err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
Jeromy's avatar
Jeromy committed
189 190 191
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
192
		adder.tempRoot = rnk
Jeromy's avatar
Jeromy committed
193 194
	}

195 196
	adder.pinning.PinWithMode(rnk, pin.Recursive)
	return adder.pinning.Flush()
197 198
}

199
// Finalize flushes the mfs root directory and returns the mfs root node.
200
func (adder *Adder) Finalize() (ipld.Node, error) {
201 202 203 204
	mr, err := adder.mfsRoot()
	if err != nil {
		return nil, err
	}
205
	var root mfs.FSNode
206 207
	rootdir := mr.GetDirectory()
	root = rootdir
Stephen Whitmore's avatar
Stephen Whitmore committed
208

209
	err = root.Flush()
Jeromy's avatar
Jeromy committed
210 211 212 213 214
	if err != nil {
		return nil, err
	}

	var name string
Jeromy's avatar
Jeromy committed
215
	if !adder.Wrap {
216
		children, err := rootdir.ListNames(adder.ctx)
Jeromy's avatar
Jeromy committed
217 218 219
		if err != nil {
			return nil, err
		}
Quantomic's avatar
Quantomic committed
220 221 222 223 224

		if len(children) == 0 {
			return nil, fmt.Errorf("expected at least one child dir, got none")
		}

225
		// Replace root with the first child
Jeromy's avatar
Jeromy committed
226
		name = children[0]
227
		root, err = rootdir.Child(name)
Jeromy's avatar
Jeromy committed
228 229 230 231 232
		if err != nil {
			return nil, err
		}
	}

Jeromy's avatar
Jeromy committed
233
	err = adder.outputDirs(name, root)
Jeromy's avatar
Jeromy committed
234 235 236 237
	if err != nil {
		return nil, err
	}

238
	err = mr.Close()
239 240
	if err != nil {
		return nil, err
Stephen Whitmore's avatar
Stephen Whitmore committed
241 242
	}

Stephen Whitmore's avatar
Stephen Whitmore committed
243
	return root.GetNode()
Jeromy's avatar
Jeromy committed
244 245
}

Jeromy's avatar
Jeromy committed
246 247 248
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
Jeromy's avatar
Jeromy committed
249
		return nil
Jeromy's avatar
Jeromy committed
250
	case *mfs.Directory:
Jeromy's avatar
Jeromy committed
251
		names, err := fsn.ListNames(adder.ctx)
252 253 254 255 256
		if err != nil {
			return err
		}

		for _, name := range names {
Jeromy's avatar
Jeromy committed
257 258 259 260 261 262 263 264 265 266
			child, err := fsn.Child(name)
			if err != nil {
				return err
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}
267 268

			fsn.Uncache(name)
Jeromy's avatar
Jeromy committed
269
		}
Jeromy's avatar
Jeromy committed
270
		nd, err := fsn.GetNode()
Jeromy's avatar
Jeromy committed
271 272
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
273
		}
Jeromy's avatar
Jeromy committed
274

Jeromy's avatar
Jeromy committed
275 276 277 278
		return outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
279 280
}

281 282 283
// Add builds a merkledag node from a reader, adds it to the blockstore,
// and returns the key representing that node.
// If you want to pin it, use NewAdder() and Adder.PinRoot().
284
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
Lars Gierth's avatar
Lars Gierth committed
285 286 287
	return AddWithContext(n.Context(), n, r)
}

288
// AddWithContext does the same as Add, but with a custom context.
Lars Gierth's avatar
Lars Gierth committed
289
func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
290
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
291

292
	fileAdder, err := NewAdder(ctx, n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
293 294 295
	if err != nil {
		return "", err
	}
296

297
	node, err := fileAdder.add(r)
298 299 300
	if err != nil {
		return "", err
	}
301

Jeromy's avatar
Jeromy committed
302
	return node.Cid().String(), nil
303
}
304 305 306

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
307
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
308

309
	stat, err := os.Lstat(root)
310 311 312
	if err != nil {
		return "", err
	}
313

camelmasa's avatar
camelmasa committed
314
	f, err := files.NewSerialFile(filepath.Base(root), root, false, stat)
315 316 317
	if err != nil {
		return "", err
	}
318
	defer f.Close()
319

320
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
321 322 323
	if err != nil {
		return "", err
	}
324

Jeromy's avatar
Jeromy committed
325
	err = fileAdder.addFile(f)
326 327 328
	if err != nil {
		return "", err
	}
329

330
	nd, err := fileAdder.Finalize()
Jeromy's avatar
Jeromy committed
331 332 333 334
	if err != nil {
		return "", err
	}

Jeromy's avatar
Jeromy committed
335
	return nd.String(), nil
336 337
}

338 339
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
340 341
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
342
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, ipld.Node, error) {
343
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
344
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
345 346 347
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
348
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
349

350
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
351

Jeromy's avatar
Jeromy committed
352
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
353 354 355 356
	if err != nil {
		return "", nil, err
	}

357
	dagnode, err := fileAdder.Finalize()
358 359 360
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
361

Jeromy's avatar
Jeromy committed
362 363
	c := dagnode.Cid()
	return gopath.Join(c.String(), filename), dagnode, nil
364 365
}

366
func (adder *Adder) addNode(node ipld.Node, path string) error {
367 368
	// patch it into the root
	if path == "" {
Jeromy's avatar
Jeromy committed
369
		path = node.Cid().String()
370
	}
371

372 373 374 375
	if pi, ok := node.(*posinfo.FilestoreNode); ok {
		node = pi.Node
	}

376 377 378 379
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
380 381
	dir := gopath.Dir(path)
	if dir != "." {
382
		opts := mfs.MkdirOpts{
383 384 385
			Mkparents:  true,
			Flush:      false,
			CidBuilder: adder.CidBuilder,
386 387
		}
		if err := mfs.Mkdir(mr, dir, opts); err != nil {
Jeromy's avatar
Jeromy committed
388 389 390 391
			return err
		}
	}

392
	if err := mfs.PutNode(mr, path, node); err != nil {
393 394
		return err
	}
395

Jeromy's avatar
Jeromy committed
396
	if !adder.Silent {
397
		return outputDagnode(adder.Out, path, node)
Jeromy's avatar
Jeromy committed
398 399
	}
	return nil
400 401
}

402
// AddFile adds the given file while respecting the adder.
Jeromy's avatar
Jeromy committed
403
func (adder *Adder) AddFile(file files.File) error {
404 405 406
	if adder.Pin {
		adder.unlocker = adder.blockstore.PinLock()
	}
407
	defer func() {
408 409 410
		if adder.unlocker != nil {
			adder.unlocker.Unlock()
		}
411
	}()
Jeromy's avatar
Jeromy committed
412

Jeromy's avatar
Jeromy committed
413
	return adder.addFile(file)
Jeromy's avatar
Jeromy committed
414 415 416 417 418 419 420 421
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

422 423 424 425 426 427
	if adder.liveNodes >= liveCacheSize {
		// TODO: A smarter cache that uses some sort of lru cache with an eviction handler
		mr, err := adder.mfsRoot()
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
428
		if err := mr.FlushMemFree(adder.ctx); err != nil {
429 430
			return err
		}
Jeromy's avatar
Jeromy committed
431

432 433 434 435
		adder.liveNodes = 0
	}
	adder.liveNodes++

436
	if file.IsDirectory() {
Jeromy's avatar
Jeromy committed
437
		return adder.addDir(file)
438 439
	}

440 441 442 443
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
444
			return err
445
		}
446

447
		dagnode := dag.NodeWithData(sdata)
448
		dagnode.SetCidBuilder(adder.CidBuilder)
449
		err = adder.dagService.Add(adder.ctx, dagnode)
450
		if err != nil {
Jeromy's avatar
Jeromy committed
451
			return err
452 453
		}

Jeromy's avatar
Jeromy committed
454
		return adder.addNode(dagnode, s.FileName())
455 456 457 458 459 460
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
461
	if adder.Progress {
462 463 464 465 466 467
		rdr := &progressReader{file: file, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
468 469
	}

Jeromy's avatar
Jeromy committed
470
	dagnode, err := adder.add(reader)
471
	if err != nil {
Jeromy's avatar
Jeromy committed
472
		return err
473 474
	}

475 476 477
	if !strings.EqualFold(adder.WpName, "") && adder.Wrap {
		return adder.addNode(dagnode, adder.WpName)
	}
478
	// patch it into the root
Jeromy's avatar
Jeromy committed
479
	return adder.addNode(dagnode, file.FileName())
480 481
}

Jeromy's avatar
Jeromy committed
482
func (adder *Adder) addDir(dir files.File) error {
483
	log.Infof("adding directory: %s", dir.FileName())
484

485 486 487 488
	mr, err := adder.mfsRoot()
	if err != nil {
		return err
	}
489
	err = mfs.Mkdir(mr, dir.FileName(), mfs.MkdirOpts{
490 491 492
		Mkparents:  true,
		Flush:      false,
		CidBuilder: adder.CidBuilder,
493
	})
Jeromy's avatar
Jeromy committed
494 495 496 497
	if err != nil {
		return err
	}

498 499
	for {
		file, err := dir.NextFile()
500
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
501
			return err
502 503 504
		}
		if file == nil {
			break
505 506
		}

507 508 509
		// Skip hidden files when adding recursively, unless Hidden is enabled.
		if files.IsHidden(file) && !adder.Hidden {
			log.Infof("%s is hidden, skipping", file.FileName())
510
			continue
511 512 513
		}
		err = adder.addFile(file)
		if err != nil {
Jeromy's avatar
Jeromy committed
514
			return err
515 516 517
		}
	}

Jeromy's avatar
Jeromy committed
518
	return nil
519
}
520

Jeromy's avatar
Jeromy committed
521
func (adder *Adder) maybePauseForGC() error {
522
	if adder.unlocker != nil && adder.blockstore.GCRequested() {
Jeromy's avatar
Jeromy committed
523 524 525 526 527
		err := adder.PinRoot()
		if err != nil {
			return err
		}

528
		adder.unlocker.Unlock()
529
		adder.unlocker = adder.blockstore.PinLock()
Jeromy's avatar
Jeromy committed
530 531 532 533
	}
	return nil
}

534
// outputDagnode sends dagnode info over the output channel
535
func outputDagnode(out chan interface{}, name string, dn ipld.Node) error {
536 537 538 539 540 541 542 543 544 545 546 547
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
548
		Size: o.Size,
549 550 551 552 553 554
	}

	return nil
}

// from core/commands/object.go
555
func getOutput(dagnode ipld.Node) (*Object, error) {
Jeromy's avatar
Jeromy committed
556
	c := dagnode.Cid()
557 558 559 560
	s, err := dagnode.Size()
	if err != nil {
		return nil, err
	}
561 562

	output := &Object{
Jeromy's avatar
Jeromy committed
563
		Hash:  c.String(),
564
		Size:  strconv.FormatUint(s, 10),
565
		Links: make([]Link, len(dagnode.Links())),
566 567
	}

568
	for i, link := range dagnode.Links() {
569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598
		output.Links[i] = Link{
			Name: link.Name,
			Size: link.Size,
		}
	}

	return output, nil
}

// progressReader wraps a file and reports read progress on a channel.
type progressReader struct {
	// file is the file being read; its name labels the progress messages.
	file         files.File
	// out is the channel progress messages are sent on.
	out          chan interface{}
	// bytes is the total number of bytes read so far.
	bytes        int64
	// lastProgress is the value of bytes when the last message was sent.
	lastProgress int64
}

// Read reads from the wrapped file into p and returns what the file's Read
// returned.
//
// The running byte total is updated on every call. A progress message
// (file name and total bytes read) is sent on the output channel each time at
// least progressReaderIncrement bytes have been read since the last message,
// and once more when the file reports io.EOF. Without an output channel the
// totals are still tracked but nothing is sent.
func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		// A send on a nil channel blocks forever, so skip it when no
		// output channel was configured (outputDagnode does the same).
		if i.out != nil {
			i.out <- &AddedObject{
				Name:  i.file.FileName(),
				Bytes: i.bytes,
			}
		}
	}

	return n, err
}
599 600 601 602 603

// progressReader2 combines a progressReader with the wrapped file's
// files.FileInfo by embedding both, so the progress-reporting reader also
// satisfies files.FileInfo. addFile uses it when the file being added
// implements files.FileInfo.
type progressReader2 struct {
	*progressReader
	files.FileInfo
}