more generalized/powerful error handling in the walk functions

parent 86e56524
...@@ -286,19 +286,29 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks { ...@@ -286,19 +286,29 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
// that 'fetchNodes' will start at a time // that 'fetchNodes' will start at a time
const defaultConcurrentFetch = 32 const defaultConcurrentFetch = 32
// WalkOptions represent the parameters of a graph walking algorithm // walkOptions represent the parameters of a graph walking algorithm
type WalkOptions struct { type walkOptions struct {
WithRoot bool WithRoot bool
IgnoreErrors bool
Concurrency int Concurrency int
ErrorHandler func(c cid.Cid, err error) error
} }
// WalkOption is a setter for WalkOptions // WalkOption is a setter for walkOptions
type WalkOption func(*WalkOptions) type WalkOption func(*walkOptions)
func (wo *walkOptions) addHandler(handler func(c cid.Cid, err error) error) {
if wo.ErrorHandler != nil {
wo.ErrorHandler = func(c cid.Cid, err error) error {
return handler(c, wo.ErrorHandler(c, err))
}
} else {
wo.ErrorHandler = handler
}
}
// WithRoot is a WalkOption indicating that the root node should be visited // WithRoot is a WalkOption indicating that the root node should be visited
func WithRoot() WalkOption { func WithRoot() WalkOption {
return func(walkOptions *WalkOptions) { return func(walkOptions *walkOptions) {
walkOptions.WithRoot = true walkOptions.WithRoot = true
} }
} }
...@@ -308,7 +318,7 @@ func WithRoot() WalkOption { ...@@ -308,7 +318,7 @@ func WithRoot() WalkOption {
// NOTE: When using that option, the walk order is *not* guarantee. // NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrent() WalkOption { func Concurrent() WalkOption {
return func(walkOptions *WalkOptions) { return func(walkOptions *walkOptions) {
walkOptions.Concurrency = defaultConcurrentFetch walkOptions.Concurrency = defaultConcurrentFetch
} }
} }
...@@ -318,7 +328,7 @@ func Concurrent() WalkOption { ...@@ -318,7 +328,7 @@ func Concurrent() WalkOption {
// NOTE: When using that option, the walk order is *not* guarantee. // NOTE: When using that option, the walk order is *not* guarantee.
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func Concurrency(worker int) WalkOption { func Concurrency(worker int) WalkOption {
return func(walkOptions *WalkOptions) { return func(walkOptions *walkOptions) {
walkOptions.Concurrency = worker walkOptions.Concurrency = worker
} }
} }
...@@ -326,8 +336,44 @@ func Concurrency(worker int) WalkOption { ...@@ -326,8 +336,44 @@ func Concurrency(worker int) WalkOption {
// IgnoreErrors is a WalkOption indicating that the walk should attempt to // IgnoreErrors is a WalkOption indicating that the walk should attempt to
// continue even when an error occur. // continue even when an error occur.
func IgnoreErrors() WalkOption { func IgnoreErrors() WalkOption {
return func(walkOptions *WalkOptions) { return func(walkOptions *walkOptions) {
walkOptions.IgnoreErrors = true walkOptions.addHandler(func(c cid.Cid, err error) error {
return nil
})
}
}
// IgnoreMissing is a WalkOption indicating that the walk should continue when
// a node is missing.
func IgnoreMissing() WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.addHandler(func(c cid.Cid, err error) error {
if err == ipld.ErrNotFound {
return nil
}
return err
})
}
}
// OnMissing is a WalkOption adding a callback that will be triggered on a missing
// node.
func OnMissing(callback func(c cid.Cid)) WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.addHandler(func(c cid.Cid, err error) error {
if err == ipld.ErrNotFound {
callback(c)
}
return err
})
}
}
// OnError is a WalkOption adding a custom error handler.
// If this handler return a nil error, the walk will continue.
func OnError(handler func(c cid.Cid, err error) error) WalkOption {
return func(walkOptions *walkOptions) {
walkOptions.addHandler(handler)
} }
} }
...@@ -344,7 +390,7 @@ func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid) ...@@ -344,7 +390,7 @@ func Walk(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid)
// depth to a given visit function. The visit function can be used to limit DAG // depth to a given visit function. The visit function can be used to limit DAG
// exploration. // exploration.
func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error { func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid.Cid, int) bool, options ...WalkOption) error {
opts := &WalkOptions{} opts := &walkOptions{}
for _, opt := range options { for _, opt := range options {
opt(opts) opt(opts)
} }
...@@ -356,7 +402,7 @@ func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid ...@@ -356,7 +402,7 @@ func WalkDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, visit func(cid
} }
} }
func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *WalkOptions) error { func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, depth int, visit func(cid.Cid, int) bool, options *walkOptions) error {
if depth != 0 || options.WithRoot { if depth != 0 || options.WithRoot {
if !visit(root, depth) { if !visit(root, depth) {
return nil return nil
...@@ -364,7 +410,10 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d ...@@ -364,7 +410,10 @@ func sequentialWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, d
} }
links, err := getLinks(ctx, root) links, err := getLinks(ctx, root)
if err != nil && !options.IgnoreErrors { if err != nil && options.ErrorHandler != nil {
err = options.ErrorHandler(root, err)
}
if err != nil {
return err return err
} }
...@@ -402,7 +451,7 @@ func (p *ProgressTracker) Value() int { ...@@ -402,7 +451,7 @@ func (p *ProgressTracker) Value() int {
return p.Total return p.Total
} }
func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *WalkOptions) error { func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, visit func(cid.Cid, int) bool, options *walkOptions) error {
type cidDepth struct { type cidDepth struct {
cid cid.Cid cid cid.Cid
depth int depth int
...@@ -445,7 +494,10 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis ...@@ -445,7 +494,10 @@ func parallelWalkDepth(ctx context.Context, getLinks GetLinks, root cid.Cid, vis
if shouldVisit { if shouldVisit {
links, err := getLinks(ctx, ci) links, err := getLinks(ctx, ci)
if err != nil && !options.IgnoreErrors { if err != nil && options.ErrorHandler != nil {
err = options.ErrorHandler(root, err)
}
if err != nil {
select { select {
case errChan <- err: case errChan <- err:
case <-fetchersCtx.Done(): case <-fetchersCtx.Done():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment