dagwriter.go 2.54 KB
Newer Older
1
package io
2 3

import (
4
	"github.com/jbenet/go-ipfs/importer/chunk"
5
	dag "github.com/jbenet/go-ipfs/merkledag"
6
	ft "github.com/jbenet/go-ipfs/unixfs"
7 8 9 10 11 12
	"github.com/jbenet/go-ipfs/util"
)

var log = util.Logger("dagwriter")

type DagWriter struct {
13
	dagserv   dag.DAGService
14 15 16 17
	node      *dag.Node
	totalSize int64
	splChan   chan []byte
	done      chan struct{}
18
	splitter  chunk.BlockSplitter
19 20 21
	seterr    error
}

22
func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
23 24 25 26 27 28 29 30 31
	dw := new(DagWriter)
	dw.dagserv = ds
	dw.splChan = make(chan []byte, 8)
	dw.splitter = splitter
	dw.done = make(chan struct{})
	go dw.startSplitter()
	return dw
}

Jeromy's avatar
Jeromy committed
32 33 34
// startSplitter manages splitting incoming bytes and
// creating dag nodes from them. Created nodes are stored
// in the DAGService and then released to the GC.
35
func (dw *DagWriter) startSplitter() {
Jeromy's avatar
Jeromy committed
36 37 38

	// Since the splitter functions take a reader (and should!)
	// we wrap our byte chan input in a reader
39 40
	r := util.NewByteChanReader(dw.splChan)
	blkchan := dw.splitter.Split(r)
Jeromy's avatar
Jeromy committed
41 42

	// First data block is reserved for storage in the root node
43
	first := <-blkchan
Jeromy's avatar
Jeromy committed
44
	mbf := new(ft.MultiBlock)
45
	root := new(dag.Node)
Jeromy's avatar
Jeromy committed
46

47
	for blkData := range blkchan {
Jeromy's avatar
Jeromy committed
48
		// Store the block size in the root node
Jeromy's avatar
Jeromy committed
49
		mbf.AddBlockSize(uint64(len(blkData)))
50 51 52 53 54 55 56
		node := &dag.Node{Data: ft.WrapData(blkData)}
		_, err := dw.dagserv.Add(node)
		if err != nil {
			dw.seterr = err
			log.Critical("Got error adding created node to dagservice: %s", err)
			return
		}
Jeromy's avatar
Jeromy committed
57 58

		// Add a link to this node without storing a reference to the memory
59 60 61 62 63 64 65
		err = root.AddNodeLinkClean("", node)
		if err != nil {
			dw.seterr = err
			log.Critical("Got error adding created node to root node: %s", err)
			return
		}
	}
Jeromy's avatar
Jeromy committed
66 67

	// Generate the root node data
Jeromy's avatar
Jeromy committed
68 69 70 71 72 73 74 75 76
	mbf.Data = first
	data, err := mbf.GetBytes()
	if err != nil {
		dw.seterr = err
		log.Critical("Failed generating bytes for multiblock file: %s", err)
		return
	}
	root.Data = data

Jeromy's avatar
Jeromy committed
77
	// Add root node to the dagservice
Jeromy's avatar
Jeromy committed
78
	_, err = dw.dagserv.Add(root)
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
	if err != nil {
		dw.seterr = err
		log.Critical("Got error adding created node to dagservice: %s", err)
		return
	}
	dw.node = root
	dw.done <- struct{}{}
}

func (dw *DagWriter) Write(b []byte) (int, error) {
	if dw.seterr != nil {
		return 0, dw.seterr
	}
	dw.splChan <- b
	return len(b), nil
}

Jeromy's avatar
Jeromy committed
96 97 98
// Close the splitters input channel and wait for it to finish
// Must be called to finish up splitting, otherwise split method
// will never halt
99 100 101 102 103 104 105 106 107
func (dw *DagWriter) Close() error {
	close(dw.splChan)
	<-dw.done
	return nil
}

func (dw *DagWriter) GetNode() *dag.Node {
	return dw.node
}