add.go 10.2 KB
Newer Older
Brian Tiger Chow's avatar
Brian Tiger Chow committed
1
package coreunix
2 3

import (
Jeromy's avatar
Jeromy committed
4
	"bytes"
5
	"fmt"
6
	"io"
7
	"io/ioutil"
8
	"os"
9
	gopath "path"
Jeromy's avatar
Jeromy committed
10

11 12
	ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
Jeromy's avatar
Jeromy committed
13
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
14
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
Jeromy's avatar
Jeromy committed
15
	key "github.com/ipfs/go-ipfs/blocks/key"
16 17 18 19
	bserv "github.com/ipfs/go-ipfs/blockservice"
	"github.com/ipfs/go-ipfs/exchange/offline"
	importer "github.com/ipfs/go-ipfs/importer"
	"github.com/ipfs/go-ipfs/importer/chunk"
Jeromy's avatar
Jeromy committed
20
	mfs "github.com/ipfs/go-ipfs/mfs"
21
	"github.com/ipfs/go-ipfs/pin"
22

23 24
	"github.com/ipfs/go-ipfs/commands/files"
	core "github.com/ipfs/go-ipfs/core"
25
	dag "github.com/ipfs/go-ipfs/merkledag"
26
	unixfs "github.com/ipfs/go-ipfs/unixfs"
Jeromy's avatar
Jeromy committed
27
	logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
28 29
)

Jeromy's avatar
Jeromy committed
30
// log is the package-level logger, created with the name "coreunix".
var log = logging.Logger("coreunix")
31

Jeromy's avatar
Jeromy committed
32 33
// folderData holds the result of unixfs.FolderPBData(), computed once at
// package init. outputDirs only descends into nodes whose Data equals it
// (presumably the unixfs marker for a directory node).
var folderData = unixfs.FolderPBData()

34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
// progressReaderIncrement is how many bytes of progress a progressReader
// waits for before sending the next progress update message (256 KiB).
const progressReaderIncrement = 1024 * 256

// Link is a plain-data description of one link of a DAG node, as produced
// by getOutput.
type Link struct {
	// Name is the name of the link.
	Name string
	// Hash is the string form of the link target's hash. getOutput currently
	// leaves it empty.
	Hash string
	// Size is the size recorded on the link.
	Size uint64
}

// Object is the summary of a DAG node built by getOutput.
type Object struct {
	// Hash is the node's key in its Pretty() string form.
	Hash  string
	// Links has one entry per link of the node, in the node's link order.
	Links []Link
}

// hiddenFileError is returned by addFile when a hidden file is encountered
// while the Adder's Hidden option is off. addDir recognises it by type and
// skips the offending entry instead of failing.
type hiddenFileError struct {
	// fileName is the name of the hidden file that was skipped.
	fileName string
}

// Error implements the error interface.
func (hfe *hiddenFileError) Error() string {
	const format = "%s is a hidden file"
	return fmt.Sprintf(format, hfe.fileName)
}

// ignoreFileError reports that a file was ignored. Nothing in the visible
// part of this file constructs it.
type ignoreFileError struct {
	// fileName is the name of the ignored file.
	fileName string
}

// Error implements the error interface.
func (ife *ignoreFileError) Error() string {
	const format = "%s is an ignored file"
	return fmt.Sprintf(format, ife.fileName)
}

// AddedObject is the message sent over an Adder's output channel. It is used
// both to announce an added node (Name and Hash set) and to report read
// progress on a file (Name and Bytes set).
type AddedObject struct {
	// Name is the path or file name the message refers to.
	Name string
	// Hash is the added node's hash; omitted from JSON when empty.
	Hash string `json:",omitempty"`
	// Bytes is the number of bytes read so far; omitted from JSON when zero.
	Bytes int64 `json:",omitempty"`
}

Jeromy's avatar
Jeromy committed
69 70 71 72 73 74
func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) (*Adder, error) {
	mr, err := mfs.NewRoot(ctx, n.DAG, newDirNode(), nil)
	if err != nil {
		return nil, err
	}

75
	return &Adder{
Jeromy's avatar
Jeromy committed
76
		mr:       mr,
77 78 79 80 81 82 83 84 85
		ctx:      ctx,
		node:     n,
		out:      out,
		Progress: false,
		Hidden:   true,
		Pin:      true,
		Trickle:  false,
		Wrap:     false,
		Chunker:  "",
Jeromy's avatar
Jeromy committed
86
	}, nil
87 88 89 90 91 92 93 94 95 96 97
}

// Internal structure for holding the switches passed to the `add` call
type Adder struct {
	ctx      context.Context
	node     *core.IpfsNode
	out      chan interface{}
	Progress bool
	Hidden   bool
	Pin      bool
	Trickle  bool
Jeromy's avatar
Jeromy committed
98
	Silent   bool
99 100
	Wrap     bool
	Chunker  string
101
	root     *dag.Node
Jeromy's avatar
Jeromy committed
102
	mr       *mfs.Root
Jeromy's avatar
Jeromy committed
103 104
	unlock   func()
	tempRoot key.Key
105 106 107
}

// Perform the actual add & pin locally, outputting results to reader
108
func (params Adder) add(reader io.Reader) (*dag.Node, error) {
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
	chnk, err := chunk.FromString(reader, params.Chunker)
	if err != nil {
		return nil, err
	}

	if params.Trickle {
		return importer.BuildTrickleDagFromReader(
			params.node.DAG,
			chnk,
		)
	}
	return importer.BuildDagFromReader(
		params.node.DAG,
		chnk,
	)
}

126
func (params *Adder) RootNode() (*dag.Node, error) {
Jeromy's avatar
Jeromy committed
127 128 129 130
	// for memoizing
	if params.root != nil {
		return params.root, nil
	}
131

Jeromy's avatar
Jeromy committed
132 133 134 135
	root, err := params.mr.GetValue().GetNode()
	if err != nil {
		return nil, err
	}
136

Jeromy's avatar
Jeromy committed
137 138 139 140 141
	// if not wrapping, AND one root file, use that hash as root.
	if !params.Wrap && len(root.Links) == 1 {
		root, err = root.Links[0].GetNode(params.ctx, params.node.DAG)
		if err != nil {
			return nil, err
Jeromy's avatar
Jeromy committed
142
		}
Jeromy's avatar
Jeromy committed
143
	}
Jeromy's avatar
Jeromy committed
144

Jeromy's avatar
Jeromy committed
145 146
	params.root = root
	return root, err
147 148 149 150 151 152 153
}

func (params *Adder) PinRoot() error {
	root, err := params.RootNode()
	if err != nil {
		return err
	}
154 155 156
	if !params.Pin {
		return nil
	}
157

Jeromy's avatar
Jeromy committed
158
	rnk, err := params.node.DAG.Add(root)
159 160 161 162
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
163 164 165 166 167 168 169 170
	if params.tempRoot != "" {
		err := params.node.Pinning.Unpin(params.ctx, params.tempRoot, true)
		if err != nil {
			return err
		}
		params.tempRoot = rnk
	}

171 172 173 174
	params.node.Pinning.PinWithMode(rnk, pin.Recursive)
	return params.node.Pinning.Flush()
}

Jeromy's avatar
Jeromy committed
175
func (params *Adder) Finalize() (*dag.Node, error) {
Jeromy's avatar
Jeromy committed
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
	root, err := params.mr.GetValue().GetNode()
	if err != nil {
		return nil, err
	}

	params.RootNode()

	var name string
	if !params.Wrap {
		name = root.Links[0].Name
		child, err := root.Links[0].GetNode(params.ctx, params.node.DAG)
		if err != nil {
			return nil, err
		}
		root = child
	}

	err = params.outputDirs(name, root)
	if err != nil {
		return nil, err
	}

	err = params.mr.Close()
	if err != nil {
		return nil, err
	}

	return root, nil
}

func (params *Adder) outputDirs(path string, nd *dag.Node) error {
Jeromy's avatar
Jeromy committed
207 208 209 210
	if !bytes.Equal(nd.Data, folderData) {
		return nil
	}

Jeromy's avatar
Jeromy committed
211 212 213 214 215 216
	for _, l := range nd.Links {
		child, err := l.GetNode(params.ctx, params.node.DAG)
		if err != nil {
			return err
		}

Jeromy's avatar
Jeromy committed
217 218 219
		err = params.outputDirs(gopath.Join(path, l.Name), child)
		if err != nil {
			return err
Jeromy's avatar
Jeromy committed
220 221
		}
	}
Jeromy's avatar
Jeromy committed
222

Jeromy's avatar
Jeromy committed
223
	return outputDagnode(params.out, path, nd)
224 225
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
226 227
// Add builds a merkledag from the a reader, pinning all objects to the local
// datastore. Returns a key representing the root node.
228
func Add(n *core.IpfsNode, r io.Reader) (string, error) {
229
	unlock := n.Blockstore.PinLock()
Jeromy's avatar
Jeromy committed
230 231
	defer unlock()

Jeromy's avatar
Jeromy committed
232 233 234 235
	fileAdder, err := NewAdder(n.Context(), n, nil)
	if err != nil {
		return "", err
	}
236

237
	node, err := fileAdder.add(r)
238 239 240
	if err != nil {
		return "", err
	}
241
	k, err := node.Key()
242 243 244 245 246
	if err != nil {
		return "", err
	}

	return k.String(), nil
247
}
248 249 250

// AddR recursively adds files in |path|.
func AddR(n *core.IpfsNode, root string) (key string, err error) {
251
	unlock := n.Blockstore.PinLock()
Jeromy's avatar
Jeromy committed
252 253
	defer unlock()

254
	stat, err := os.Lstat(root)
255 256 257
	if err != nil {
		return "", err
	}
258

259
	f, err := files.NewSerialFile(root, root, stat)
260 261 262
	if err != nil {
		return "", err
	}
263
	defer f.Close()
264

Jeromy's avatar
Jeromy committed
265 266 267 268
	fileAdder, err := NewAdder(n.Context(), n, nil)
	if err != nil {
		return "", err
	}
269

Jeromy's avatar
Jeromy committed
270
	err = fileAdder.addFile(f)
271 272 273
	if err != nil {
		return "", err
	}
274

Jeromy's avatar
Jeromy committed
275 276 277 278 279 280
	nd, err := fileAdder.Finalize()
	if err != nil {
		return "", err
	}

	k, err := nd.Key()
281 282 283
	if err != nil {
		return "", err
	}
284

285 286 287
	return k.String(), nil
}

288 289
// AddWrapped adds data from a reader, and wraps it with a directory object
// to preserve the filename.
290 291
// Returns the path of the added file ("<dir hash>/filename"), the DAG node of
// the directory, and and error if any.
292
func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *dag.Node, error) {
293
	file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil)
Jeromy's avatar
Jeromy committed
294 295 296 297
	fileAdder, err := NewAdder(n.Context(), n, nil)
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
298
	fileAdder.Wrap = true
Jeromy's avatar
Jeromy committed
299

300
	unlock := n.Blockstore.PinLock()
Jeromy's avatar
Jeromy committed
301
	defer unlock()
Jeromy's avatar
Jeromy committed
302

Jeromy's avatar
Jeromy committed
303
	err = fileAdder.addFile(file)
Jeromy's avatar
Jeromy committed
304 305 306 307 308
	if err != nil {
		return "", nil, err
	}

	dagnode, err := fileAdder.Finalize()
309 310 311
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
312

313 314 315 316
	k, err := dagnode.Key()
	if err != nil {
		return "", nil, err
	}
Jeromy's avatar
Jeromy committed
317

318 319 320
	return gopath.Join(k.String(), filename), dagnode, nil
}

321
func (params *Adder) addNode(node *dag.Node, path string) error {
322 323 324 325 326 327 328 329 330
	// patch it into the root
	if path == "" {
		key, err := node.Key()
		if err != nil {
			return err
		}

		path = key.Pretty()
	}
331

Jeromy's avatar
Jeromy committed
332
	if err := mfs.PutNode(params.mr, path, node); err != nil {
333 334
		return err
	}
335

Jeromy's avatar
Jeromy committed
336 337 338 339
	if !params.Silent {
		return outputDagnode(params.out, path, node)
	}
	return nil
340 341
}

342
// Add the given file while respecting the params.
Jeromy's avatar
Jeromy committed
343
func (params *Adder) AddFile(file files.File) error {
Jeromy's avatar
Jeromy committed
344 345 346 347 348 349 350 351 352 353 354 355
	params.unlock = params.node.Blockstore.PinLock()
	defer params.unlock()

	return params.addFile(file)
}

func (adder *Adder) addFile(file files.File) error {
	err := adder.maybePauseForGC()
	if err != nil {
		return err
	}

356
	switch {
Jeromy's avatar
Jeromy committed
357
	case files.IsHidden(file) && !adder.Hidden:
358
		log.Debugf("%s is hidden, skipping", file.FileName())
Jeromy's avatar
Jeromy committed
359
		return &hiddenFileError{file.FileName()}
360
	case file.IsDirectory():
Jeromy's avatar
Jeromy committed
361
		return adder.addDir(file)
362 363
	}

364 365 366 367
	// case for symlink
	if s, ok := file.(*files.Symlink); ok {
		sdata, err := unixfs.SymlinkData(s.Target)
		if err != nil {
Jeromy's avatar
Jeromy committed
368
			return err
369
		}
370

371
		dagnode := &dag.Node{Data: sdata}
Jeromy's avatar
Jeromy committed
372
		_, err = adder.node.DAG.Add(dagnode)
373
		if err != nil {
Jeromy's avatar
Jeromy committed
374
			return err
375 376
		}

Jeromy's avatar
Jeromy committed
377
		return adder.addNode(dagnode, s.FileName())
378 379 380 381 382 383
	}

	// case for regular file
	// if the progress flag was specified, wrap the file so that we can send
	// progress updates to the client (over the output channel)
	var reader io.Reader = file
Jeromy's avatar
Jeromy committed
384 385
	if adder.Progress {
		reader = &progressReader{file: file, out: adder.out}
386 387
	}

Jeromy's avatar
Jeromy committed
388
	dagnode, err := adder.add(reader)
389
	if err != nil {
Jeromy's avatar
Jeromy committed
390
		return err
391 392 393
	}

	// patch it into the root
Jeromy's avatar
Jeromy committed
394
	return adder.addNode(dagnode, file.FileName())
395 396
}

Jeromy's avatar
Jeromy committed
397
func (params *Adder) addDir(dir files.File) error {
398
	log.Infof("adding directory: %s", dir.FileName())
399

Jeromy's avatar
Jeromy committed
400 401 402 403 404
	err := mfs.Mkdir(params.mr, dir.FileName(), true)
	if err != nil {
		return err
	}

405 406
	for {
		file, err := dir.NextFile()
407
		if err != nil && err != io.EOF {
Jeromy's avatar
Jeromy committed
408
			return err
409 410 411
		}
		if file == nil {
			break
412 413
		}

Jeromy's avatar
Jeromy committed
414
		err = params.addFile(file)
415 416 417 418
		if _, ok := err.(*hiddenFileError); ok {
			// hidden file error, skip file
			continue
		} else if err != nil {
Jeromy's avatar
Jeromy committed
419
			return err
420 421 422
		}
	}

Jeromy's avatar
Jeromy committed
423
	return nil
424
}
425

Jeromy's avatar
Jeromy committed
426 427 428 429 430 431 432 433 434 435 436 437 438
// maybePauseForGC checks whether the blockstore reports a requested GC and,
// if so, pins the current root via PinRoot, releases the pin lock and takes
// it again (presumably so the collector can run in between).
//
// It does nothing when adder.unlock is nil. That is the case when addFile is
// reached without going through AddFile (AddR and AddWrapped hold the pin
// lock in a local variable), so this Adder has no lock of its own to release
// and calling the nil function would panic.
func (adder *Adder) maybePauseForGC() error {
	if adder.unlock == nil || !adder.node.Blockstore.GCRequested() {
		return nil
	}

	if err := adder.PinRoot(); err != nil {
		return err
	}

	adder.unlock()
	adder.unlock = adder.node.Blockstore.PinLock()
	return nil
}

439
// outputDagnode sends dagnode info over the output channel
440
func outputDagnode(out chan interface{}, name string, dn *dag.Node) error {
441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457
	if out == nil {
		return nil
	}

	o, err := getOutput(dn)
	if err != nil {
		return err
	}

	out <- &AddedObject{
		Hash: o.Hash,
		Name: name,
	}

	return nil
}

458
func NewMemoryDagService() dag.DAGService {
459 460 461
	// build mem-datastore for editor's intermediary nodes
	bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
	bsrv := bserv.New(bs, offline.Exchange(bs))
462
	return dag.NewDAGService(bsrv)
463 464 465
}

// TODO: generalize this to more than unix-fs nodes.
466 467
func newDirNode() *dag.Node {
	return &dag.Node{Data: unixfs.FolderPBData()}
468 469 470
}

// from core/commands/object.go
471
func getOutput(dagnode *dag.Node) (*Object, error) {
472 473 474 475 476 477 478 479 480 481 482 483 484
	key, err := dagnode.Key()
	if err != nil {
		return nil, err
	}

	output := &Object{
		Hash:  key.Pretty(),
		Links: make([]Link, len(dagnode.Links)),
	}

	for i, link := range dagnode.Links {
		output.Links[i] = Link{
			Name: link.Name,
Jeromy's avatar
Jeromy committed
485
			//Hash: link.Hash.B58String(),
486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
			Size: link.Size,
		}
	}

	return output, nil
}

// progressReader wraps a file and reports how many bytes have been read from
// it by sending *AddedObject messages on out.
type progressReader struct {
	// file is the underlying file; its FileName is used in progress messages.
	file         files.File
	// out is the channel progress messages are sent on.
	out          chan interface{}
	// bytes is the total number of bytes read so far.
	bytes        int64
	// lastProgress is the value of bytes when the last message was sent.
	lastProgress int64
}

// Read reads from the underlying file and keeps a running byte count. A
// progress message (*AddedObject with Name and Bytes) is sent on out whenever
// at least progressReaderIncrement bytes have been read since the last
// message, and once more when the file reports io.EOF.
//
// The count and error from the underlying Read are returned unchanged. When
// out is nil no message is sent, because a send on a nil channel would block
// forever; this matches how outputDagnode treats a nil channel.
func (pr *progressReader) Read(p []byte) (int, error) {
	n, err := pr.file.Read(p)

	pr.bytes += int64(n)
	if pr.bytes-pr.lastProgress >= progressReaderIncrement || err == io.EOF {
		pr.lastProgress = pr.bytes
		if pr.out != nil {
			pr.out <- &AddedObject{
				Name:  pr.file.FileName(),
				Bytes: pr.bytes,
			}
		}
	}

	return n, err
}