Commit 24f8c938 authored by Jeromy's avatar Jeromy

revamp BatchFetch a bit

parent 43c07c7e
......@@ -22,6 +22,19 @@ var ErrNotFound = fmt.Errorf("merkledag: not found")
// so have to convert Multihash bytes to string (u.Key)
type NodeMap map[u.Key]*Node
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
BatchFetch(context.Context, *Node) <-chan *Node
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}
// Node represents a node in the IPFS Merkle DAG.
// nodes have opaque data and a set of navigable links.
type Node struct {
......@@ -156,18 +169,6 @@ func (n *Node) Key() (u.Key, error) {
return u.Key(h), err
}
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
Add(*Node) (u.Key, error)
AddRecursive(*Node) error
Get(u.Key) (*Node, error)
Remove(*Node) error
}
func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
}
// dagService is an IPFS Merkle DAG service.
// - the root is virtual (like a forest)
// - stores nodes' data in a BlockService
......@@ -286,59 +287,55 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
return done
}
// Take advantage of blockservice/bitswap batched requests to fetch all
// child nodes of a given node
// TODO: finish this
func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan int {
sig := make(chan int)
// BatchFetch will fill out all of the links of the given Node.
// It returns a channel of indicies, which will be returned in order
// from 0 to len(root.Links) - 1, signalling that the link specified by
// the index has been filled out.
func (ds *dagService) BatchFetch(ctx context.Context, root *Node) <-chan *Node {
sig := make(chan *Node)
go func() {
var keys []u.Key
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
blkchan := ds.Blocks.GetBlocks(ctx, keys)
nodes := make([]*Node, len(root.Links))
//
next := 0
seen := make(map[int]struct{})
//
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
blkchan := ds.Blocks.GetBlocks(ctx, keys)
for blk := range blkchan {
for i, lnk := range root.Links {
if u.Key(lnk.Hash) != blk.Key() {
continue
}
//
seen[i] = struct{}{}
//
if u.Key(lnk.Hash) != blk.Key() {
continue
}
nd, err := Decoded(blk.Data)
if err != nil {
log.Error("Got back bad block!")
break
}
lnk.Node = nd
nodes[i] = nd
//
if next == i {
sig <- next
sig <- nd
next++
for {
if _, ok := seen[next]; ok {
sig <- next
next++
} else {
break
}
for ; nodes[next] != nil; next++ {
sig <- nodes[next]
}
}
//
}
}
close(sig)
}()
// TODO: return a channel, and signal when the 'Next' readable block is available
return sig
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment