Commit 0ce6071f authored by Jeromy's avatar Jeromy

revamp BatchFetch a bit

parent c7c08597
......@@ -22,6 +22,19 @@ var ErrNotFound = fmt.Errorf("merkledag: not found")
// so have to convert Multihash bytes to string (u.Key)
type NodeMap map[u.Key]*Node
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
BatchFetch(context.Context, *Node) <-chan *Node
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}
// Node represents a node in the IPFS Merkle DAG.
// nodes have opaque data and a set of navigable links.
type Node struct {
......@@ -156,18 +169,6 @@ func (n *Node) Key() (u.Key, error) {
return u.Key(h), err
}
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}
// dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
......@@ -286,59 +287,55 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
return done
}
// Take advantage of blockservice/bitswap batched requests to fetch all
// child nodes of a given node
// TODO: finish this
func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan int {
sig := make(chan int)
// BatchFetch will fill out all of the links of the given Node.
// It returns a channel of indicies, which will be returned in order
// from 0 to len(root.Links) - 1, signalling that the link specified by
// the index has been filled out.
func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
sig := make(chan *Node)
go func() {
var keys []u.Key
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
blkchan := ds.Blocks.GetBlocks(ctx, keys)
nodes := make([]*Node, len(root.Links))
//
next := 0
seen := make(map[int]struct{})
//
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
blkchan := ds.Blocks.GetBlocks(ctx, keys)
for blk := range blkchan {
for i, lnk := range root.Links {
if u.Key(lnk.Hash) != blk.Key() {
continue
}
//
seen[i] = struct{}{}
//
if u.Key(lnk.Hash) != blk.Key() {
continue
}
nd, err := Decoded(blk.Data)
if err != nil {
log.Error("Got back bad block!")
break
}
lnk.Node = nd
nodes[i] = nd
//
if next == i {
sig <- next
sig <- nd
next++
for {
if _, ok := seen[next]; ok {
sig <- next
next++
} else {
break
}
for ; nodes[next] != nil; next++ {
sig <- nodes[next]
}
}
//
}
}
close(sig)
}()
// TODO: return a channel, and signal when the 'Next' readable block is available
return sig
}
......@@ -5,6 +5,8 @@ import (
"errors"
"io"
"code.google.com/p/go.net/context"
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
mdag "github.com/jbenet/go-ipfs/merkledag"
ft "github.com/jbenet/go-ipfs/unixfs"
......@@ -15,10 +17,10 @@ var ErrIsDir = errors.New("this dag node is a directory")
// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
serv mdag.DAGService
node *mdag.Node
position int
buf io.Reader
serv mdag.DAGService
node *mdag.Node
buf io.Reader
fetchChan <-chan *mdag.Node
}
// NewDagReader creates a new reader object that reads the data represented by the given
......@@ -36,9 +38,10 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
return nil, ErrIsDir
case ftpb.Data_File:
return &DagReader{
node: n,
serv: serv,
buf: bytes.NewBuffer(pb.GetData()),
node: n,
serv: serv,
buf: bytes.NewBuffer(pb.GetData()),
fetchChan: serv.BatchFetch(context.TODO(), n),
}, nil
case ftpb.Data_Raw:
// Raw block will just be a single level, return a byte buffer
......@@ -51,19 +54,20 @@ func NewDagReader(n *mdag.Node, serv mdag.DAGService) (io.Reader, error) {
// precalcNextBuf follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
if dr.position >= len(dr.node.Links) {
return io.EOF
}
nxt, err := dr.node.Links[dr.position].GetNode(dr.serv)
if err != nil {
return err
var nxt *mdag.Node
var ok bool
select {
case nxt, ok = <-dr.fetchChan:
if !ok {
return io.EOF
}
}
pb := new(ftpb.Data)
err = proto.Unmarshal(nxt.Data, pb)
err := proto.Unmarshal(nxt.Data, pb)
if err != nil {
return err
}
dr.position++
switch pb.GetType() {
case ftpb.Data_Directory:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment