Commit a8d88dcf authored by tavit ohanian's avatar tavit ohanian

Merge remote-tracking branch 'upstream/master' into reference

parents 90ef578c d2e09424
blank_issues_enabled: false
contact_links:
- name: Getting Help on IPFS
url: https://ipfs.io/help
about: All information about how and where to get help on IPFS.
- name: IPFS Official Forum
url: https://discuss.ipfs.io
about: Please post general questions, support requests, and discussions here.
---
name: Open an issue
about: Only for actionable issues relevant to this repository.
title: ''
labels: need/triage
assignees: ''
---
<!--
Hello! To ensure this issue is correctly addressed as soon as possible by the IPFS team, please try to make sure:
- This issue is relevant to this repository's topic or codebase.
- A clear description is provided. It should includes as much relevant information as possible and clear scope for the issue to be actionable.
FOR GENERAL DISCUSSION, HELP OR QUESTIONS, please see the options at https://ipfs.io/help or head directly to https://discuss.ipfs.io.
(you can delete this section after reading)
-->
# Configuration for welcome - https://github.com/behaviorbot/welcome
# Configuration for new-issue-welcome - https://github.com/behaviorbot/new-issue-welcome
# Comment to be posted to on first time issues
newIssueWelcomeComment: >
Thank you for submitting your first issue to this repository! A maintainer
will be here shortly to triage and review.
In the meantime, please double-check that you have provided all the
necessary information to make this process easy! Any information that can
help save additional round trips is useful! We currently aim to give
initial feedback within **two business days**. If this does not happen, feel
free to leave a comment.
Please keep an eye on how this issue will be labeled, as labels give an
overview of priorities, assignments and additional actions requested by the
maintainers:
- "Priority" labels will show how urgent this is for the team.
- "Status" labels will show if this is ready to be worked on, blocked, or in progress.
- "Need" labels will indicate if additional input or analysis is required.
Finally, remember to use https://discuss.ipfs.io if you just need general
support.
# Configuration for new-pr-welcome - https://github.com/behaviorbot/new-pr-welcome
# Comment to be posted to on PRs from first time contributors in your repository
newPRWelcomeComment: >
Thank you for submitting this PR!
A maintainer will be here shortly to review it.
We are super grateful, but we are also overloaded! Help us by making sure
that:
* The context for this PR is clear, with relevant discussion, decisions
and stakeholders linked/mentioned.
* Your contribution itself is clear (code comments, self-review for the
rest) and in its best form. Follow the [code contribution
guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md#code-contribution-guidelines)
if they apply.
Getting other community members to do a review would be great help too on
complex PRs (you can ask in the chats/forums). If you are unsure about
something, just leave us a comment.
Next steps:
* A maintainer will triage and assign priority to this PR, commenting on
any missing things and potentially assigning a reviewer for high
priority items.
* The PR gets reviews, discussed and approvals as needed.
* The PR is merged by maintainers when it has been approved and comments addressed.
We currently aim to provide initial feedback/triaging within **two business
days**. Please keep an eye on any labelling actions, as these will indicate
priorities and status of your contribution.
We are very grateful for your contribution!
# Configuration for first-pr-merge - https://github.com/behaviorbot/first-pr-merge
# Comment to be posted to on pull requests merged by a first time user
# Currently disabled
#firstPRMergeComment: ""
0.8.1: QmZ6nzCLwGLVfRzYLpD7pW6UNuBDKEcA2imJtVpbEx2rxy
os:
- linux
language: go
go:
- 1.14.x
env:
global:
- GOTFLAGS="-race"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
The MIT License (MIT)
Copyright (c) 2016 Jeromy Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
gx:
go get github.com/whyrusleeping/gx
go get github.com/whyrusleeping/gx-go
covertools:
go get github.com/mattn/goveralls
go get golang.org/x/tools/cmd/cover
deps: gx covertools
gx --verbose install --global
gx-go rewrite
publish:
gx-go rewrite --undo
# go-dms3ld-format
go-dms3ld-format
==================
DMS3 ld format
\ No newline at end of file
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Coverage Status](https://codecov.io/gh/ipfs/go-ipld-format/branch/master/graph/badge.svg)](https://codecov.io/gh/ipfs/go-ipld-format/branch/master)
[![Travis CI](https://travis-ci.org/ipfs/go-ipld-format.svg?branch=master)](https://travis-ci.org/ipfs/go-ipld-format)
> go-dms3-format is a set of interfaces that a type needs to implement in order to be a part of the dms3 merkle-forest.
## Lead Maintainer
[Eric Myhre](https://github.com/warpfork)
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [API](#api)
- [Contribute](#contribute)
- [License](#license)
## Install
```sh
make install
```
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Jeromy Johnson
package format
import (
"context"
"errors"
"runtime"
cid "github.com/ipfs/go-cid"
)
// parallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var parallelCommits = runtime.NumCPU()
// ErrNotCommited is returned when closing a batch that hasn't been successfully
// committed.
var ErrNotCommited = errors.New("error: batch not commited")
// ErrClosed is returned when operating on a batch that has already been closed.
var ErrClosed = errors.New("error: batch closed")
// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
//
func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch {
ctx, cancel := context.WithCancel(ctx)
bopts := defaultBatchOptions
for _, o := range opts {
o(&bopts)
}
// Commit numCPU batches at once, but split the maximum buffer size over all commits in flight.
bopts.maxSize /= parallelCommits
bopts.maxNodes /= parallelCommits
return &Batch{
na: na,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, parallelCommits),
opts: bopts,
}
}
// Batch is a buffer for batching adds to a dag.
type Batch struct {
na NodeAdder
ctx context.Context
cancel func()
activeCommits int
err error
commitResults chan error
nodes []Node
size int
opts batchOptions
}
func (t *Batch) processResults() {
for t.activeCommits > 0 {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.setError(err)
return
}
default:
return
}
}
}
func (t *Batch) asyncCommit() {
numBlocks := len(t.nodes)
if numBlocks == 0 {
return
}
if t.activeCommits >= parallelCommits {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.setError(err)
return
}
case <-t.ctx.Done():
t.setError(t.ctx.Err())
return
}
}
go func(ctx context.Context, b []Node, result chan error, na NodeAdder) {
select {
case result <- na.AddMany(ctx, b):
case <-ctx.Done():
}
}(t.ctx, t.nodes, t.commitResults, t.na)
t.activeCommits++
t.nodes = make([]Node, 0, numBlocks)
t.size = 0
return
}
// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(ctx context.Context, nd Node) error {
return t.AddMany(ctx, []Node{nd})
}
// AddMany many calls Add for every given Node, thus batching and
// commiting them as needed.
func (t *Batch) AddMany(ctx context.Context, nodes []Node) error {
if t.err != nil {
return t.err
}
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.err != nil {
return t.err
}
t.nodes = append(t.nodes, nodes...)
for _, nd := range nodes {
t.size += len(nd.RawData())
}
if t.size > t.opts.maxSize || len(t.nodes) > t.opts.maxNodes {
t.asyncCommit()
}
return t.err
}
// Commit commits batched nodes.
func (t *Batch) Commit() error {
if t.err != nil {
return t.err
}
t.asyncCommit()
loop:
for t.activeCommits > 0 {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.setError(err)
break loop
}
case <-t.ctx.Done():
t.setError(t.ctx.Err())
break loop
}
}
return t.err
}
func (t *Batch) setError(err error) {
t.err = err
t.cancel()
// Drain as much as we can without blocking.
loop:
for {
select {
case <-t.commitResults:
default:
break loop
}
}
// Be nice and cleanup. These can take a *lot* of memory.
t.commitResults = nil
t.na = nil
t.ctx = nil
t.nodes = nil
t.size = 0
t.activeCommits = 0
}
// BatchOption provides a way of setting internal options of
// a Batch.
//
// See this post about the "functional options" pattern:
// http://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
type BatchOption func(o *batchOptions)
type batchOptions struct {
maxSize int
maxNodes int
}
var defaultBatchOptions = batchOptions{
maxSize: 8 << 20,
// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
maxNodes: 128,
}
// MaxSizeBatchOption sets the maximum amount of buffered data before writing
// blocks.
func MaxSizeBatchOption(size int) BatchOption {
return func(o *batchOptions) {
o.maxSize = size
}
}
// MaxNodesBatchOption sets the maximum number of buffered nodes before writing
// blocks.
func MaxNodesBatchOption(num int) BatchOption {
return func(o *batchOptions) {
o.maxNodes = num
}
}
// BufferedDAG implements DAGService using a Batch NodeAdder to wrap add
// operations in the given DAGService. It will trigger Commit() before any
// non-Add operations, but otherwise calling Commit() is left to the user.
type BufferedDAG struct {
ds DAGService
b *Batch
}
// NewBufferedDAG creates a BufferedDAG using the given DAGService and the
// given options for the Batch NodeAdder.
func NewBufferedDAG(ctx context.Context, ds DAGService, opts ...BatchOption) *BufferedDAG {
return &BufferedDAG{
ds: ds,
b: NewBatch(ctx, ds, opts...),
}
}
// Commit calls commit on the Batch.
func (bd *BufferedDAG) Commit() error {
return bd.b.Commit()
}
// Add adds a new node using Batch.
func (bd *BufferedDAG) Add(ctx context.Context, n Node) error {
return bd.b.Add(ctx, n)
}
// AddMany adds many nodes using Batch.
func (bd *BufferedDAG) AddMany(ctx context.Context, nds []Node) error {
return bd.b.AddMany(ctx, nds)
}
// Get commits and gets a node from the DAGService.
func (bd *BufferedDAG) Get(ctx context.Context, c cid.Cid) (Node, error) {
err := bd.b.Commit()
if err != nil {
return nil, err
}
return bd.ds.Get(ctx, c)
}
// GetMany commits and gets nodes from the DAGService.
func (bd *BufferedDAG) GetMany(ctx context.Context, cs []cid.Cid) <-chan *NodeOption {
err := bd.b.Commit()
if err != nil {
ch := make(chan *NodeOption, 1)
defer close(ch)
ch <- &NodeOption{
Node: nil,
Err: err,
}
return ch
}
return bd.ds.GetMany(ctx, cs)
}
// Remove commits and removes a node from the DAGService.
func (bd *BufferedDAG) Remove(ctx context.Context, c cid.Cid) error {
err := bd.b.Commit()
if err != nil {
return err
}
return bd.ds.Remove(ctx, c)
}
// RemoveMany commits and removes nodes from the DAGService.
func (bd *BufferedDAG) RemoveMany(ctx context.Context, cs []cid.Cid) error {
err := bd.b.Commit()
if err != nil {
return err
}
return bd.ds.RemoveMany(ctx, cs)
}
package format
import (
"context"
"sync"
"testing"
cid "github.com/ipfs/go-cid"
)
// Test dag
type testDag struct {
mu sync.Mutex
nodes map[string]Node
}
func newTestDag() *testDag {
return &testDag{nodes: make(map[string]Node)}
}
func (d *testDag) Get(ctx context.Context, cid cid.Cid) (Node, error) {
d.mu.Lock()
defer d.mu.Unlock()
if n, ok := d.nodes[cid.KeyString()]; ok {
return n, nil
}
return nil, ErrNotFound
}
func (d *testDag) GetMany(ctx context.Context, cids []cid.Cid) <-chan *NodeOption {
d.mu.Lock()
defer d.mu.Unlock()
out := make(chan *NodeOption, len(cids))
for _, c := range cids {
if n, ok := d.nodes[c.KeyString()]; ok {
out <- &NodeOption{Node: n}
} else {
out <- &NodeOption{Err: ErrNotFound}
}
}
close(out)
return out
}
func (d *testDag) Add(ctx context.Context, node Node) error {
d.mu.Lock()
defer d.mu.Unlock()
d.nodes[node.Cid().KeyString()] = node
return nil
}
func (d *testDag) AddMany(ctx context.Context, nodes []Node) error {
d.mu.Lock()
defer d.mu.Unlock()
for _, n := range nodes {
d.nodes[n.Cid().KeyString()] = n
}
return nil
}
func (d *testDag) Remove(ctx context.Context, c cid.Cid) error {
d.mu.Lock()
defer d.mu.Unlock()
delete(d.nodes, c.KeyString())
return nil
}
func (d *testDag) RemoveMany(ctx context.Context, cids []cid.Cid) error {
d.mu.Lock()
defer d.mu.Unlock()
for _, c := range cids {
delete(d.nodes, c.KeyString())
}
return nil
}
var _ DAGService = new(testDag)
func TestBatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := newTestDag()
b := NewBatch(ctx, d)
for i := 0; i < 1000; i++ {
// It would be great if we could use *many* different nodes here
// but we can't add any dependencies and I don't feel like adding
// any more testing code.
if err := b.Add(ctx, new(EmptyNode)); err != nil {
t.Fatal(err)
}
}
if err := b.Commit(); err != nil {
t.Fatal(err)
}
n, err := d.Get(ctx, new(EmptyNode).Cid())
if err != nil {
t.Fatal(err)
}
switch n.(type) {
case *EmptyNode:
default:
t.Fatal("expected the node to exist in the dag")
}
if len(d.nodes) != 1 {
t.Fatal("should have one node")
}
}
func TestBufferedDAG(t *testing.T) {
ds := newTestDag()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var bdag DAGService = NewBufferedDAG(ctx, ds)
for i := 0; i < 1000; i++ {
n := new(EmptyNode)
if err := bdag.Add(ctx, n); err != nil {
t.Fatal(err)
}
if _, err := bdag.Get(ctx, n.Cid()); err != nil {
t.Fatal(err)
}
if err := bdag.Remove(ctx, n.Cid()); err != nil {
t.Fatal(err)
}
}
}
func TestBatchOptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wantMaxSize := 8 << 10
wantMaxNodes := 500
d := newTestDag()
b := NewBatch(ctx, d, MaxSizeBatchOption(wantMaxSize), MaxNodesBatchOption(wantMaxNodes))
if b.opts.maxSize != wantMaxSize/parallelCommits {
t.Fatalf("maxSize incorrect, want: %d, got: %d", wantMaxSize, b.opts.maxSize)
}
if b.opts.maxNodes != wantMaxNodes/parallelCommits {
t.Fatalf("maxNodes incorrect, want: %d, got: %d", wantMaxNodes, b.opts.maxNodes)
}
}
coverage:
range: "50...100"
comment: off
package format
import (
"fmt"
"sync"
blocks "github.com/ipfs/go-block-format"
)
// DecodeBlockFunc functions decode blocks into nodes.
type DecodeBlockFunc func(block blocks.Block) (Node, error)
type BlockDecoder interface {
Register(codec uint64, decoder DecodeBlockFunc)
Decode(blocks.Block) (Node, error)
}
type safeBlockDecoder struct {
// Can be replaced with an RCU if necessary.
lock sync.RWMutex
decoders map[uint64]DecodeBlockFunc
}
// Register registers decoder for all blocks with the passed codec.
//
// This will silently replace any existing registered block decoders.
func (d *safeBlockDecoder) Register(codec uint64, decoder DecodeBlockFunc) {
d.lock.Lock()
defer d.lock.Unlock()
d.decoders[codec] = decoder
}
func (d *safeBlockDecoder) Decode(block blocks.Block) (Node, error) {
// Short-circuit by cast if we already have a Node.
if node, ok := block.(Node); ok {
return node, nil
}
ty := block.Cid().Type()
d.lock.RLock()
decoder, ok := d.decoders[ty]
d.lock.RUnlock()
if ok {
return decoder(block)
} else {
// TODO: get the *long* name for this format
return nil, fmt.Errorf("unrecognized object type: %d", ty)
}
}
var DefaultBlockDecoder BlockDecoder = &safeBlockDecoder{decoders: make(map[uint64]DecodeBlockFunc)}
// Decode decodes the given block using the default BlockDecoder.
func Decode(block blocks.Block) (Node, error) {
return DefaultBlockDecoder.Decode(block)
}
// Register registers block decoders with the default BlockDecoder.
func Register(codec uint64, decoder DecodeBlockFunc) {
DefaultBlockDecoder.Register(codec, decoder)
}
package format
import (
"errors"
"testing"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)
func init() {
Register(cid.Raw, func(b blocks.Block) (Node, error) {
node := &EmptyNode{}
if b.RawData() != nil || !b.Cid().Equals(node.Cid()) {
return nil, errors.New("can only decode empty blocks")
}
return node, nil
})
}
func TestDecode(t *testing.T) {
id, err := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: mh.ID,
MhLength: 0,
}.Sum(nil)
if err != nil {
t.Fatalf("failed to create cid: %s", err)
}
block, err := blocks.NewBlockWithCid(nil, id)
if err != nil {
t.Fatalf("failed to create empty block: %s", err)
}
node, err := Decode(block)
if err != nil {
t.Fatalf("failed to decode empty node: %s", err)
}
if !node.Cid().Equals(id) {
t.Fatalf("empty node doesn't have the right cid")
}
if _, ok := node.(*EmptyNode); !ok {
t.Fatalf("empty node doesn't have the right type")
}
}
package format
import (
"context"
cid "github.com/ipfs/go-cid"
)
// GetLinks returns the CIDs of the children of the given node. Prefer this
// method over looking up the node itself and calling `Links()` on it as this
// method may be able to use a link cache.
func GetLinks(ctx context.Context, ng NodeGetter, c cid.Cid) ([]*Link, error) {
if c.Type() == cid.Raw {
return nil, nil
}
if gl, ok := ng.(LinkGetter); ok {
return gl.GetLinks(ctx, c)
}
node, err := ng.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}
// GetDAG will fill out all of the links of the given Node.
// It returns an array of NodePromise with the linked nodes all in the proper
// order.
func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise {
var cids []cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
}
return GetNodes(ctx, ds, cids)
}
// GetNodes returns an array of 'FutureNode' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds NodeGetter, keys []cid.Cid) []*NodePromise {
// Early out if no work to do
if len(keys) == 0 {
return nil
}
promises := make([]*NodePromise, len(keys))
for i := range keys {
promises[i] = NewNodePromise(ctx)
}
dedupedKeys := dedupeKeys(keys)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodechan := ds.GetMany(ctx, dedupedKeys)
for count := 0; count < len(keys); {
select {
case opt, ok := <-nodechan:
if !ok {
for _, p := range promises {
p.Fail(ErrNotFound)
}
return
}
if opt.Err != nil {
for _, p := range promises {
p.Fail(opt.Err)
}
return
}
nd := opt.Node
c := nd.Cid()
for i, lnk_c := range keys {
if c.Equals(lnk_c) {
count++
promises[i].Send(nd)
}
}
case <-ctx.Done():
return
}
}
}()
return promises
}
func Copy(ctx context.Context, from, to DAGService, root cid.Cid) error {
node, err := from.Get(ctx, root)
if err != nil {
return err
}
links := node.Links()
for _, link := range links {
err := Copy(ctx, from, to, link.Cid)
if err != nil {
return err
}
}
err = to.Add(ctx, node)
if err != nil {
return err
}
return nil
}
// Remove duplicates from a list of keys
func dedupeKeys(cids []cid.Cid) []cid.Cid {
set := cid.NewSet()
for _, c := range cids {
set.Add(c)
}
return set.Keys()
}
package format
import (
"context"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"testing"
)
type TestNode struct {
links []*Link
data []byte
builder cid.Builder
}
var v0CidPrefix = cid.Prefix{
Codec: cid.DagProtobuf,
MhLength: -1,
MhType: mh.SHA2_256,
Version: 0,
}
func InitNode(d []byte) *TestNode {
return &TestNode{
data: d,
builder: v0CidPrefix,
}
}
func (n *TestNode) Resolve([]string) (interface{}, []string, error) {
return nil, nil, EmptyNodeError
}
func (n *TestNode) Tree(string, int) []string {
return nil
}
func (n *TestNode) ResolveLink([]string) (*Link, []string, error) {
return nil, nil, EmptyNodeError
}
func (n *TestNode) Copy() Node {
return &EmptyNode{}
}
func (n *TestNode) Cid() cid.Cid {
c, err := n.builder.Sum(n.RawData())
if err != nil {
return cid.Cid{}
}
return c
}
func (n *TestNode) Links() []*Link {
return n.links
}
func (n *TestNode) Loggable() map[string]interface{} {
return nil
}
func (n *TestNode) String() string {
return string(n.data)
}
func (n *TestNode) RawData() []byte {
return n.data
}
func (n *TestNode) Size() (uint64, error) {
return 0, nil
}
func (n *TestNode) Stat() (*NodeStat, error) {
return &NodeStat{}, nil
}
// AddNodeLink adds a link to another node.
func (n *TestNode) AddNodeLink(name string, that Node) error {
lnk, err := MakeLink(that)
if err != nil {
return err
}
lnk.Name = name
n.AddRawLink(name, lnk)
return nil
}
func (n *TestNode) AddRawLink(name string, l *Link) error {
n.links = append(n.links, &Link{
Name: name,
Size: l.Size,
Cid: l.Cid,
})
return nil
}
func TestCopy(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
from := newTestDag()
root := InitNode([]byte("level0"))
l11 := InitNode([]byte("leve1_node1"))
l12 := InitNode([]byte("leve1_node2"))
l21 := InitNode([]byte("leve2_node1"))
l22 := InitNode([]byte("leve2_node2"))
l23 := InitNode([]byte("leve2_node3"))
l11.AddNodeLink(l21.Cid().String(), l21)
l11.AddNodeLink(l22.Cid().String(), l22)
l11.AddNodeLink(l23.Cid().String(), l23)
root.AddNodeLink(l11.Cid().String(), l11)
root.AddNodeLink(l12.Cid().String(), l12)
for _, n := range []Node{l23, l22, l21, l12, l11, root} {
err := from.Add(ctx, n)
if err != nil {
t.Fatal(err)
}
}
to := newTestDag()
err := Copy(ctx, from, to, root.Cid())
if err != nil {
t.Error(err)
}
r, err := to.Get(ctx, root.Cid())
if err != nil || len(r.Links()) != 2 {
t.Error("fail to copy dag")
}
l1, err := to.Get(ctx, l11.Cid())
if err != nil || len(l1.Links()) != 3 {
t.Error("fail to copy dag")
}
}
package format
import (
"context"
"fmt"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)
type Resolver interface {
// Resolve resolves a path through this node, stopping at any link boundary
// and returning the object found as well as the remaining path to traverse
Resolve(path []string) (interface{}, []string, error)
// Tree lists all paths within the object under 'path', and up to the given depth.
// To list the entire object (similar to `find .`) pass "" and -1
Tree(path string, depth int) []string
}
// Node is the base interface all IPLD nodes must implement.
//
// Nodes are **Immutable** and all methods defined on the interface are
// **Thread Safe**.
type Node interface {
blocks.Block
Resolver
// ResolveLink is a helper function that calls resolve and asserts the
// output is a link
ResolveLink(path []string) (*Link, []string, error)
// Copy returns a deep copy of this node
Copy() Node
// Links is a helper function that returns all links within this object
Links() []*Link
// TODO: not sure if stat deserves to stay
Stat() (*NodeStat, error)
// Size returns the size in bytes of the serialized object
Size() (uint64, error)
}
// Link represents an IPFS Merkle DAG Link between Nodes.
type Link struct {
// utf string name. should be unique per object
Name string // utf8
// cumulative size of target object
Size uint64
// multihash of the target object
Cid cid.Cid
}
// NodeStat is a statistics object for a Node. Mostly sizes.
type NodeStat struct {
Hash string
NumLinks int // number of links in link table
BlockSize int // size of the raw, encoded data
LinksSize int // size of the links segment
DataSize int // size of the data segment
CumulativeSize int // cumulative size of object and its references
}
func (ns NodeStat) String() string {
f := "NodeStat{NumLinks: %d, BlockSize: %d, LinksSize: %d, DataSize: %d, CumulativeSize: %d}"
return fmt.Sprintf(f, ns.NumLinks, ns.BlockSize, ns.LinksSize, ns.DataSize, ns.CumulativeSize)
}
// MakeLink creates a link to the given node
func MakeLink(n Node) (*Link, error) {
s, err := n.Size()
if err != nil {
return nil, err
}
return &Link{
Size: s,
Cid: n.Cid(),
}, nil
}
// GetNode returns the MDAG Node that this link points to
func (l *Link) GetNode(ctx context.Context, serv NodeGetter) (Node, error) {
return serv.Get(ctx, l.Cid)
}
package format
import (
"errors"
"testing"
cid "github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)
type EmptyNode struct{}
var EmptyNodeError error = errors.New("dummy node")
func (n *EmptyNode) Resolve([]string) (interface{}, []string, error) {
return nil, nil, EmptyNodeError
}
func (n *EmptyNode) Tree(string, int) []string {
return nil
}
func (n *EmptyNode) ResolveLink([]string) (*Link, []string, error) {
return nil, nil, EmptyNodeError
}
func (n *EmptyNode) Copy() Node {
return &EmptyNode{}
}
func (n *EmptyNode) Cid() cid.Cid {
id, err := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: mh.ID,
MhLength: 0,
}.Sum(nil)
if err != nil {
panic("failed to create an empty cid!")
}
return id
}
func (n *EmptyNode) Links() []*Link {
return nil
}
func (n *EmptyNode) Loggable() map[string]interface{} {
return nil
}
func (n *EmptyNode) String() string {
return "[]"
}
func (n *EmptyNode) RawData() []byte {
return nil
}
func (n *EmptyNode) Size() (uint64, error) {
return 0, nil
}
func (n *EmptyNode) Stat() (*NodeStat, error) {
return &NodeStat{}, nil
}
func TestNodeType(t *testing.T) {
// Type assertion.
var _ Node = &EmptyNode{}
}
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/ipfs/go-block-format v0.0.2 h1:qPDvcP19izTjU8rgo6p7gTXZlkMkF5bz5G3fqIsSCPE=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=
github.com/ipfs/go-cid v0.0.1 h1:GBjWPktLnNyX0JiQCNFpUuUSoMw5KMyqrsejHYlILBE=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/multiformats/go-base32 v0.0.3 h1:tw5+NhuwaOjJCC5Pp82QuXbrmLzWg7uxlMFp8Nq/kkI=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multibase v0.0.1 h1:PN9/v21eLywrFWdFNsFKaU04kLJzuYzmrJR+ubhT9qA=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
package format
import (
"context"
"fmt"
cid "github.com/ipfs/go-cid"
)
var ErrNotFound = fmt.Errorf("merkledag: not found")
// Either a node or an error.
type NodeOption struct {
Node Node
Err error
}
// The basic Node resolution service.
type NodeGetter interface {
// Get retrieves nodes by CID. Depending on the NodeGetter
// implementation, this may involve fetching the Node from a remote
// machine; consider setting a deadline in the context.
Get(context.Context, cid.Cid) (Node, error)
// GetMany returns a channel of NodeOptions given a set of CIDs.
GetMany(context.Context, []cid.Cid) <-chan *NodeOption
}
// NodeAdder adds nodes to a DAG.
type NodeAdder interface {
// Add adds a node to this DAG.
Add(context.Context, Node) error
// AddMany adds many nodes to this DAG.
//
// Consider using the Batch NodeAdder (`NewBatch`) if you make
// extensive use of this function.
AddMany(context.Context, []Node) error
}
// NodeGetters can optionally implement this interface to make finding linked
// objects faster.
type LinkGetter interface {
NodeGetter
// TODO(ipfs/go-ipld-format#9): This should return []cid.Cid
// GetLinks returns the children of the node refered to by the given
// CID.
GetLinks(ctx context.Context, nd cid.Cid) ([]*Link, error)
}
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
NodeGetter
NodeAdder
// Remove removes a node from this DAG.
//
// Remove returns no error if the requested node is not present in this DAG.
Remove(context.Context, cid.Cid) error
// RemoveMany removes many nodes from this DAG.
//
// It returns success even if the nodes were not present in the DAG.
RemoveMany(context.Context, []cid.Cid) error
}
package format
import (
"context"
cid "github.com/ipfs/go-cid"
)
// NavigableIPLDNode implements the `NavigableNode` interface wrapping
// an IPLD `Node` and providing support for node promises.
type NavigableIPLDNode struct {
node Node
// The CID of each child of the node.
childCIDs []cid.Cid
// Node promises for child nodes requested.
childPromises []*NodePromise
// TODO: Consider encapsulating it in a single structure alongside `childCIDs`.
nodeGetter NodeGetter
// TODO: Should this be stored in the `Walker`'s context to avoid passing
// it along to every node? It seems like a structure that doesn't need
// to be replicated (the entire DAG will use the same `NodeGetter`).
}
// NewNavigableIPLDNode returns a `NavigableIPLDNode` wrapping the provided
// `node`.
func NewNavigableIPLDNode(node Node, nodeGetter NodeGetter) *NavigableIPLDNode {
nn := &NavigableIPLDNode{
node: node,
nodeGetter: nodeGetter,
}
nn.childCIDs = getLinkCids(node)
nn.childPromises = make([]*NodePromise, len(nn.childCIDs))
return nn
}
// FetchChild implements the `NavigableNode` interface using node promises
// to preload the following child nodes to `childIndex` leaving them ready
// for subsequent `FetchChild` calls.
func (nn *NavigableIPLDNode) FetchChild(ctx context.Context, childIndex uint) (NavigableNode, error) {
// This function doesn't check that `childIndex` is valid, that's
// the `Walker` responsibility.
// If we drop to <= preloadSize/2 preloading nodes, preload the next 10.
for i := childIndex; i < childIndex+preloadSize/2 && i < uint(len(nn.childPromises)); i++ {
// TODO: Check if canceled.
if nn.childPromises[i] == nil {
nn.preload(ctx, i)
break
}
}
child, err := nn.getPromiseValue(ctx, childIndex)
switch err {
case nil:
case context.DeadlineExceeded, context.Canceled:
if ctx.Err() != nil {
return nil, ctx.Err()
}
// In this case, the context used to *preload* the node (in a previous
// `FetchChild` call) has been canceled. We need to retry the load with
// the current context and we might as well preload some extra nodes
// while we're at it.
nn.preload(ctx, childIndex)
child, err = nn.getPromiseValue(ctx, childIndex)
if err != nil {
return nil, err
}
default:
return nil, err
}
return NewNavigableIPLDNode(child, nn.nodeGetter), nil
}
// Number of nodes to preload every time a child is requested.
// TODO: Give more visibility to this constant, it could be an attribute
// set in the `Walker` context that gets passed in `FetchChild`.
const preloadSize = 10
// Preload at most `preloadSize` child nodes from `beg` through promises
// created using this `ctx`.
func (nn *NavigableIPLDNode) preload(ctx context.Context, beg uint) {
end := beg + preloadSize
if end >= uint(len(nn.childCIDs)) {
end = uint(len(nn.childCIDs))
}
copy(nn.childPromises[beg:], GetNodes(ctx, nn.nodeGetter, nn.childCIDs[beg:end]))
}
// Fetch the actual node (this is the blocking part of the mechanism)
// and invalidate the promise. `preload` should always be called first
// for the `childIndex` being fetch.
//
// TODO: Include `preload` into the beginning of this function?
// (And collapse the two calls in `FetchChild`).
func (nn *NavigableIPLDNode) getPromiseValue(ctx context.Context, childIndex uint) (Node, error) {
value, err := nn.childPromises[childIndex].Get(ctx)
nn.childPromises[childIndex] = nil
return value, err
}
// Get the CID of all the links of this `node`.
func getLinkCids(node Node) []cid.Cid {
links := node.Links()
out := make([]cid.Cid, 0, len(links))
for _, l := range links {
out = append(out, l.Cid)
}
return out
}
// GetIPLDNode returns the IPLD `Node` wrapped into this structure.
func (nn *NavigableIPLDNode) GetIPLDNode() Node {
return nn.node
}
// ChildTotal implements the `NavigableNode` returning the number
// of links (of child nodes) in this node.
func (nn *NavigableIPLDNode) ChildTotal() uint {
return uint(len(nn.GetIPLDNode().Links()))
}
// ExtractIPLDNode is a helper function that takes a `NavigableNode`
// and returns the IPLD `Node` wrapped inside. Used in the `Visitor`
// function.
// TODO: Check for errors to avoid a panic?
func ExtractIPLDNode(node NavigableNode) Node {
return node.GetIPLDNode()
}
// TODO: `Cleanup` is not supported at the moment in the `Walker`.
//
// Called in `Walker.up()` when the node is not part of the path anymore.
//func (nn *NavigableIPLDNode) Cleanup() {
// // TODO: Ideally this would be the place to issue a context `cancel()`
// // but since the DAG reader uses multiple contexts in the same session
// // (through `Read` and `CtxReadFull`) we would need to store an array
// // with the multiple contexts in `NavigableIPLDNode` with its corresponding
// // cancel functions.
//}
package format
import (
"context"
)
// NodePromise provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
//
// Thread Safety: This is multiple-consumer/single-producer safe.
func NewNodePromise(ctx context.Context) *NodePromise {
return &NodePromise{
done: make(chan struct{}),
ctx: ctx,
}
}
type NodePromise struct {
value Node
err error
done chan struct{}
ctx context.Context
}
// Call this function to fail a promise.
//
// Once a promise has been failed or fulfilled, further attempts to fail it will
// be silently dropped.
func (np *NodePromise) Fail(err error) {
if np.err != nil || np.value != nil {
// Already filled.
return
}
np.err = err
close(np.done)
}
// Fulfill this promise.
//
// Once a promise has been fulfilled or failed, calling this function will
// panic.
func (np *NodePromise) Send(nd Node) {
// if promise has a value, don't fail it
if np.err != nil || np.value != nil {
panic("already filled")
}
np.value = nd
close(np.done)
}
// Get the value of this promise.
//
// This function is safe to call concurrently from any number of goroutines.
func (np *NodePromise) Get(ctx context.Context) (Node, error) {
select {
case <-np.done:
return np.value, np.err
case <-np.ctx.Done():
return nil, np.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
}
}
package format
import (
"context"
"errors"
)
// Walker provides methods to move through a DAG of nodes that implement
// the `NavigableNode` interface. It uses iterative algorithms (instead
// of recursive ones) that expose the `path` of nodes from the root to
// the `ActiveNode` it currently points to.
//
// It provides multiple ways to walk through the DAG (e.g. `Iterate`
// and `Seek`). When using them, you provide a Visitor function that
// will be called for each node the Walker traverses. The Visitor can
// read data from those nodes and, optionally, direct the movement of
// the Walker by calling `Pause` (to stop traversing and return) or
// `NextChild` (to skip a child and its descendants). See the DAG reader
// in `github.com/ipfs/go-unixfs/io/dagreader.go` for a usage example.
// TODO: This example isn't merged yet.
type Walker struct {
// Sequence of nodes in the DAG from the root to the `ActiveNode`, each
// position in the slice being the parent of the next one. The `ActiveNode`
// resides in the position indexed by `currentDepth` (the slice may contain
// more elements past that point but they should be ignored since the slice
// is not truncated to leverage the already allocated space).
//
// Every time `down` is called the `currentDepth` increases and the child
// of the `ActiveNode` is inserted after it (effectively becoming the new
// `ActiveNode`).
//
// The slice must *always* have a length bigger than zero with the root
// of the DAG at the first position (empty DAGs are not valid).
path []NavigableNode
// Depth of the `ActiveNode`. It grows downwards, root being 0, its child 1,
// and so on. It controls the effective length of `path` and `childIndex`.
//
// A currentDepth of -1 signals the start case of a new `Walker` that hasn't
// moved yet. Although this state is an invalid index to the slices, it
// allows to centralize all the visit calls in the `down` move (starting at
// zero would require a special visit case inside every walk operation like
// `Iterate()` and `Seek`). This value should never be returned to after
// the first `down` movement, moving up from the root should always return
// `errUpOnRoot`.
currentDepth int
// This slice has the index of the child each node in `path` is pointing
// to. The child index in the node can be set past all of its child nodes
// (having a value equal to `ChildTotal`) to signal it has visited (or
// skipped) all of them. A leaf node with no children that has its index
// in zero would also comply with this format.
//
// Complement to `path`, not only do we need to know which nodes have been
// traversed to reach the `ActiveNode` but also which child nodes they are
// to correctly have the active path of the DAG. (Reword this paragraph.)
childIndex []uint
// Flag to signal that a pause in the current walk operation has been
// requested by the user inside `Visitor`.
pauseRequested bool
// Used to pass information from the central `Walker` structure to the
// distributed `NavigableNode`s (to have a centralized configuration
// structure to control the behavior of all of them), e.g., to tell
// the `NavigableIPLDNode` which context should be used to load node
// promises (but this could later be used in more elaborate ways).
ctx context.Context
}
// `Walker` implementation details:
//
// The `Iterate` and `Seek` walk operations are implemented through two
// basic move methods `up` and `down`, that change which node is the
// `ActiveNode` (modifying the `path` that leads to it). The `NextChild`
// method allows to change which child the `ActiveNode` is pointing to
// in order to change the direction of the descent.
//
// The `down` method is the analogous of a recursive call and the one in
// charge of visiting (possible new) nodes (through `Visitor`) and performing
// some user-defined logic. A `Pause` method is available to interrupt the
// current walk operation after visiting a node.
//
// Key terms and concepts:
// * Walk operation (e.g., `Iterate`).
// * Move methods: `up` and `down`.
// * Active node.
// * Path to the active node.
// Function called each time a node is arrived upon in a walk operation
// through the `down` method (not when going back `up`). It is the main
// API to implement DAG functionality (e.g., read and seek a file DAG)
// on top of the `Walker` structure.
//
// Its argument is the current `node` being visited (the `ActiveNode`).
// Any error it returns (apart from the internal `errPauseWalkOperation`)
// will be forwarded to the caller of the walk operation (pausing it).
//
// Any of the exported methods of this API should be allowed to be called
// from within this method, e.g., `NextChild`.
// TODO: Check that. Can `ResetPosition` be called without breaking
// the `Walker` integrity?
type Visitor func(node NavigableNode) error
// NavigableNode is the interface the nodes of a DAG need to implement in
// order to be traversed by the `Walker`.
type NavigableNode interface {
// FetchChild returns the child of this node pointed to by `childIndex`.
// A `Context` stored in the `Walker` is passed (`ctx`) that may contain
// configuration attributes stored by the user before initiating the
// walk operation.
FetchChild(ctx context.Context, childIndex uint) (NavigableNode, error)
// ChildTotal returns the number of children of the `ActiveNode`.
ChildTotal() uint
// GetIPLDNode returns actual IPLD Node
GetIPLDNode() Node
// TODO: Evaluate providing the `Cleanup` and `Reset` methods.
// Cleanup is an optional method that is called by the `Walker` when
// this node leaves the active `path`, i.e., when this node is the
// `ActiveNode` and the `up` movement is called.
//Cleanup()
// Allow this method to return an error? That would imply
// modifying the `Walker` API, `up()` would now return an error
// different than `errUpOnRoot`.
// Reset is an optional function that is called by the `Walker` when
// `ResetPosition` is called, it is only applied to the root node
// of the DAG.
//Reset()
}
// NewWalker creates a new `Walker` structure from a `root`
// NavigableNode.
func NewWalker(ctx context.Context, root NavigableNode) *Walker {
return &Walker{
ctx: ctx,
path: []NavigableNode{root},
childIndex: []uint{0},
currentDepth: -1,
// Starting position, "on top" of the root node, see `currentDepth`.
}
}
// ActiveNode returns the `NavigableNode` that `Walker` is pointing
// to at the moment. It changes when `up` or `down` is called.
func (w *Walker) ActiveNode() NavigableNode {
return w.path[w.currentDepth]
// TODO: Add a check for the initial state of `currentDepth` -1?
}
// ErrDownNoChild signals there is no child at `ActiveChildIndex` in the
// `ActiveNode` to go down to.
var ErrDownNoChild = errors.New("can't go down, the child does not exist")
// errUpOnRoot signals the end of the DAG after returning to the root.
var errUpOnRoot = errors.New("can't go up, already on root")
// EndOfDag wraps the `errUpOnRoot` and signals to the user that the
// entire DAG has been iterated.
var EndOfDag = errors.New("end of DAG")
// ErrNextNoChild signals the end of this parent child nodes.
var ErrNextNoChild = errors.New("can't go to the next child, no more child nodes in this parent")
// errPauseWalkOperation signals the pause of the walk operation.
var errPauseWalkOperation = errors.New("pause in the current walk operation")
// ErrNilVisitor signals the lack of a `Visitor` function.
var ErrNilVisitor = errors.New("no Visitor function specified")
// Iterate the DAG through the DFS pre-order walk algorithm, going down
// as much as possible, then `NextChild` to the other siblings, and then up
// (to go down again). The position is saved throughout iterations (and
// can be previously set in `Seek`) allowing `Iterate` to be called
// repeatedly (after a `Pause`) to continue the iteration.
//
// This function returns the errors received from `down` (generated either
// inside the `Visitor` call or any other errors while fetching the child
// nodes), the rest of the move errors are handled within the function and
// are not returned.
func (w *Walker) Iterate(visitor Visitor) error {
// Iterate until either: the end of the DAG (`errUpOnRoot`), a `Pause`
// is requested (`errPauseWalkOperation`) or an error happens (while
// going down).
for {
// First, go down as much as possible.
for {
err := w.down(visitor)
if err == ErrDownNoChild {
break
// Can't keep going down from this node, try to move Next.
}
if err == errPauseWalkOperation {
return nil
// Pause requested, `errPauseWalkOperation` is just an internal
// error to signal to pause, don't pass it along.
}
if err != nil {
return err
// `down` is the only movement that can return *any* error.
}
}
// Can't move down anymore, turn to the next child in the `ActiveNode`
// to go down a different path. If there are no more child nodes
// available, go back up.
for {
err := w.NextChild()
if err == nil {
break
// No error, it turned to the next child. Try to go down again.
}
// It can't go Next (`ErrNextNoChild`), try to move up.
err = w.up()
if err != nil {
// Can't move up, on the root again (`errUpOnRoot`).
return EndOfDag
}
// Moved up, try `NextChild` again.
}
// Turned to the next child (after potentially many up moves),
// try going down again.
}
}
// Seek a specific node in a downwards manner. The `Visitor` should be
// used to steer the seek selecting at each node which child will the
// seek continue to (extending the `path` in that direction) or pause it
// (if the desired node has been found). The seek always starts from
// the root. It modifies the position so it shouldn't be used in-between
// `Iterate` calls (it can be used to set the position *before* iterating).
// If the visitor returns any non-`nil` errors the seek will stop.
//
// TODO: The seek could be extended to seek from the current position.
// (Is there something in the logic that would prevent it at the moment?)
func (w *Walker) Seek(visitor Visitor) error {
if visitor == nil {
return ErrNilVisitor
// Although valid, there is no point in calling `Seek` without
// any extra logic, it would just go down to the leftmost leaf,
// so this would probably be a user error.
}
// Go down until it the desired node is found (that will be signaled
// pausing the seek with `errPauseWalkOperation`) or a leaf node is
// reached (end of the DAG).
for {
err := w.down(visitor)
if err == errPauseWalkOperation {
return nil
// Found the node, `errPauseWalkOperation` is just an internal
// error to signal to pause, don't pass it along.
}
if err == ErrDownNoChild {
return nil
// Can't keep going down from this node, either at a leaf node
// or the `Visitor` has moved the child index past the
// available index (probably because none indicated that the
// target node could be down from there).
}
if err != nil {
return err
// `down()` is the only movement that can return *any* error.
}
}
// TODO: Copied from the first part of `Iterate()` (although conceptually
// different from it). Could this be encapsulated in a function to avoid
// repeating code? The way the pause signal is handled it wouldn't seem
// very useful: the `errPauseWalkOperation` needs to be processed at this
// depth to return from the function (and pause the seek, returning
// from another function here wouldn't cause it to stop).
}
// Go down one level in the DAG to the child of the `ActiveNode`
// pointed to by `ActiveChildIndex` and perform some logic on it by
// through the user-specified `visitor`.
//
// This should always be the first move in any walk operation
// (to visit the root node and move the `currentDepth` away
// from the negative value).
func (w *Walker) down(visitor Visitor) error {
child, err := w.fetchChild()
if err != nil {
return err
}
w.extendPath(child)
return w.visitActiveNode(visitor)
}
// Fetch the child from the `ActiveNode` through the `FetchChild`
// method of the `NavigableNode` interface.
func (w *Walker) fetchChild() (NavigableNode, error) {
if w.currentDepth == -1 {
// First time `down()` is called, `currentDepth` is -1,
// return the root node. Don't check available child nodes
// (as the `Walker` is not actually on any node just yet
// and `ActiveChildIndex` is of no use yet).
return w.path[0], nil
}
// Check if the child to fetch exists.
if w.ActiveChildIndex() >= w.ActiveNode().ChildTotal() {
return nil, ErrDownNoChild
}
return w.ActiveNode().FetchChild(w.ctx, w.ActiveChildIndex())
// TODO: Maybe call `extendPath` here and hide it away
// from `down`.
}
// Increase the `currentDepth` and extend the `path` to the fetched
// `child` node (which now becomes the new `ActiveNode`)
func (w *Walker) extendPath(child NavigableNode) {
w.currentDepth++
// Extend the slices if needed (doubling its capacity).
if w.currentDepth >= len(w.path) {
w.path = append(w.path, make([]NavigableNode, len(w.path))...)
w.childIndex = append(w.childIndex, make([]uint, len(w.childIndex))...)
// TODO: Check the performance of this grow mechanism.
}
// `child` now becomes the `ActiveNode()`.
w.path[w.currentDepth] = child
w.childIndex[w.currentDepth] = 0
}
// Call the `Visitor` on the `ActiveNode`. This function should only be
// called from `down`. This is a wrapper function to `Visitor` to process
// the `Pause` signal and do other minor checks (taking this logic away
// from `down`).
func (w *Walker) visitActiveNode(visitor Visitor) error {
if visitor == nil {
return nil
// No need to check `pauseRequested` as `Pause` should
// only be called from within the `Visitor`.
}
err := visitor(w.ActiveNode())
if w.pauseRequested {
// If a pause was requested make sure an error is returned
// that will cause the current walk operation to return. If
// `Visitor` didn't return an error set an artificial one
// generated by the `Walker`.
if err == nil {
err = errPauseWalkOperation
}
w.pauseRequested = false
}
return err
}
// Go up from the `ActiveNode`. The only possible error this method
// can return is to signal it's already at the root and can't go up.
func (w *Walker) up() error {
if w.currentDepth < 1 {
return errUpOnRoot
}
w.currentDepth--
// w.ActiveNode().Cleanup()
// If `Cleanup` is supported this would be the place to call it.
return nil
}
// NextChild increases the child index of the `ActiveNode` to point
// to the next child (which may exist or may be the end of the available
// child nodes).
//
// This method doesn't change the `ActiveNode`, it just changes where
// is it pointing to next, it could be interpreted as "turn to the next
// child".
func (w *Walker) NextChild() error {
w.incrementActiveChildIndex()
if w.ActiveChildIndex() == w.ActiveNode().ChildTotal() {
return ErrNextNoChild
// At the end of the available children, signal it.
}
return nil
}
// incrementActiveChildIndex increments the child index of the `ActiveNode` to
// point to the next child (if it exists) or to the position past all of
// the child nodes (`ChildTotal`) to signal that all of its children have
// been visited/skipped (if already at that last position, do nothing).
func (w *Walker) incrementActiveChildIndex() {
if w.ActiveChildIndex()+1 <= w.ActiveNode().ChildTotal() {
w.childIndex[w.currentDepth]++
}
}
// ActiveChildIndex returns the index of the child the `ActiveNode()`
// is pointing to.
func (w *Walker) ActiveChildIndex() uint {
return w.childIndex[w.currentDepth]
}
// SetContext changes the internal `Walker` (that is provided to the
// `NavigableNode`s when calling `FetchChild`) with the one passed
// as argument.
func (w *Walker) SetContext(ctx context.Context) {
w.ctx = ctx
}
// Pause the current walk operation. This function must be called from
// within the `Visitor` function.
func (w *Walker) Pause() {
w.pauseRequested = true
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment