Commit 50433b18 authored by Jeromy's avatar Jeromy

implement seeking in the dagreader

parent 559d20fd
...@@ -2,7 +2,6 @@ ...@@ -2,7 +2,6 @@
package merkledag package merkledag
import ( import (
"bytes"
"fmt" "fmt"
"sync" "sync"
"time" "time"
...@@ -27,6 +26,7 @@ type DAGService interface { ...@@ -27,6 +26,7 @@ type DAGService interface {
// GetDAG returns, in order, all the single leve child // GetDAG returns, in order, all the single leve child
// nodes of the passed in node. // nodes of the passed in node.
GetDAG(context.Context, *Node) <-chan *Node GetDAG(context.Context, *Node) <-chan *Node
GetNodes(context.Context, []u.Key) <-chan *Node
} }
func NewDAGService(bs *bserv.BlockService) DAGService { func NewDAGService(bs *bserv.BlockService) DAGService {
...@@ -155,11 +155,10 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} ...@@ -155,11 +155,10 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{}
// FindLinks searches this nodes links for the given key, // FindLinks searches this nodes links for the given key,
// returns the indexes of any links pointing to it // returns the indexes of any links pointing to it
func FindLinks(n *Node, k u.Key, start int) []int { func FindLinks(links []u.Key, k u.Key, start int) []int {
var out []int var out []int
keybytes := []byte(k) for i, lnk_k := range links[start:] {
for i, lnk := range n.Links[start:] { if k == lnk_k {
if bytes.Equal([]byte(lnk.Hash), keybytes) {
out = append(out, i+start) out = append(out, i+start)
} }
} }
...@@ -170,40 +169,54 @@ func FindLinks(n *Node, k u.Key, start int) []int { ...@@ -170,40 +169,54 @@ func FindLinks(n *Node, k u.Key, start int) []int {
// It returns a channel of nodes, which the caller can receive // It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order. // all the child nodes of 'root' on, in proper order.
func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node { func (ds *dagService) GetDAG(ctx context.Context, root *Node) <-chan *Node {
var keys []u.Key
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
return ds.GetNodes(ctx, keys)
}
func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) <-chan *Node {
sig := make(chan *Node) sig := make(chan *Node)
go func() { go func() {
defer close(sig) defer close(sig)
var keys []u.Key
for _, lnk := range root.Links {
keys = append(keys, u.Key(lnk.Hash))
}
blkchan := ds.Blocks.GetBlocks(ctx, keys) blkchan := ds.Blocks.GetBlocks(ctx, keys)
nodes := make([]*Node, len(root.Links)) nodes := make([]*Node, len(keys))
next := 0 next := 0
for blk := range blkchan { for {
nd, err := Decoded(blk.Data) select {
if err != nil { case blk, ok := <-blkchan:
// NB: can occur in normal situations, with improperly formatted if !ok {
// input data if next < len(nodes) {
log.Error("Got back bad block!") log.Errorf("Did not receive correct number of nodes!")
break }
} return
is := FindLinks(root, blk.Key(), next) }
for _, i := range is { nd, err := Decoded(blk.Data)
nodes[i] = nd if err != nil {
} // NB: can occur in normal situations, with improperly formatted
// input data
for ; next < len(nodes) && nodes[next] != nil; next++ { log.Error("Got back bad block!")
sig <- nodes[next] break
}
is := FindLinks(keys, blk.Key(), next)
for _, i := range is {
nodes[i] = nd
}
for ; next < len(nodes) && nodes[next] != nil; next++ {
select {
case sig <- nodes[next]:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
} }
} }
if next < len(nodes) {
// TODO: bubble errors back up.
log.Errorf("Did not receive correct number of nodes!")
}
}() }()
return sig return sig
} }
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
bstore "github.com/jbenet/go-ipfs/blocks/blockstore" bstore "github.com/jbenet/go-ipfs/blocks/blockstore"
...@@ -162,7 +163,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { ...@@ -162,7 +163,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
t.Log("finished setup.") t.Log("finished setup.")
dagr, err := uio.NewDagReader(root, dagservs[0]) dagr, err := uio.NewDagReader(context.TODO(), root, dagservs[0])
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -195,7 +196,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { ...@@ -195,7 +196,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
} }
fmt.Println("Got first node back.") fmt.Println("Got first node back.")
read, err := uio.NewDagReader(first, dagservs[i]) read, err := uio.NewDagReader(context.TODO(), first, dagservs[i])
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment