Commit b6915124 authored by Jeromy

merkledag FetchGraph and EnumerateChildren

This commit improves (fixes) the FetchGraph call for recursively
fetching every descendant node of a given merkledag node. This operation
should be the simplest way of ensuring that you have replicated a dag
locally.

This commit also implements a function in the merkledag package called
EnumerateChildren, which collects the keys of every descendant node of
the given node. All keys found are recorded in the passed-in KeySet,
which may in the future be implemented on disk to avoid excessive
memory consumption.

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
parent 27f6f389
...@@ -3,7 +3,6 @@ package merkledag ...@@ -3,7 +3,6 @@ package merkledag
import ( import (
"fmt" "fmt"
"sync"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks" blocks "github.com/ipfs/go-ipfs/blocks"
...@@ -121,41 +120,86 @@ func (n *dagService) Remove(nd *Node) error { ...@@ -121,41 +120,86 @@ func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteBlock(k) return n.Blocks.DeleteBlock(k)
} }
// FetchGraph asynchronously fetches all nodes that are children of the given // FetchGraph fetches all nodes that are children of the given node
// node, and returns a channel that may be waited upon for the fetch to complete func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} { toprocess := make(chan []key.Key, 8)
log.Warning("Untested.") nodes := make(chan *Node, 8)
var wg sync.WaitGroup errs := make(chan error, 1)
done := make(chan struct{})
for _, l := range root.Links { ctx, cancel := context.WithCancel(ctx)
wg.Add(1) defer cancel()
go func(lnk *Link) { defer close(toprocess)
// Signal child is done on way out go fetchNodes(ctx, serv, toprocess, nodes, errs)
defer wg.Done()
select { nodes <- root
case <-ctx.Done(): live := 1
return
for {
select {
case nd, ok := <-nodes:
if !ok {
return nil
} }
nd, err := lnk.GetNode(ctx, serv) var keys []key.Key
if err != nil { for _, lnk := range nd.Links {
log.Debug(err) keys = append(keys, key.Key(lnk.Hash))
return
} }
keys = dedupeKeys(keys)
// Wait for children to finish // keep track of open request, when zero, we're done
<-FetchGraph(ctx, nd, serv) live += len(keys) - 1
}(l)
if live == 0 {
return nil
}
if len(keys) > 0 {
select {
case toprocess <- keys:
case <-ctx.Done():
return ctx.Err()
}
}
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
} }
}
go func() { func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
wg.Wait() defer close(out)
done <- struct{}{} for {
}() select {
case ks, ok := <-in:
if !ok {
return
}
return done ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}
select {
case out <- nd:
case <-ctx.Done():
return
}
}(g)
}
}
}
} }
// FindLinks searches this nodes links for the given key, // FindLinks searches this nodes links for the given key,
...@@ -318,3 +362,24 @@ func (t *Batch) Commit() error { ...@@ -318,3 +362,24 @@ func (t *Batch) Commit() error {
t.size = 0 t.size = 0
return err return err
} }
// EnumerateChildren recursively follows the links below root and records the
// key of every link it encounters in set. A key that set already contains is
// skipped along with everything beneath it, so set doubles as the visited
// marker for the walk. The first error returned by ds.Get aborts the walk and
// is returned to the caller; keys added before that point remain in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	for _, link := range root.Links {
		childKey := key.Key(link.Hash)
		if set.Has(childKey) {
			continue
		}
		// Mark the key before fetching so repeated links are not re-walked.
		set.Add(childKey)

		node, err := ds.Get(ctx, childKey)
		if err != nil {
			return err
		}
		if err := EnumerateChildren(ctx, ds, node, set); err != nil {
			return err
		}
	}
	return nil
}
...@@ -130,7 +130,7 @@ func SubtestNodeStat(t *testing.T, n *Node) { ...@@ -130,7 +130,7 @@ func SubtestNodeStat(t *testing.T, n *Node) {
} }
if expected != *actual { if expected != *actual {
t.Errorf("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual) t.Error("n.Stat incorrect.\nexpect: %s\nactual: %s", expected, actual)
} else { } else {
fmt.Printf("n.Stat correct: %s\n", actual) fmt.Printf("n.Stat correct: %s\n", actual)
} }
...@@ -232,7 +232,6 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { ...@@ -232,7 +232,6 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
} }
} }
} }
func TestRecursiveAdd(t *testing.T) { func TestRecursiveAdd(t *testing.T) {
a := &Node{Data: []byte("A")} a := &Node{Data: []byte("A")}
b := &Node{Data: []byte("B")} b := &Node{Data: []byte("B")}
...@@ -298,3 +297,79 @@ func TestCantGet(t *testing.T) { ...@@ -298,3 +297,79 @@ func TestCantGet(t *testing.T) {
t.Fatal("expected err not found, got: ", err) t.Fatal("expected err not found, got: ", err)
} }
} }
// TestFetchGraph builds a DAG from 32 KiB of random input on a single mock
// DAG service and checks that FetchGraph on that same service returns no
// error.
func TestFetchGraph(t *testing.T) {
	mock := bstest.Mocks(t, 1)[0]
	dserv := NewDAGService(mock)

	input := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
	splitter := &chunk.SizeSplitter{512}

	root, err := imp.BuildDagFromReader(input, dserv, splitter, nil)
	if err != nil {
		t.Fatal(err)
	}

	if err := FetchGraph(context.TODO(), root, dserv); err != nil {
		t.Fatal(err)
	}
}
// TestFetchGraphOther builds a DAG from 32 KiB of random input on the first
// of two mock DAG services, then checks that FetchGraph run through the
// second service returns no error.
func TestFetchGraphOther(t *testing.T) {
	var services []DAGService
	for _, mock := range bstest.Mocks(t, 2) {
		services = append(services, NewDAGService(mock))
	}
	source, target := services[0], services[1]

	input := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
	splitter := &chunk.SizeSplitter{512}

	root, err := imp.BuildDagFromReader(input, source, splitter, nil)
	if err != nil {
		t.Fatal(err)
	}

	if err := FetchGraph(context.TODO(), root, target); err != nil {
		t.Fatal(err)
	}
}
// TestEnumerateChildren builds a DAG from 1 MiB of random input, collects its
// descendant keys with EnumerateChildren, and then re-walks the DAG by hand to
// confirm that the key of every link is present in the collected set.
func TestEnumerateChildren(t *testing.T) {
	mocks := bstest.Mocks(t, 1)
	dserv := NewDAGService(mocks[0])

	splitter := &chunk.SizeSplitter{512}
	input := io.LimitReader(u.NewTimeSeededRand(), 1024*1024)

	root, err := imp.BuildDagFromReader(input, dserv, splitter, nil)
	if err != nil {
		t.Fatal(err)
	}

	seen := key.NewKeySet()
	if err := EnumerateChildren(context.Background(), dserv, root, seen); err != nil {
		t.Fatal(err)
	}

	// verify walks the DAG independently of EnumerateChildren and fails the
	// test on the first link whose key was not recorded.
	var verify func(nd *Node)
	verify = func(nd *Node) {
		for _, link := range nd.Links {
			childKey := key.Key(link.Hash)
			if !seen.Has(childKey) {
				t.Fatal("missing key in set!")
			}

			child, err := dserv.Get(context.Background(), childKey)
			if err != nil {
				t.Fatal(err)
			}
			verify(child)
		}
	}

	verify(root)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment