dagreader.go 4.56 KB
Newer Older
1 2 3 4 5 6
package io

import (
	"bytes"
	"errors"
	"io"
Jeromy's avatar
Jeromy committed
7
	"os"
8

9
	"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
Jeromy's avatar
Jeromy committed
10

11 12 13
	proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	ft "github.com/jbenet/go-ipfs/unixfs"
14
	ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
15 16 17 18 19 20
)

// ErrIsDir is returned by NewDagReader when the node it is asked to read
// describes a directory rather than file content.
var ErrIsDir = errors.New("this dag node is a directory")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
Jeromy's avatar
Jeromy committed
21 22
	serv         mdag.DAGService
	node         *mdag.Node
Jeromy's avatar
Jeromy committed
23
	pbdata       *ftpb.Data
Jeromy's avatar
Jeromy committed
24
	buf          ReadSeekCloser
Jeromy's avatar
Jeromy committed
25
	promises     []mdag.NodeGetter
Jeromy's avatar
Jeromy committed
26
	linkPosition int
Jeromy's avatar
Jeromy committed
27 28 29 30 31 32 33 34 35 36 37 38 39 40
	offset       int64

	// Our context
	ctx context.Context

	// Context for children
	fctx   context.Context
	cancel func()
}

// ReadSeekCloser groups the Read, Seek and Close methods. It is the type
// returned by NewDagReader and NewRSNCFromBytes.
type ReadSeekCloser interface {
	io.ReadSeeker
	io.Closer
}

// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed in DAGService for data retreival
Jeromy's avatar
Jeromy committed
45
func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (ReadSeekCloser, error) {
46
	pb := new(ftpb.Data)
47 48 49 50 51 52
	err := proto.Unmarshal(n.Data, pb)
	if err != nil {
		return nil, err
	}

	switch pb.GetType() {
53
	case ftpb.Data_Directory:
54 55
		// Dont allow reading directories
		return nil, ErrIsDir
56
	case ftpb.Data_File:
Jeromy's avatar
Jeromy committed
57
		fctx, cancel := context.WithCancel(ctx)
Jeromy's avatar
Jeromy committed
58
		promises := serv.GetDAG(fctx, n)
Jeromy's avatar
Jeromy committed
59
		return &DagReader{
Jeromy's avatar
Jeromy committed
60 61 62 63 64 65 66
			node:     n,
			serv:     serv,
			buf:      NewRSNCFromBytes(pb.GetData()),
			promises: promises,
			ctx:      fctx,
			cancel:   cancel,
			pbdata:   pb,
Jeromy's avatar
Jeromy committed
67
		}, nil
68
	case ftpb.Data_Raw:
69
		// Raw block will just be a single level, return a byte buffer
Jeromy's avatar
Jeromy committed
70
		return NewRSNCFromBytes(pb.GetData()), nil
71 72 73 74 75
	default:
		return nil, ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
76
// precalcNextBuf follows the next link in line and loads it from the DAGService,
77 78
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
Jeromy's avatar
Jeromy committed
79 80 81
	dr.buf.Close() // Just to make sure
	if dr.linkPosition >= len(dr.promises) {
		return io.EOF
Jeromy's avatar
Jeromy committed
82
	}
Jeromy's avatar
Jeromy committed
83 84 85
	nxt, err := dr.promises[dr.linkPosition].Get()
	if err != nil {
		return err
86
	}
Jeromy's avatar
Jeromy committed
87
	dr.linkPosition++
Jeromy's avatar
Jeromy committed
88

89
	pb := new(ftpb.Data)
Jeromy's avatar
Jeromy committed
90
	err = proto.Unmarshal(nxt.Data, pb)
91 92 93 94 95
	if err != nil {
		return err
	}

	switch pb.GetType() {
96
	case ftpb.Data_Directory:
Jeromy's avatar
Jeromy committed
97
		// A directory should not exist within a file
98
		return ft.ErrInvalidDirLocation
99
	case ftpb.Data_File:
Jeromy's avatar
Jeromy committed
100
		subr, err := NewDagReader(dr.ctx, nxt, dr.serv)
Jeromy's avatar
Jeromy committed
101 102 103 104 105
		if err != nil {
			return err
		}
		dr.buf = subr
		return nil
106
	case ftpb.Data_Raw:
Jeromy's avatar
Jeromy committed
107
		dr.buf = NewRSNCFromBytes(pb.GetData())
108 109 110 111 112 113
		return nil
	default:
		return ft.ErrUnrecognizedType
	}
}

Jeromy's avatar
Jeromy committed
114
// Read reads data from the DAG structured file
115 116 117 118 119 120 121
func (dr *DagReader) Read(b []byte) (int, error) {
	// If no cached buffer, load one
	total := 0
	for {
		// Attempt to fill bytes from cached buffer
		n, err := dr.buf.Read(b[total:])
		total += n
Jeromy's avatar
Jeromy committed
122
		dr.offset += int64(n)
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
		if err != nil {
			// EOF is expected
			if err != io.EOF {
				return total, err
			}
		}

		// If weve read enough bytes, return
		if total == len(b) {
			return total, nil
		}

		// Otherwise, load up the next block
		err = dr.precalcNextBuf()
		if err != nil {
			return total, err
		}
	}
}

Jeromy's avatar
Jeromy committed
143
func (dr *DagReader) Close() error {
Jeromy's avatar
Jeromy committed
144
	dr.cancel()
Jeromy's avatar
Jeromy committed
145 146 147
	return nil
}

148 149 150
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
Jeromy's avatar
Jeromy committed
151 152 153 154
		if offset < 0 {
			return -1, errors.New("Invalid offset")
		}

Jeromy's avatar
Jeromy committed
155
		pb := dr.pbdata
Jeromy's avatar
Jeromy committed
156 157
		left := offset
		if int64(len(pb.Data)) > offset {
Jeromy's avatar
Jeromy committed
158
			dr.buf.Close()
Jeromy's avatar
Jeromy committed
159 160 161 162 163 164 165 166
			dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
			dr.linkPosition = 0
			dr.offset = offset
			return offset, nil
		} else {
			left -= int64(len(pb.Data))
		}

Jeromy's avatar
Jeromy committed
167
		for i := 0; i < len(pb.Blocksizes); i++ {
Jeromy's avatar
Jeromy committed
168
			if pb.Blocksizes[i] > uint64(left) {
Jeromy's avatar
Jeromy committed
169
				dr.linkPosition = i
170
				break
Jeromy's avatar
Jeromy committed
171 172
			} else {
				left -= int64(pb.Blocksizes[i])
173 174
			}
		}
Jeromy's avatar
Jeromy committed
175 176

		err := dr.precalcNextBuf()
177 178 179
		if err != nil {
			return 0, err
		}
Jeromy's avatar
Jeromy committed
180

Jeromy's avatar
Jeromy committed
181
		n, err := dr.buf.Seek(left, os.SEEK_SET)
Jeromy's avatar
Jeromy committed
182 183 184 185 186 187 188 189 190
		if err != nil {
			return -1, err
		}
		left -= n
		if left != 0 {
			return -1, errors.New("failed to seek properly")
		}
		dr.offset = offset
		return offset, nil
191
	case os.SEEK_CUR:
Jeromy's avatar
Jeromy committed
192
		// TODO: be smarter here
Jeromy's avatar
Jeromy committed
193 194
		noffset := dr.offset + offset
		return dr.Seek(noffset, os.SEEK_SET)
195
	case os.SEEK_END:
Jeromy's avatar
Jeromy committed
196
		noffset := int64(dr.pbdata.GetFilesize()) - offset
Jeromy's avatar
Jeromy committed
197
		return dr.Seek(noffset, os.SEEK_SET)
198 199 200 201 202
	default:
		return 0, errors.New("invalid whence")
	}
	return 0, nil
}
Jeromy's avatar
Jeromy committed
203 204 205 206 207 208 209 210 211 212

// readSeekNopCloser wraps a *bytes.Reader. Read and Seek are promoted from the
// embedded reader; the Close method defined on the type does nothing.
type readSeekNopCloser struct {
	*bytes.Reader
}

// NewRSNCFromBytes returns a ReadSeekCloser that reads from b and whose Close
// is a no-op.
func NewRSNCFromBytes(b []byte) ReadSeekCloser {
	rd := bytes.NewReader(b)
	return &readSeekNopCloser{Reader: rd}
}

// Close does nothing and always returns nil; it exists so that the wrapped
// reader satisfies ReadSeekCloser.
func (r *readSeekNopCloser) Close() error {
	return nil
}