Unverified Commit f5474e03 authored by postables's avatar postables Committed by GitHub

Merge pull request #1 from ipfs/master

Pull In Upstream Changes
parents e0c97528 3ecad706
......@@ -9,7 +9,9 @@ go-merkledag
> go-merkledag implements the 'DAGService' interface and adds two ipld node types, Protobuf and Raw
## Lead Maintainer
[Steven Allen](https://github.com/Stebalien)
## Table of Contents
......
......@@ -162,7 +162,7 @@ func FetchGraph(ctx context.Context, root cid.Cid, serv ipld.DAGService) error {
}
// FetchGraphWithDepthLimit fetches all nodes that are children to the given
// node down to the given depth. maxDetph=0 means "only fetch root",
// node down to the given depth. maxDepth=0 means "only fetch root",
// maxDepth=1 means "fetch root and its direct children" and so on...
// maxDepth=-1 means unlimited.
func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, serv ipld.DAGService) error {
......@@ -172,7 +172,7 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s
ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)}
}
set := make(map[string]int)
set := make(map[cid.Cid]int)
// Visit function returns true when:
// * The element is not in the set and we're not over depthLim
......@@ -182,23 +182,23 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s
// depthLim = -1 means we only return true if the element is not in the
// set.
visit := func(c cid.Cid, depth int) bool {
key := string(c.Bytes())
oldDepth, ok := set[key]
oldDepth, ok := set[c]
if (ok && depthLim < 0) || (depthLim >= 0 && depth > depthLim) {
return false
}
if !ok || oldDepth > depth {
set[key] = depth
set[c] = depth
return true
}
return false
}
// If we have a ProgressTracker, we wrap the visit function to handle it
v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visit)
return WalkDepth(ctx, GetLinksDirect(ng), root, visit, Concurrent())
}
visitProgress := func(c cid.Cid, depth int) bool {
......@@ -208,7 +208,7 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s
}
return false
}
return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visitProgress)
return WalkDepth(ctx, GetLinksDirect(ng), root, visitProgress, Concurrent())
}
// GetMany gets many nodes from the DAG at once.
......@@ -282,33 +282,144 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
}
}
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid) bool) error {
// defaultConcurrentFetch is the default maximum number of concurrent fetches
// that 'fetchNodes' will start at a time
const defaultConcurrentFetch = 32
// walkOptions represent the parameters of a graph walking algorithm
type walkOptions struct {
SkipRoot bool
Concurrency int
ErrorHandler func(c cid.Cid, err error) error
}
// WalkOption is a setter for walkOptions
type WalkOption func(*walkOptions)
func (wo *walkOptions) addHandler(handler func(c cid.Cid, err error) error) {
if wo.ErrorHandler != nil {
wo.ErrorHandler = func(c cid.Cid, err error) error {
return handler(c, wo.ErrorHandler(c, err))
}
} else {
wo.ErrorHandler = handler
}
}
// SkipRoot is a WalkOption indicating that the root node should skipped
func SkipRoot() WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.SkipRoot = true
}
}
// Concurrent is a WalkOption indicating that node fetching should be done in
// parallel, with the default concurrency factor.
// NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrent() WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.Concurrency = defaultConcurrentFetch
}
}
// Concurrency is a WalkOption indicating that node fetching should be done in
// parallel, with a specific concurrency factor.
// NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrency(worker int) WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.Concurrency = worker
}
}
// IgnoreErrors is a WalkOption indicating that the walk should attempt to
// continue even when an error occur.
func IgnoreErrors() WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.addHandler(func(c cid.Cid, err error) error {
return nil
})
}
}
// IgnoreMissing is a WalkOption indicating that the walk should continue when
// a node is missing.
func IgnoreMissing() WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.addHandler(func(c cid.Cid, err error) error {
if err == ipld.ErrNotFound {
return nil
}
return err
})
}
}
// OnMissing is a WalkOption adding a callback that will be triggered on a missing
// node.
func OnMissing(callback func(c cid.Cid)) WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.addHandler(func(c cid.Cid, err error) error {
if err == ipld.ErrNotFound {
callback(c)
}
return err
})
}
}
// OnError is a WalkOption adding a custom error handler.
// If this handler return a nil error, the walk will continue.
func OnError(handler func(c cid.Cid, err error) error) WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.addHandler(handler)
}
}
// WalkGraph will walk the dag in order (depth first) starting at the given root.
func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool, options ...WalkOption) error {
visitDepth := func(c cid.Cid, depth int) bool {
return visit(c)
}
return EnumerateChildrenDepth(ctx, getLinks, root, 0, visitDepth)
return WalkDepth(ctx, getLinks, c, visitDepth, options...)
}
// WalkDepth walks the dag starting at the given root and passes the current
// depth to a given visit function. The visit function can be used to limit DAG
// exploration.
func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error {
opts := &walkOptions{}
for _, opt := range options {
opt(opts)
}
if opts.Concurrency > 1 {
return parallelWalkDepth(ctx, getLinks, c, visit, opts)
} else {
return sequentialWalkDepth(ctx, getLinks, c, 0, visit, opts)
}
}
// EnumerateChildrenDepth walks the dag below the given root and passes the
// current depth to a given visit function. The visit function can be used to
// limit DAG exploration.
func EnumerateChildrenDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool) error {
func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *walkOptions) error {
if !(options.SkipRoot && depth == 0) {
if !visit(root, depth) {
return nil
}
}
links, err := getLinks(ctx, root)
if err != nil && options.ErrorHandler != nil {
err = options.ErrorHandler(root, err)
}
if err != nil {
return err
}
for _, lnk := range links {
c := lnk.Cid
if visit(c, depth+1) {
err = EnumerateChildrenDepth(ctx, getLinks, c, depth+1, visit)
if err != nil {
return err
}
if err := sequentialWalkDepth(ctx, getLinks, lnk.Cid, depth+1, visit, options); err != nil {
return err
}
}
return nil
......@@ -340,27 +451,7 @@ func (p *ProgressTracker) Value() int {
return p.Total
}
// FetchGraphConcurrency is total number of concurrent fetches that
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 32
// EnumerateChildrenAsync is equivalent to EnumerateChildren *except* that it
// fetches children in parallel.
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) bool) error {
visitDepth := func(c cid.Cid, depth int) bool {
return visit(c)
}
return EnumerateChildrenAsyncDepth(ctx, getLinks, c, 0, visitDepth)
}
// EnumerateChildrenAsyncDepth is equivalent to EnumerateChildrenDepth *except*
// that it fetches children in parallel (down to a maximum depth in the graph).
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startDepth int, visit func(cid.Cid, int) bool) error {
func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error {
type cidDepth struct {
cid cid.Cid
depth int
......@@ -371,18 +462,18 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C
depth int
}
feed := make(chan *cidDepth)
out := make(chan *linksDepth)
feed := make(chan cidDepth)
out := make(chan linksDepth)
done := make(chan struct{})
var setlk sync.Mutex
var visitlk sync.Mutex
var wg sync.WaitGroup
errChan := make(chan error)
fetchersCtx, cancel := context.WithCancel(ctx)
defer wg.Wait()
defer cancel()
for i := 0; i < FetchGraphConcurrency; i++ {
for i := 0; i < options.Concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
......@@ -390,12 +481,22 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C
ci := cdepth.cid
depth := cdepth.depth
setlk.Lock()
shouldVisit := visit(ci, depth)
setlk.Unlock()
var shouldVisit bool
// bypass the root if needed
if !(options.SkipRoot && depth == 0) {
visitlk.Lock()
shouldVisit = visit(ci, depth)
visitlk.Unlock()
} else {
shouldVisit = true
}
if shouldVisit {
links, err := getLinks(ctx, ci)
if err != nil && options.ErrorHandler != nil {
err = options.ErrorHandler(root, err)
}
if err != nil {
select {
case errChan <- err:
......@@ -404,7 +505,7 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C
return
}
outLinks := &linksDepth{
outLinks := linksDepth{
links: links,
depth: depth + 1,
}
......@@ -425,41 +526,42 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C
defer close(feed)
send := feed
var todobuffer []*cidDepth
var todoQueue []cidDepth
var inProgress int
next := &cidDepth{
cid: c,
depth: startDepth,
next := cidDepth{
cid: root,
depth: 0,
}
for {
select {
case send <- next:
inProgress++
if len(todobuffer) > 0 {
next = todobuffer[0]
todobuffer = todobuffer[1:]
if len(todoQueue) > 0 {
next = todoQueue[0]
todoQueue = todoQueue[1:]
} else {
next = nil
next = cidDepth{}
send = nil
}
case <-done:
inProgress--
if inProgress == 0 && next == nil {
if inProgress == 0 && !next.cid.Defined() {
return nil
}
case linksDepth := <-out:
for _, lnk := range linksDepth.links {
cd := &cidDepth{
cd := cidDepth{
cid: lnk.Cid,
depth: linksDepth.depth,
}
if next == nil {
if !next.cid.Defined() {
next = cd
send = feed
} else {
todobuffer = append(todobuffer, cd)
todoQueue = append(todoQueue, cd)
}
}
case err := <-errChan:
......@@ -469,7 +571,6 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C
return ctx.Err()
}
}
}
var _ ipld.LinkGetter = &dagService{}
......
......@@ -29,7 +29,7 @@ import (
// makeDepthTestingGraph makes a small DAG with two levels. The level-two
// nodes are both children of the root and of one of the level 1 nodes.
// This is meant to test the EnumerateChildren*Depth functions.
// This is meant to test the Walk*Depth functions.
func makeDepthTestingGraph(t *testing.T, ds ipld.DAGService) ipld.Node {
root := NodeWithData(nil)
l11 := NodeWithData([]byte("leve1_node1"))
......@@ -203,8 +203,11 @@ func makeTestDAG(t *testing.T, read io.Reader, ds ipld.DAGService) ipld.Node {
// Add a root referencing all created nodes
root := NodeWithData(nil)
for _, n := range nodes {
root.AddNodeLink(n.Cid().String(), n)
err := ds.Add(ctx, n)
err := root.AddNodeLink(n.Cid().String(), n)
if err != nil {
t.Fatal(err)
}
err = ds.Add(ctx, n)
if err != nil {
t.Fatal(err)
}
......@@ -334,7 +337,7 @@ func TestFetchGraph(t *testing.T) {
offlineDS := NewDAGService(bs)
err = EnumerateChildren(context.Background(), offlineDS.GetLinks, root.Cid(), func(_ cid.Cid) bool { return true })
err = Walk(context.Background(), offlineDS.GetLinks, root.Cid(), func(_ cid.Cid) bool { return true })
if err != nil {
t.Fatal(err)
}
......@@ -347,11 +350,11 @@ func TestFetchGraphWithDepthLimit(t *testing.T) {
}
tests := []testcase{
testcase{1, 3},
testcase{0, 0},
testcase{-1, 5},
testcase{2, 5},
testcase{3, 5},
testcase{1, 4},
testcase{0, 1},
testcase{-1, 6},
testcase{2, 6},
testcase{3, 6},
}
testF := func(t *testing.T, tc testcase) {
......@@ -383,7 +386,7 @@ func TestFetchGraphWithDepthLimit(t *testing.T) {
}
err = EnumerateChildrenDepth(context.Background(), offlineDS.GetLinks, root.Cid(), 0, visitF)
err = WalkDepth(context.Background(), offlineDS.GetLinks, root.Cid(), visitF)
if err != nil {
t.Fatal(err)
}
......@@ -400,7 +403,7 @@ func TestFetchGraphWithDepthLimit(t *testing.T) {
}
}
func TestEnumerateChildren(t *testing.T) {
func TestWalk(t *testing.T) {
bsi := bstest.Mocks(1)
ds := NewDAGService(bsi[0])
......@@ -409,7 +412,7 @@ func TestEnumerateChildren(t *testing.T) {
set := cid.NewSet()
err := EnumerateChildren(context.Background(), ds.GetLinks, root.Cid(), set.Visit)
err := Walk(context.Background(), ds.GetLinks, root.Cid(), set.Visit)
if err != nil {
t.Fatal(err)
}
......@@ -736,7 +739,7 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
}
cset := cid.NewSet()
err = EnumerateChildrenAsync(ctx, GetLinksDirect(ds), parent.Cid(), cset.Visit)
err = Walk(ctx, GetLinksDirect(ds), parent.Cid(), cset.Visit)
if err == nil {
t.Fatal("this should have failed")
}
......
......@@ -138,7 +138,7 @@ func (n *ProtoNode) RemoveNodeLink(name string) error {
}
if !found {
return ipld.ErrNotFound
return ErrLinkNotFound
}
n.links = ref
......
......@@ -59,7 +59,7 @@ func TestRemoveLink(t *testing.T) {
// should fail
err = nd.RemoveNodeLink("a")
if err != ipld.ErrNotFound {
if err != ErrLinkNotFound {
t.Fatal("should have failed to remove link")
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment