add.go 11.1 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
4
	"context"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
Jeromy's avatar
Jeromy committed
10

Jeromy's avatar
Jeromy committed
11
	bs "github.com/ipfs/go-ipfs/blocks/blockstore"
12 13
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	bserv "github.com/ipfs/go-ipfs/blockservice"
Jeromy's avatar
Jeromy committed
14 15
	"github.com/ipfs/go-ipfs/commands/files"
	core "github.com/ipfs/go-ipfs/core"
16
	"github.com/ipfs/go-ipfs/exchange/offline"
17
	balanced "github.com/ipfs/go-ipfs/importer/balanced"
18
	"github.com/ipfs/go-ipfs/importer/chunk"
19 20
	ihelper "github.com/ipfs/go-ipfs/importer/helpers"
	trickle "github.com/ipfs/go-ipfs/importer/trickle"
Jeromy's avatar
Jeromy committed
21
	dag "github.com/ipfs/go-ipfs/merkledag"
Jeromy's avatar
Jeromy committed
22
	mfs "github.com/ipfs/go-ipfs/mfs"
23
	"github.com/ipfs/go-ipfs/pin"
Jeromy's avatar
Jeromy committed
24 25
	unixfs "github.com/ipfs/go-ipfs/unixfs"

Jeromy's avatar
Jeromy committed
26
	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
27 28
	node "gx/ipfs/QmU7bFWQ793qmvNy7outdCaMfSDNk8uqhx4VNrxYj5fj5g/go-ipld-node"
	cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid"
George Antoniadis's avatar
George Antoniadis committed
29 30
	ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
	syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
31 32
)

Jeromy's avatar
Jeromy committed
33
var log = logging.Logger("coreunix")
34

35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
// how many bytes of progress to wait before sending a progress update message
const progressReaderIncrement = 1024 * 256

type Link struct {
	Name, Hash string
	Size       uint64
}

type Object struct {
	Hash  string
	Links []Link
}

type hiddenFileError struct {
	fileName string
}

func (e *hiddenFileError) Error() string {
	return fmt.Sprintf("%s is a hidden file", e.fileName)
}

type ignoreFileError struct {
	fileName string
}

func (e *ignoreFileError) Error() string {
	return fmt.Sprintf("%s is an ignored file", e.fileName)
}

type AddedObject struct {
	Name  string
	Hash  string `json:",omitempty"`
	Bytes int64  `json:",omitempty"`
}

70
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds dag.DAGService) (*Adder, error) {
71
	mr, err := mfs.NewRoot(ctx, ds, unixfs.EmptyDirNode(), nil)
Jeromy's avatar
Jeromy committed
72 73 74 75
	if err != nil {
		return nil, err
	}

76
	return &Adder{
77 78 79 80 81 82 83 84 85 86 87
		mr:         mr,
		ctx:        ctx,
		pinning:    p,
		blockstore: bs,
		dagService: ds,
		Progress:   false,
		Hidden:     true,
		Pin:        true,
		Trickle:    false,
		Wrap:       false,
		Chunker:    "",
Jeromy's avatar
Jeromy committed
88
	}, nil
89

90 91 92 93
}

// Internal structure for holding the switches passed to the `add` call
type Adder struct {
94 95 96 97 98 99 100 101 102
	ctx        context.Context
	pinning    pin.Pinner
	blockstore bstore.GCBlockstore
	dagService dag.DAGService
	Out        chan interface{}
	Progress   bool
	Hidden     bool
	Pin        bool
	Trickle    bool
103
	RawLeaves  bool
104 105 106
	Silent     bool
	Wrap       bool
	Chunker    string
Jeromy's avatar
Jeromy committed
107
	root       node.Node
108 109
	mr         *mfs.Root
	unlocker   bs.Unlocker
Jeromy's avatar
Jeromy committed
110
	tempRoot   *cid.Cid
111 112
}

Jeromy's avatar
Jeromy committed
113 114 115 116
func (adder *Adder) SetMfsRoot(r *mfs.Root) {
	adder.mr = r
}

117
// Perform the actual add & pin locally, outputting results to reader
118
func (adder Adder) add(reader io.Reader) (node.Node, error) {
Jeromy's avatar
Jeromy committed
119
	chnk, err := chunk.FromString(reader, adder.Chunker)
120 121 122
	if err != nil {
		return nil, err
	}
123 124 125 126 127
	params := ihelper.DagBuilderParams{
		Dagserv:   adder.dagService,
		RawLeaves: adder.RawLeaves,
		Maxlinks:  ihelper.DefaultLinksPerBlock,
	}
128

Jeromy's avatar
Jeromy committed
129
	if adder.Trickle {
130 131 132 133
		return trickle.TrickleLayout(params.New(chnk))
	}

	return balanced.BalancedLayout(params.New(chnk))
134 135
}

Jeromy's avatar
Jeromy committed
136
func (adder *Adder) RootNode() (node.Node, error) {
Jeromy's avatar
Jeromy committed
137
	// for memoizing
Jeromy's avatar
Jeromy committed
138 139
	if adder.root != nil {
		return adder.root, nil
Jeromy's avatar
Jeromy committed
140
	}
141

Jeromy's avatar
Jeromy committed
142
	root, err := adder.mr.GetValue().GetNode()
Jeromy's avatar
Jeromy committed
143 144 145
	if err != nil {
		return nil, err
	}
146

Jeromy's avatar
Jeromy committed
147
	// if not wrapping, AND one root file, use that hash as root.
148 149
	if !adder.Wrap && len(root.Links()) == 1 {
		nd, err := root.Links()[0].GetNode(adder.ctx, adder.dagService)
Jeromy's avatar
Jeromy committed
150 151
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
152
		}
153

Jeromy's avatar
Jeromy committed
154
		root = nd
Jeromy's avatar
Jeromy committed
155
	}
Jeromy's avatar
Jeromy committed
156

Jeromy's avatar
Jeromy committed
157
	adder.root = root
Jeromy's avatar
Jeromy committed
158
	return root, err
159 160
}

Jeromy's avatar
Jeromy committed
161 162
func (adder *Adder) PinRoot() error {
	root, err := adder.RootNode()
163 164 165
	if err != nil {
		return err
	}
Jeromy's avatar
Jeromy committed
166
	if !adder.Pin {
167 168
		return nil
	}
169

170
	rnk, err := adder.dagService.Add(root)
171 172 173 174
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
175
	if adder.tempRoot != nil {
176
		err := adder.pinning.Unpin(adder.ctx, adder.tempRoot, true)
Jeromy's avatar
Jeromy committed
177 178 179
		if err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
180
		adder.tempRoot = rnk
Jeromy's avatar
Jeromy committed
181 182
	}

183 184
	adder.pinning.PinWithMode(rnk, pin.Recursive)
	return adder.pinning.Flush()
185 186
}

Jeromy's avatar
Jeromy committed
187
func (adder *Adder) Finalize() (node.Node, error) {
Stephen Whitmore's avatar
Stephen Whitmore committed
188 189
	root := adder.mr.GetValue()

Jeromy's avatar
Jeromy committed
190
	// cant just call adder.RootNode() here as we need the name for printing
Stephen Whitmore's avatar
Stephen Whitmore committed
191
	rootNode, err := root.GetNode()
Jeromy's avatar
Jeromy committed
192 193 194 195 196
	if err != nil {
		return nil, err
	}

	var name string
Jeromy's avatar
Jeromy committed
197
	if !adder.Wrap {
198
		name = rootNode.Links()[0].Name
Stephen Whitmore's avatar
Stephen Whitmore committed
199

200 201 202 203 204 205
		dir, ok := adder.mr.GetValue().(*mfs.Directory)
		if !ok {
			return nil, fmt.Errorf("root is not a directory")
		}

		root, err = dir.Child(name)
Jeromy's avatar
Jeromy committed
206 207 208 209 210
		if err != nil {
			return nil, err
		}
	}

Jeromy's avatar
Jeromy committed
211
	err = adder.outputDirs(name, root)
Jeromy's avatar
Jeromy committed
212 213 214 215
	if err != nil {
		return nil, err
	}

216 217 218
	err = adder.mr.Close()
	if err != nil {
		return nil, err
Stephen Whitmore's avatar
Stephen Whitmore committed
219 220
	}

Stephen Whitmore's avatar
Stephen Whitmore committed
221
	return root.GetNode()
Jeromy's avatar
Jeromy committed
222 223
}

Jeromy's avatar
Jeromy committed
224 225 226
func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error {
	switch fsn := fsn.(type) {
	case *mfs.File:
Jeromy's avatar
Jeromy committed
227
		return nil
Jeromy's avatar
Jeromy committed
228 229 230 231 232 233 234 235 236 237 238 239
	case *mfs.Directory:
		for _, name := range fsn.ListNames() {
			child, err := fsn.Child(name)
			if err != nil {
				return err
			}

			childpath := gopath.Join(path, name)
			err = adder.outputDirs(childpath, child)
			if err != nil {
				return err
			}
240 241

			fsn.Uncache(name)
Jeromy's avatar
Jeromy committed
242
		}
Jeromy's avatar
Jeromy committed
243
		nd, err := fsn.GetNode()
Jeromy's avatar
Jeromy committed
244 245
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
246
		}
Jeromy's avatar
Jeromy committed
247

Jeromy's avatar
Jeromy committed
248 249 250 251
		return outputDagnode(adder.Out, path, nd)
	default:
		return fmt.Errorf("unrecognized fsn type: %#v", fsn)
	}
252 253
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
254 255
// Add builds a merkledag from the a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
256
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
Lars Gierth's avatar
Lars Gierth committed
257 258 259 260
	return AddWithContext(n.Context(), n, r)
}

func AddWithContext(ctx context.Context, n *core.IpfsNode, r io.Reader) (string, error) {
261
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
262

263
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
264 265 266
	if err != nil {
		return "", err
	}
267

268
	node, err := fileAdder.add(r)
269 270 271
	if err != nil {
		return "", err
	}
272

Jeromy's avatar
Jeromy committed
273
	return node.Cid().String(), nil
274
}
275 276 277

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
278
	n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
279

280
	stat, err := os.Lstat(root)
281 282 283
	if err != nil {
		return "", err
	}
284

Jeromy's avatar
Jeromy committed
285
	f, err := files.NewSerialFile(root, root, false, stat)
286 287 288
	if err != nil {
		return "", err
	}
289
	defer f.Close()
290

291
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
292 293 294
	if err != nil {
		return "", err
	}
295

Jeromy's avatar
Jeromy committed
296
	err = fileAdder.addFile(f)
297 298 299
	if err != nil {
		return "", err
	}
300

301
	nd, err := fileAdder.Finalize()
Jeromy's avatar
Jeromy committed
302 303 304 305
	if err != nil {
		return "", err
	}

Jeromy's avatar
Jeromy committed
306
	return nd.String(), nil
307 308
}

309 310
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
311 312
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
Jeromy's avatar
Jeromy committed
313
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, node.Node, error) {
314
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
315
	fileAdder, err := NewAdder(n.Context(), n.Pinning, n.Blockstore, n.DAG)
Jeromy's avatar
Jeromy committed
316 317 318
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
319
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
320

321
	defer n.Blockstore.PinLock().Unlock()
Jeromy's avatar
Jeromy committed
322

Jeromy's avatar
Jeromy committed
323
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
324 325 326 327
	if err != nil {
		return "", nil, err
	}

328
	dagnode, err := fileAdder.Finalize()
329 330 331
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
332

Jeromy's avatar
Jeromy committed
333 334
	c := dagnode.Cid()
	return gopath.Join(c.String(), filename), dagnode, nil
335 336
}

337
func (adder *Adder) addNode(node node.Node, path string) error {
338 339
	// patch it into the root
	if path == "" {
Jeromy's avatar
Jeromy committed
340
		path = node.Cid().String()
341
	}
342

Jeromy's avatar
Jeromy committed
343 344
	dir := gopath.Dir(path)
	if dir != "." {
345
		if err := mfs.Mkdir(adder.mr, dir, true, false); err != nil {
Jeromy's avatar
Jeromy committed
346 347 348 349
			return err
		}
	}

Jeromy's avatar
Jeromy committed
350
	if err := mfs.PutNode(adder.mr, path, node); err != nil {
351 352
		return err
	}
353

Jeromy's avatar
Jeromy committed
354
	if !adder.Silent {
355
		return outputDagnode(adder.Out, path, node)
Jeromy's avatar
Jeromy committed
356 357
	}
	return nil
358 359
}

Jeromy's avatar
Jeromy committed
360 361
// Add the given file while respecting the adder.
func (adder *Adder) AddFile(file files.File) error {
362 363 364
	if adder.Pin {
		adder.unlocker = adder.blockstore.PinLock()
	}
365
	defer func() {
366 367 368
		if adder.unlocker != nil {
			adder.unlocker.Unlock()
		}
369
	}()
Jeromy's avatar
Jeromy committed
370

Jeromy's avatar
Jeromy committed
371
	return adder.addFile(file)
Jeromy's avatar
Jeromy committed
372 373 374 375 376 377 378 379
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

380
	if file.IsDirectory() {
Jeromy's avatar
Jeromy committed
381
		return adder.addDir(file)
382 383
	}

384 385 386 387
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
388
			return err
389
		}
390

391
		dagnode := dag.NodeWithData(sdata)
392
		_, err = adder.dagService.Add(dagnode)
393
		if err != nil {
Jeromy's avatar
Jeromy committed
394
			return err
395 396
		}

Jeromy's avatar
Jeromy committed
397
		return adder.addNode(dagnode, s.FileName())
398 399 400 401 402 403
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
404
	if adder.Progress {
405 406 407 408 409 410
		rdr := &progressReader{file: file, out: adder.Out}
		if fi, ok := file.(files.FileInfo); ok {
			reader = &progressReader2{rdr, fi}
		} else {
			reader = rdr
		}
411 412
	}

Jeromy's avatar
Jeromy committed
413
	dagnode, err := adder.add(reader)
414
	if err != nil {
Jeromy's avatar
Jeromy committed
415
		return err
416 417 418
	}

	// patch it into the root
Jeromy's avatar
Jeromy committed
419
	return adder.addNode(dagnode, file.FileName())
420 421
}

Jeromy's avatar
Jeromy committed
422
func (adder *Adder) addDir(dir files.File) error {
423
	log.Infof("adding directory: %s", dir.FileName())
424

425
	err := mfs.Mkdir(adder.mr, dir.FileName(), true, false)
Jeromy's avatar
Jeromy committed
426 427 428 429
	if err != nil {
		return err
	}

430 431
	for {
		file, err := dir.NextFile()
432
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
433
			return err
434 435 436
		}
		if file == nil {
			break
437 438
		}

439 440 441
		// Skip hidden files when adding recursively, unless Hidden is enabled.
		if files.IsHidden(file) && !adder.Hidden {
			log.Infof("%s is hidden, skipping", file.FileName())
442
			continue
443 444 445
		}
		err = adder.addFile(file)
		if err != nil {
Jeromy's avatar
Jeromy committed
446
			return err
447 448 449
		}
	}

Jeromy's avatar
Jeromy committed
450
	return nil
451
}
452

Jeromy's avatar
Jeromy committed
453
func (adder *Adder) maybePauseForGC() error {
454
	if adder.unlocker != nil && adder.blockstore.GCRequested() {
Jeromy's avatar
Jeromy committed
455 456 457 458 459
		err := adder.PinRoot()
		if err != nil {
			return err
		}

460
		adder.unlocker.Unlock()
461
		adder.unlocker = adder.blockstore.PinLock()
Jeromy's avatar
Jeromy committed
462 463 464 465
	}
	return nil
}

466
// outputDagnode sends dagnode info over the output channel
467
func outputDagnode(out chan interface{}, name string, dn node.Node) error {
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
	}

	return nil
}

485
func NewMemoryDagService() dag.DAGService {
486 487 488
	// build mem-datastore for editor's intermediary nodes
	bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
	bsrv := bserv.New(bs, offline.Exchange(bs))
489
	return dag.NewDAGService(bsrv)
490 491 492
}

// from core/commands/object.go
493
func getOutput(dagnode node.Node) (*Object, error) {
Jeromy's avatar
Jeromy committed
494
	c := dagnode.Cid()
495 496

	output := &Object{
Jeromy's avatar
Jeromy committed
497
		Hash:  c.String(),
498
		Links: make([]Link, len(dagnode.Links())),
499 500
	}

501
	for i, link := range dagnode.Links() {
502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531
		output.Links[i] = Link{
			Name: link.Name,
			Size: link.Size,
		}
	}

	return output, nil
}

type progressReader struct {
	file         files.File
	out          chan interface{}
	bytes        int64
	lastProgress int64
}

func (i *progressReader) Read(p []byte) (int, error) {
	n, err := i.file.Read(p)

	i.bytes += int64(n)
	if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
		i.lastProgress = i.bytes
		i.out <- &AddedObject{
			Name:  i.file.FileName(),
			Bytes: i.bytes,
		}
	}

	return n, err
}
532 533 534 535 536

type progressReader2 struct {
	*progressReader
	files.FileInfo
}