Commit 0023cb30 authored by Jeromy's avatar Jeromy

refactor and clean up dagreader

parent 26826bd5
......@@ -263,6 +263,50 @@ func TestSeekToBegin(t *testing.T) {
}
}
// TestSeekToAlmostBegin builds a 10KiB random file as a DAG, reads the
// first 4KiB from it, seeks back to offset 1, and verifies that reading
// the remainder returns every byte of the original data except the first.
func TestSeekToAlmostBegin(t *testing.T) {
	const nbytes int64 = 10 * 1024
	should := make([]byte, nbytes)
	u.NewTimeSeededRand().Read(should)

	dnp := getDagservAndPinner(t)
	nd, err := BuildDagFromReader(bytes.NewReader(should), dnp.ds, dnp.mp, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}

	rs, err := uio.NewDagReader(context.TODO(), nd, dnp.ds)
	if err != nil {
		t.Fatal(err)
	}

	// Consume the first 4KiB so the subsequent seek actually rewinds.
	n, err := io.CopyN(ioutil.Discard, rs, 1024*4)
	if err != nil {
		t.Fatal(err)
	}
	if n != 4096 {
		t.Fatal("Copy didnt copy enough bytes")
	}

	seeked, err := rs.Seek(1, os.SEEK_SET)
	if err != nil {
		t.Fatal(err)
	}
	if seeked != 1 {
		t.Fatal("Failed to seek to almost beginning")
	}

	out, err := ioutil.ReadAll(rs)
	if err != nil {
		t.Fatal(err)
	}
	if err := arrComp(out, should[1:]); err != nil {
		t.Fatal(err)
	}
}
func TestSeekingConsistency(t *testing.T) {
nbytes := int64(128 * 1024)
should := make([]byte, nbytes)
......
......@@ -25,8 +25,8 @@ type DAGService interface {
// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetDAG(context.Context, *Node) <-chan *Node
GetNodes(context.Context, []u.Key) <-chan *Node
GetDAG(context.Context, *Node) []NodeGetter
GetNodes(context.Context, []u.Key) []NodeGetter
}
func NewDAGService(bs *bserv.BlockService) DAGService {
......@@ -168,7 +168,7 @@ func FindLinks(links []u.Key, k u.Key, start int) []int {
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
func (ds *dagService) GetDAG(ctx context.Context, root *Node) []NodeGetter {
var keys []u.Key
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
......@@ -177,46 +177,69 @@ func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
return ds.GetNodes(ctx, keys)
}
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) <-chan *Node {
sig := make(chan *Node)
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter {
promises := make([]NodeGetter, len(keys))
sendChans := make([]chan<- *Node, len(keys))
for i, _ := range keys {
promises[i], sendChans[i] = newNodePromise(ctx)
}
go func() {
defer close(sig)
blkchan := ds.Blocks.GetBlocks(ctx, keys)
nodes := make([]*Node, len(keys))
next := 0
for {
select {
case blk, ok := <-blkchan:
if !ok {
if next < len(nodes) {
log.Errorf("Did not receive correct number of nodes!")
}
return
}
nd, err := Decoded(blk.Data)
if err != nil {
// NB: can occur in normal situations, with improperly formatted
// input data
// NB: can happen with improperly formatted input data
log.Error("Got back bad block!")
break
return
}
is := FindLinks(keys, blk.Key(), next)
is := FindLinks(keys, blk.Key(), 0)
for _, i := range is {
nodes[i] = nd
}
for ; next < len(nodes) && nodes[next] != nil; next++ {
select {
case sig <- nodes[next]:
case <-ctx.Done():
return
}
sendChans[i] <- nd
}
case <-ctx.Done():
return
}
}
}()
return sig
return promises
}
// newNodePromise creates a NodeGetter along with the send channel used
// to fulfill it. The channel is buffered (capacity 1) so the producer
// can deliver the node without blocking on the consumer.
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
	done := make(chan *Node, 1)
	np := &nodePromise{
		recv: done,
		ctx:  ctx,
	}
	return np, done
}
// nodePromise implements NodeGetter. The node is cached after the first
// successful receive so repeated Get calls return immediately without
// touching recv again.
type nodePromise struct {
	cache *Node           // set once the node has been received
	recv  <-chan *Node    // delivery channel, written by the fetch goroutine
	ctx   context.Context // cancels callers blocked in Get
}
// NodeGetter is a promise for a single DAG node: Get blocks until the
// node has been delivered or the promise's context is cancelled.
type NodeGetter interface {
	Get() (*Node, error)
}
// Get returns the promised node, blocking until it has been delivered or
// the promise's context is cancelled. The result is cached, so subsequent
// calls return the same node without blocking.
func (np *nodePromise) Get() (*Node, error) {
	if np.cache != nil {
		return np.cache, nil
	}

	// If the node has already been delivered, prefer it over a concurrent
	// cancellation: a plain two-way select picks among ready cases at
	// random, so it could report a spurious context error even though the
	// result is sitting in the buffered channel.
	select {
	case nd := <-np.recv:
		np.cache = nd
		return np.cache, nil
	default:
	}

	select {
	case nd := <-np.recv:
		np.cache = nd
	case <-np.ctx.Done():
		return nil, np.ctx.Err()
	}
	return np.cache, nil
}
......@@ -3,9 +3,7 @@ package io
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
......@@ -14,7 +12,6 @@ import (
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
ftpb "github.com/jbenet/go-ipfs/unixfs/pb"
u "github.com/jbenet/go-ipfs/util"
)
var ErrIsDir = errors.New("this dag node is a directory")
......@@ -23,8 +20,9 @@ var ErrIsDir = errors.New("this dag node is a directory")
type DagReader struct {
serv mdag.DAGService
node *mdag.Node
pbdata *ftpb.Data
buf ReadSeekCloser
fetchChan <-chan *mdag.Node
promises []mdag.NodeGetter
linkPosition int
offset int64
......@@ -57,15 +55,15 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (Read
return nil, ErrIsDir
case ftpb.Data_File:
fctx, cancel := context.WithCancel(ctx)
fetchChan := serv.GetDAG(fctx, n)
promises := serv.GetDAG(fctx, n)
return &DagReader{
node: n,
serv: serv,
buf: NewRSNCFromBytes(pb.GetData()),
fetchChan: fetchChan,
ctx: ctx,
fctx: fctx,
cancel: cancel,
node: n,
serv: serv,
buf: NewRSNCFromBytes(pb.GetData()),
promises: promises,
ctx: fctx,
cancel: cancel,
pbdata: pb,
}, nil
case ftpb.Data_Raw:
// Raw block will just be a single level, return a byte buffer
......@@ -78,26 +76,18 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (Read
// precalcNextBuf follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
var nxt *mdag.Node
var ok bool
if dr.fetchChan == nil {
// This panic is appropriate because the select statement
// will not panic if you try and read from a nil channel
// it will simply hang.
panic("fetchChan should NOT be nil")
dr.buf.Close() // Just to make sure
if dr.linkPosition >= len(dr.promises) {
return io.EOF
}
select {
case nxt, ok = <-dr.fetchChan:
if !ok {
return io.EOF
}
case <-dr.ctx.Done():
return dr.ctx.Err()
nxt, err := dr.promises[dr.linkPosition].Get()
if err != nil {
return err
}
dr.linkPosition++
pb := new(ftpb.Data)
err := proto.Unmarshal(nxt.Data, pb)
err = proto.Unmarshal(nxt.Data, pb)
if err != nil {
return err
}
......@@ -107,9 +97,7 @@ func (dr *DagReader) precalcNextBuf() error {
// A directory should not exist within a file
return ft.ErrInvalidDirLocation
case ftpb.Data_File:
//TODO: this *should* work, needs testing first
log.Warning("Running untested code for multilayered indirect FS reads.")
subr, err := NewDagReader(dr.fctx, nxt, dr.serv)
subr, err := NewDagReader(dr.ctx, nxt, dr.serv)
if err != nil {
return err
}
......@@ -123,32 +111,9 @@ func (dr *DagReader) precalcNextBuf() error {
}
}
// resetBlockFetch aborts the in-flight block fetch and starts a new one
// beginning at link index nlinkpos, so only the blocks at and after the
// new read position are re-requested (used when seeking).
func (dr *DagReader) resetBlockFetch(nlinkpos int) {
	// Cancel the previous fetch's context so its goroutine can exit.
	dr.cancel()
	dr.fetchChan = nil
	dr.linkPosition = nlinkpos

	// Collect the keys of the links still to be read, starting at the
	// new link position.
	var keys []u.Key
	for _, lnk := range dr.node.Links[dr.linkPosition:] {
		keys = append(keys, u.Key(lnk.Hash))
	}

	// Fresh cancellable context for the replacement fetch.
	fctx, cancel := context.WithCancel(dr.ctx)
	dr.cancel = cancel
	dr.fctx = fctx

	fch := dr.serv.GetNodes(fctx, keys)
	dr.fetchChan = fch
}
// Read reads data from the DAG structured file
func (dr *DagReader) Read(b []byte) (int, error) {
// If no cached buffer, load one
if dr.buf == nil {
err := dr.precalcNextBuf()
if err != nil {
return 0, err
}
}
total := 0
for {
// Attempt to fill bytes from cached buffer
......@@ -176,9 +141,7 @@ func (dr *DagReader) Read(b []byte) (int, error) {
}
func (dr *DagReader) Close() error {
if dr.fctx != nil {
dr.cancel()
}
dr.cancel()
return nil
}
......@@ -188,21 +151,11 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
if offset < 0 {
return -1, errors.New("Invalid offset")
}
//TODO: this pb should be cached
pb := new(ftpb.Data)
err := proto.Unmarshal(dr.node.Data, pb)
if err != nil {
return -1, err
}
if offset == 0 {
dr.resetBlockFetch(0)
dr.buf = NewRSNCFromBytes(pb.GetData())
return 0, nil
}
pb := dr.pbdata
left := offset
if int64(len(pb.Data)) > offset {
dr.buf.Close()
dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])
dr.linkPosition = 0
dr.offset = offset
......@@ -211,23 +164,22 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
left -= int64(len(pb.Data))
}
i := 0
for ; i < len(pb.Blocksizes); i++ {
for i := 0; i < len(pb.Blocksizes); i++ {
if pb.Blocksizes[i] > uint64(left) {
dr.linkPosition = i
break
} else {
left -= int64(pb.Blocksizes[i])
}
}
dr.resetBlockFetch(i)
err = dr.precalcNextBuf()
err := dr.precalcNextBuf()
if err != nil {
return 0, err
}
n, err := io.CopyN(ioutil.Discard, dr.buf, left)
n, err := dr.buf.Seek(left, os.SEEK_SET)
if err != nil {
fmt.Printf("the copy failed: %s - [%d]\n", err, n)
return -1, err
}
left -= n
......@@ -237,15 +189,11 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
dr.offset = offset
return offset, nil
case os.SEEK_CUR:
// TODO: be smarter here
noffset := dr.offset + offset
return dr.Seek(noffset, os.SEEK_SET)
case os.SEEK_END:
pb := new(ftpb.Data)
err := proto.Unmarshal(dr.node.Data, pb)
if err != nil {
return -1, err
}
noffset := int64(pb.GetFilesize()) - offset
noffset := int64(dr.pbdata.GetFilesize()) - offset
return dr.Seek(noffset, os.SEEK_SET)
default:
return 0, errors.New("invalid whence")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment