Commit 10935680 authored by Jeromy's avatar Jeromy

address concerns from PR

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent b6915124
...@@ -122,84 +122,7 @@ func (n *dagService) Remove(nd *Node) error { ...@@ -122,84 +122,7 @@ func (n *dagService) Remove(nd *Node) error {
// FetchGraph fetches all nodes that are children of the given node // FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error { func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
toprocess := make(chan []key.Key, 8) return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
nodes := make(chan *Node, 8)
errs := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(toprocess)
go fetchNodes(ctx, serv, toprocess, nodes, errs)
nodes <- root
live := 1
for {
select {
case nd, ok := <-nodes:
if !ok {
return nil
}
var keys []key.Key
for _, lnk := range nd.Links {
keys = append(keys, key.Key(lnk.Hash))
}
keys = dedupeKeys(keys)
// keep track of open request, when zero, we're done
live += len(keys) - 1
if live == 0 {
return nil
}
if len(keys) > 0 {
select {
case toprocess <- keys:
case <-ctx.Done():
return ctx.Err()
}
}
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
defer close(out)
for {
select {
case ks, ok := <-in:
if !ok {
return
}
ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}
select {
case out <- nd:
case <-ctx.Done():
return
}
}(g)
}
}
}
} }
// FindLinks searches this nodes links for the given key, // FindLinks searches this nodes links for the given key,
...@@ -383,3 +306,83 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K ...@@ -383,3 +306,83 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
} }
return nil return nil
} }
func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
toprocess := make(chan []key.Key, 8)
nodes := make(chan *Node, 8)
errs := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(toprocess)
go fetchNodes(ctx, ds, toprocess, nodes, errs)
nodes <- root
live := 1
for {
select {
case nd, ok := <-nodes:
if !ok {
return nil
}
// a node has been fetched
live--
var keys []key.Key
for _, lnk := range nd.Links {
k := key.Key(lnk.Hash)
if !set.Has(k) {
set.Add(k)
live++
keys = append(keys, k)
}
}
if live == 0 {
return nil
}
if len(keys) > 0 {
select {
case toprocess <- keys:
case <-ctx.Done():
return ctx.Err()
}
}
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
defer close(out)
get := func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}
select {
case out <- nd:
case <-ctx.Done():
return
}
}
for ks := range in {
ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go get(g)
}
}
}
...@@ -299,38 +299,35 @@ func TestCantGet(t *testing.T) { ...@@ -299,38 +299,35 @@ func TestCantGet(t *testing.T) {
} }
func TestFetchGraph(t *testing.T) { func TestFetchGraph(t *testing.T) {
bsi := bstest.Mocks(t, 1)[0] var dservs []DAGService
ds := NewDAGService(bsi) bsis := bstest.Mocks(t, 2)
for _, bsi := range bsis {
dservs = append(dservs, NewDAGService(bsi))
}
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32) read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512} spl := &chunk.SizeSplitter{512}
root, err := imp.BuildDagFromReader(read, ds, spl, nil) root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = FetchGraph(context.TODO(), root, ds) err = FetchGraph(context.TODO(), root, dservs[1])
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
}
func TestFetchGraphOther(t *testing.T) {
var dservs []DAGService
for _, bsi := range bstest.Mocks(t, 2) {
dservs = append(dservs, NewDAGService(bsi))
}
read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512}
root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil) // create an offline dagstore and ensure all blocks were fetched
bs, err := bserv.New(bsis[1].Blockstore, offline.Exchange(bsis[1].Blockstore))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = FetchGraph(context.TODO(), root, dservs[1]) offline_ds := NewDAGService(bs)
ks := key.NewKeySet()
err = EnumerateChildren(context.Background(), offline_ds, root, ks)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment