Commit 5b4f82f2 authored by Jeromy's avatar Jeromy

implement seeking in the dagreader

parent 4cfcdc64
......@@ -6,6 +6,7 @@ import (
"io/ioutil"
"testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
"github.com/jbenet/go-ipfs/blocks/blockstore"
bs "github.com/jbenet/go-ipfs/blockservice"
......@@ -38,7 +39,7 @@ func getNode(t *testing.T, dserv mdag.DAGService, size int64) ([]byte, *mdag.Nod
t.Fatal(err)
}
dr, err := NewDagReader(node, dserv)
dr, err := NewDagReader(context.TODO(), node, dserv)
if err != nil {
t.Fatal(err)
}
......@@ -75,7 +76,7 @@ func testModWrite(t *testing.T, beg, size uint64, orig []byte, dm *DagModifier)
t.Fatal(err)
}
rd, err := NewDagReader(nd, dm.dagserv)
rd, err := NewDagReader(context.TODO(), nd, dm.dagserv)
if err != nil {
t.Fatal(err)
}
......@@ -173,7 +174,7 @@ func TestMultiWrite(t *testing.T) {
t.Fatal(err)
}
read, err := NewDagReader(nd, dserv)
read, err := NewDagReader(context.TODO(), nd, dserv)
if err != nil {
t.Fatal(err)
}
......@@ -215,7 +216,7 @@ func TestMultiWriteCoal(t *testing.T) {
t.Fatal(err)
}
read, err := NewDagReader(nd, dserv)
read, err := NewDagReader(context.TODO(), nd, dserv)
if err != nil {
t.Fatal(err)
}
......
......@@ -3,7 +3,10 @@ package io
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -11,6 +14,7 @@ import (
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
)
var ErrIsDir = errors.New("this dag node is a directory")
......@@ -19,14 +23,28 @@ var ErrIsDir = errors.New("this dag node is a directory")
type DagReader struct {
serv mdag.DAGService
node *mdag.Node
buf io.Reader
buf ReadSeekCloser
fetchChan <-chan *mdag.Node
linkPosition int
offset int64
// Our context
ctx context.Context
// Context for children
fctx context.Context
cancel func()
}
// ReadSeekCloser is the interface that groups the basic Read, Seek and
// Close methods. It is what NewDagReader hands back to callers so they
// can both stream and seek within a unixfs file.
type ReadSeekCloser interface {
	io.ReadSeeker
	io.Closer
}
// NewDagReader creates a new reader object that reads the data represented by the given
// node, using the passed-in DAGService for data retrieval.
func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (ReadSeekCloser, error) {
pb := new(ftpb.Data)
err := proto.Unmarshal(n.Data, pb)
if err != nil {
......@@ -38,16 +56,20 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// Don't allow reading directories
return nil, ErrIsDir
case ftpb.Data_File:
fetchChan := serv.GetDAG(context.TODO(), n)
fctx, cancel := context.WithCancel(ctx)
fetchChan := serv.GetDAG(fctx, n)
return &DagReader{
node: n,
serv: serv,
buf: bytes.NewBuffer(pb.GetData()),
buf: NewRSNCFromBytes(pb.GetData()),
fetchChan: fetchChan,
ctx: ctx,
fctx: fctx,
cancel: cancel,
}, nil
case ftpb.Data_Raw:
// Raw block will just be a single level, return a byte buffer
return bytes.NewBuffer(pb.GetData()), nil
return NewRSNCFromBytes(pb.GetData()), nil
default:
return nil, ft.ErrUnrecognizedType
}
......@@ -70,6 +92,8 @@ func (dr *DagReader) precalcNextBuf() error {
if !ok {
return io.EOF
}
case <-dr.ctx.Done():
return dr.ctx.Err()
}
pb := new(ftpb.Data)
......@@ -85,20 +109,37 @@ func (dr *DagReader) precalcNextBuf() error {
case ftpb.Data_File:
//TODO: this *should* work, needs testing first
log.Warning("Running untested code for multilayered indirect FS reads.")
subr, err := NewDagReader(nxt, dr.serv)
subr, err := NewDagReader(dr.fctx, nxt, dr.serv)
if err != nil {
return err
}
dr.buf = subr
return nil
case ftpb.Data_Raw:
dr.buf = bytes.NewBuffer(pb.GetData())
dr.buf = NewRSNCFromBytes(pb.GetData())
return nil
default:
return ft.ErrUnrecognizedType
}
}
// resetBlockFetch cancels the in-flight block fetch and starts a new one
// covering the links from nlinkpos onward. It is used by Seek to restart
// streaming from an arbitrary child block.
func (dr *DagReader) resetBlockFetch(nlinkpos int) {
	// Cancel the previous child context; its results are no longer wanted.
	dr.cancel()
	dr.linkPosition = nlinkpos

	// Collect the keys of every remaining link; pre-size to avoid growth copies.
	keys := make([]u.Key, 0, len(dr.node.Links)-dr.linkPosition)
	for _, lnk := range dr.node.Links[dr.linkPosition:] {
		keys = append(keys, u.Key(lnk.Hash))
	}

	// Fresh child context so this new fetch can be cancelled independently.
	fctx, cancel := context.WithCancel(dr.ctx)
	dr.cancel = cancel
	dr.fctx = fctx
	dr.fetchChan = dr.serv.GetNodes(fctx, keys)
}
// Read reads data from the DAG structured file
func (dr *DagReader) Read(b []byte) (int, error) {
// If no cached buffer, load one
......@@ -113,6 +154,7 @@ func (dr *DagReader) Read(b []byte) (int, error) {
// Attempt to fill bytes from cached buffer
n, err := dr.buf.Read(b[total:])
total += n
dr.offset += int64(n)
if err != nil {
// EOF is expected
if err != io.EOF {
......@@ -133,28 +175,90 @@ func (dr *DagReader) Read(b []byte) (int, error) {
}
}
/*
// Close cancels any outstanding block fetches for this reader.
// It always reports success.
func (dr *DagReader) Close() error {
	if dr.fctx == nil {
		// No child context was ever created; nothing to cancel.
		return nil
	}
	dr.cancel()
	return nil
}
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_SET:
for i := 0; i < len(dr.node.Links); i++ {
nsize := dr.node.Links[i].Size - 8
if offset > nsize {
offset -= nsize
} else {
if offset < 0 {
return -1, errors.New("Invalid offset")
}
//TODO: this pb should be cached
pb := new(ftpb.Data)
err := proto.Unmarshal(dr.node.Data, pb)
if err != nil {
return -1, err
}
if offset == 0 {
dr.resetBlockFetch(0)
dr.buf = NewRSNCFromBytes(pb.GetData())
return 0, nil
}
left := offset
if int64(len(pb.Data)) > offset {
dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
dr.linkPosition = 0
dr.offset = offset
return offset, nil
} else {
left -= int64(len(pb.Data))
}
i := 0
for ; i < len(pb.Blocksizes); i++ {
if pb.Blocksizes[i] > uint64(left) {
break
} else {
left -= int64(pb.Blocksizes[i])
}
}
dr.position = i
err := dr.precalcNextBuf()
dr.resetBlockFetch(i)
err = dr.precalcNextBuf()
if err != nil {
return 0, err
}
n, err := io.CopyN(ioutil.Discard, dr.buf, left)
if err != nil {
fmt.Printf("the copy failed: %s - [%d]\n", err, n)
return -1, err
}
left -= n
if left != 0 {
return -1, errors.New("failed to seek properly")
}
dr.offset = offset
return offset, nil
case os.SEEK_CUR:
noffset := dr.offset + offset
return dr.Seek(noffset, os.SEEK_SET)
case os.SEEK_END:
pb := new(ftpb.Data)
err := proto.Unmarshal(dr.node.Data, pb)
if err != nil {
return -1, err
}
noffset := int64(pb.GetFilesize()) - offset
return dr.Seek(noffset, os.SEEK_SET)
default:
return 0, errors.New("invalid whence")
}
return 0, nil
}
*/
// readSeekNopCloser adapts a *bytes.Reader to the ReadSeekCloser
// interface by supplying a Close method that does nothing.
type readSeekNopCloser struct {
	*bytes.Reader
}

// NewRSNCFromBytes wraps b in a ReadSeekCloser whose Close is a no-op.
func NewRSNCFromBytes(b []byte) ReadSeekCloser {
	rd := bytes.NewReader(b)
	return &readSeekNopCloser{rd}
}

// Close implements io.Closer; there are no resources to release.
func (nc *readSeekNopCloser) Close() error { return nil }
......@@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"compress/gzip"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"io"
gopath "path"
"strings"
......@@ -114,7 +115,7 @@ func (i *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
}
i.flush()
reader, err := uio.NewDagReader(dagnode, i.dag)
reader, err := uio.NewDagReader(context.TODO(), dagnode, i.dag)
if err != nil {
i.emitError(err)
return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment