dagwriter.go 2.74 KB
Newer Older
1
package io
2 3

import (
4
	"github.com/jbenet/go-ipfs/importer/chunk"
5
	dag "github.com/jbenet/go-ipfs/merkledag"
6
	"github.com/jbenet/go-ipfs/pin"
7
	ft "github.com/jbenet/go-ipfs/unixfs"
8 9 10 11 12 13
	"github.com/jbenet/go-ipfs/util"
)

// log is the package-level logger, created under the name "dagwriter".
var log = util.Logger("dagwriter")

type DagWriter struct {
14
	dagserv   dag.DAGService
15 16 17 18
	node      *dag.Node
	totalSize int64
	splChan   chan []byte
	done      chan struct{}
19
	splitter  chunk.BlockSplitter
20
	seterr    error
21
	Pinner    pin.ManualPinner
22 23
}

24
func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
25 26 27 28 29 30 31 32 33
	dw := new(DagWriter)
	dw.dagserv = ds
	dw.splChan = make(chan []byte, 8)
	dw.splitter = splitter
	dw.done = make(chan struct{})
	go dw.startSplitter()
	return dw
}

Jeromy's avatar
Jeromy committed
34 35 36
// startSplitter manages splitting incoming bytes and
// creating dag nodes from them. Created nodes are stored
// in the DAGService and then released to the GC.
37
func (dw *DagWriter) startSplitter() {
Jeromy's avatar
Jeromy committed
38 39 40

	// Since the splitter functions take a reader (and should!)
	// we wrap our byte chan input in a reader
41 42
	r := util.NewByteChanReader(dw.splChan)
	blkchan := dw.splitter.Split(r)
Jeromy's avatar
Jeromy committed
43 44

	// First data block is reserved for storage in the root node
45
	first := <-blkchan
Jeromy's avatar
Jeromy committed
46
	mbf := new(ft.MultiBlock)
47
	root := new(dag.Node)
Jeromy's avatar
Jeromy committed
48

49
	for blkData := range blkchan {
Jeromy's avatar
Jeromy committed
50
		// Store the block size in the root node
Jeromy's avatar
Jeromy committed
51
		mbf.AddBlockSize(uint64(len(blkData)))
52
		node := &dag.Node{Data: ft.WrapData(blkData)}
53 54 55 56
		nk, err := dw.dagserv.Add(node)
		if dw.Pinner != nil {
			dw.Pinner.PinWithMode(nk, pin.Indirect)
		}
57 58 59 60 61
		if err != nil {
			dw.seterr = err
			log.Critical("Got error adding created node to dagservice: %s", err)
			return
		}
Jeromy's avatar
Jeromy committed
62 63

		// Add a link to this node without storing a reference to the memory
64 65 66 67 68 69 70
		err = root.AddNodeLinkClean("", node)
		if err != nil {
			dw.seterr = err
			log.Critical("Got error adding created node to root node: %s", err)
			return
		}
	}
Jeromy's avatar
Jeromy committed
71 72

	// Generate the root node data
Jeromy's avatar
Jeromy committed
73 74 75 76 77 78 79 80 81
	mbf.Data = first
	data, err := mbf.GetBytes()
	if err != nil {
		dw.seterr = err
		log.Critical("Failed generating bytes for multiblock file: %s", err)
		return
	}
	root.Data = data

Jeromy's avatar
Jeromy committed
82
	// Add root node to the dagservice
83
	rootk, err := dw.dagserv.Add(root)
84 85 86 87 88
	if err != nil {
		dw.seterr = err
		log.Critical("Got error adding created node to dagservice: %s", err)
		return
	}
89 90 91
	if dw.Pinner != nil {
		dw.Pinner.PinWithMode(rootk, pin.Recursive)
	}
92 93 94 95 96 97 98 99 100 101 102 103
	dw.node = root
	dw.done <- struct{}{}
}

// Write queues a copy of b for the splitter goroutine and reports len(b)
// bytes as written. If an error has already been recorded in dw.seterr,
// that error is returned and nothing is queued.
//
// The data is copied before it is handed off because the io.Writer contract
// forbids retaining the caller's slice: callers such as io.Copy reuse one
// buffer across calls, while earlier chunks may still be sitting in the
// channel's buffer.
//
// Write blocks while the channel's buffer is full. It must not be called
// after Close, since that would send on a closed channel.
func (dw *DagWriter) Write(b []byte) (int, error) {
	if dw.seterr != nil {
		return 0, dw.seterr
	}
	buf := make([]byte, len(b))
	copy(buf, b)
	dw.splChan <- buf
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
104 105 106
// Close the splitters input channel and wait for it to finish
// Must be called to finish up splitting, otherwise split method
// will never halt
107 108 109 110 111 112 113 114 115
func (dw *DagWriter) Close() error {
	close(dw.splChan)
	<-dw.done
	return nil
}

// GetNode returns the root node built by the splitter goroutine. The node is
// assigned only at the very end of a successful run, so the result is nil
// until then; call GetNode after Close has returned.
func (dw *DagWriter) GetNode() *dag.Node {
	return dw.node
}