dagreader.go 3.76 KB
Newer Older
1 2 3 4 5 6 7
package io

import (
	"bytes"
	"errors"
	"io"

Jeromy's avatar
Jeromy committed
8 9
	"code.google.com/p/go.net/context"

10 11 12
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	ft "github.com/jbenet/go-ipfs/unixfs"
13
	ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
14 15 16 17 18 19
)

var ErrIsDir = errors.New("this dag node is a directory")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
Jeromy's avatar
Jeromy committed
20 21 22 23 24
	serv         mdag.DAGService
	node         *mdag.Node
	buf          io.Reader
	fetchChan    <-chan *mdag.Node
	linkPosition int
25 26 27 28
}

// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
29
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
30
	pb := new(ftpb.Data)
31 32 33 34 35 36
	err := proto.Unmarshal(n.Data, pb)
	if err != nil {
		return nil, err
	}

	switch pb.GetType() {
37
	case ftpb.Data_Directory:
38 39
		// Dont allow reading directories
		return nil, ErrIsDir
40
	case ftpb.Data_File:
Jeromy's avatar
Jeromy committed
41 42 43 44
		var fetchChan <-chan *mdag.Node
		if serv != nil {
			fetchChan = serv.BatchFetch(context.TODO(), n)
		}
Jeromy's avatar
Jeromy committed
45
		return &DagReader{
Jeromy's avatar
Jeromy committed
46 47 48
			node:      n,
			serv:      serv,
			buf:       bytes.NewBuffer(pb.GetData()),
Jeromy's avatar
Jeromy committed
49
			fetchChan: fetchChan,
Jeromy's avatar
Jeromy committed
50
		}, nil
51
	case ftpb.Data_Raw:
52 53 54 55 56 57 58
		// Raw block will just be a single level, return a byte buffer
		return bytes.NewBuffer(pb.GetData()), nil
	default:
		return nil, ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
59
// precalcNextBuf follows the next link in line and loads it from the DAGService,
60 61
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
Jeromy's avatar
Jeromy committed
62 63 64 65 66 67 68
	var nxt *mdag.Node
	var ok bool
	select {
	case nxt, ok = <-dr.fetchChan:
		if !ok {
			return io.EOF
		}
Jeromy's avatar
Jeromy committed
69 70 71 72 73 74 75 76 77 78 79
	default:
		// Only used when fetchChan is nil,
		// which only happens when passed in a nil dagservice
		// TODO: this logic is hard to follow, do it better.
		// NOTE: the only time this code is used, is during the
		//			importer tests, consider just changing those tests
		if dr.linkPosition >= len(dr.node.Links) {
			return io.EOF
		}
		nxt = dr.node.Links[dr.linkPosition].Node
		dr.linkPosition++
80
	}
Jeromy's avatar
Jeromy committed
81

82
	pb := new(ftpb.Data)
Jeromy's avatar
Jeromy committed
83
	err := proto.Unmarshal(nxt.Data, pb)
84 85 86 87 88
	if err != nil {
		return err
	}

	switch pb.GetType() {
89
	case ftpb.Data_Directory:
Jeromy's avatar
Jeromy committed
90
		// A directory should not exist within a file
91
		return ft.ErrInvalidDirLocation
92
	case ftpb.Data_File:
93
		//TODO: this *should* work, needs testing first
Jeromy's avatar
Jeromy committed
94 95 96 97 98 99 100
		log.Warning("Running untested code for multilayered indirect FS reads.")
		subr, err := NewDagReader(nxt, dr.serv)
		if err != nil {
			return err
		}
		dr.buf = subr
		return nil
101
	case ftpb.Data_Raw:
102 103 104 105 106 107 108
		dr.buf = bytes.NewBuffer(pb.GetData())
		return nil
	default:
		return ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
109
// Read reads data from the DAG structured file
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
func (dr *DagReader) Read(b []byte) (int, error) {
	// If no cached buffer, load one
	if dr.buf == nil {
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	}
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf()
		if err != nil {
			return total, err
		}
	}
}

/*
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
		for i := 0; i < len(dr.node.Links); i++ {
			nsize := dr.node.Links[i].Size - 8
			if offset > nsize {
				offset -= nsize
			} else {
				break
			}
		}
		dr.position = i
		err := dr.precalcNextBuf()
		if err != nil {
			return 0, err
		}
	case os.SEEK_CUR:
	case os.SEEK_END:
	default:
		return 0, errors.New("invalid whence")
	}
	return 0, nil
}
*/