Commit 692a3520 authored by Jeromy

move dagutils package to top level in preparation for merkledag extraction

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
parent 0fa5f2ed
package dagutils
import (
"context"
"fmt"
"path"
dag "github.com/ipfs/go-ipfs/merkledag"
cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
)
// Kinds of modification a Change can describe; the value is stored in
// Change.Type.
const (
	// Add marks a link that is present only in the new DAG.
	Add = iota
	// Remove marks a link that is present only in the old DAG.
	Remove
	// Mod marks content whose CID differs between the old and new DAG.
	Mod
)
// Change describes one difference between two DAGs: the kind of edit, the
// path it applies to, and the CIDs on either side of it.
type Change struct {
	// Type is one of Add, Remove or Mod.
	Type int
	// Path is the slash-separated link path the change applies to.
	Path string
	// Before is the CID prior to the change and After the CID following it.
	// Add changes leave Before unset; Remove changes leave After unset.
	Before, After *cid.Cid
}
// String renders the change as a single human-readable line.
//
// An unrecognised Type yields a descriptive placeholder instead of a panic,
// so printing or logging a malformed Change cannot crash the caller.
func (c *Change) String() string {
	switch c.Type {
	case Add:
		return fmt.Sprintf("Added %s at %s", c.After.String(), c.Path)
	case Remove:
		return fmt.Sprintf("Removed %s from %s", c.Before.String(), c.Path)
	case Mod:
		return fmt.Sprintf("Changed %s to %s at %s", c.Before.String(), c.After.String(), c.Path)
	default:
		return fmt.Sprintf("Unknown change type %d at %s", c.Type, c.Path)
	}
}
// ApplyChange replays the changes in cs, in order, on top of nd and returns
// the edited root after finalizing it into ds.
//
// Nodes referenced by Add and Mod changes are fetched from ds by their After
// CID and must be protobuf nodes; otherwise dag.ErrNotProtobuf is returned.
// The first failing change aborts the whole operation.
func ApplyChange(ctx context.Context, ds ipld.DAGService, nd *dag.ProtoNode, cs []*Change) (*dag.ProtoNode, error) {
	editor := NewDagEditor(nd, ds)

	// insert fetches the node a change points to and links it at the change's path.
	insert := func(chg *Change) error {
		target, err := ds.Get(ctx, chg.After)
		if err != nil {
			return err
		}
		pbn, isPB := target.(*dag.ProtoNode)
		if !isPB {
			return dag.ErrNotProtobuf
		}
		return editor.InsertNodeAtPath(ctx, chg.Path, pbn, nil)
	}

	for _, chg := range cs {
		var err error
		switch chg.Type {
		case Add:
			err = insert(chg)
		case Remove:
			err = editor.RmLink(ctx, chg.Path)
		case Mod:
			// A modification is a removal followed by a fresh insertion.
			if err = editor.RmLink(ctx, chg.Path); err == nil {
				err = insert(chg)
			}
		}
		if err != nil {
			return nil, err
		}
	}
	return editor.Finalize(ctx, ds)
}
// Diff returns a set of changes that transform node 'a' into node 'b'.
//
// Two leaves (nodes without links) are compared by CID only and produce either
// no change or a single Mod change with an empty Path. Otherwise links are
// matched by name: links whose CIDs differ are diffed recursively, with the
// resulting paths prefixed by the link name; names only in 'a' become Remove
// changes and names only in 'b' become Add changes.
//
// Any node that has to be inspected link-by-link must be a protobuf node;
// dag.ErrNotProtobuf is returned otherwise. Errors fetching children from ds
// are returned as-is.
func Diff(ctx context.Context, ds ipld.DAGService, a, b ipld.Node) ([]*Change, error) {
	// Base case where both nodes are leaves, just compare their CIDs.
	if len(a.Links()) == 0 && len(b.Links()) == 0 {
		if a.Cid().Equals(b.Cid()) {
			return []*Change{}, nil
		}
		return []*Change{
			&Change{
				Type:   Mod,
				Before: a.Cid(),
				After:  b.Cid(),
			},
		}, nil
	}

	// Work on copies so the callers' nodes are never mutated. Use checked
	// assertions: a non-protobuf node must surface as an error, not a panic.
	cleanA, ok := a.Copy().(*dag.ProtoNode)
	if !ok {
		return nil, dag.ErrNotProtobuf
	}
	cleanB, ok := b.Copy().(*dag.ProtoNode)
	if !ok {
		return nil, dag.ErrNotProtobuf
	}

	var out []*Change

	// Strip every link name that both nodes share, recursing where the
	// targets differ. Whatever remains in cleanA / cleanB afterwards exists on
	// one side only.
	for _, lnk := range a.Links() {
		l, _, err := b.ResolveLink([]string{lnk.Name})
		if err != nil {
			// b could not resolve this name; the link stays in cleanA and is
			// reported as a removal below.
			continue
		}

		if !l.Cid.Equals(lnk.Cid) {
			anode, err := lnk.GetNode(ctx, ds)
			if err != nil {
				return nil, err
			}
			bnode, err := l.GetNode(ctx, ds)
			if err != nil {
				return nil, err
			}

			anodepb, ok := anode.(*dag.ProtoNode)
			if !ok {
				return nil, dag.ErrNotProtobuf
			}
			bnodepb, ok := bnode.(*dag.ProtoNode)
			if !ok {
				return nil, dag.ErrNotProtobuf
			}

			sub, err := Diff(ctx, ds, anodepb, bnodepb)
			if err != nil {
				return nil, err
			}
			for _, subc := range sub {
				subc.Path = path.Join(lnk.Name, subc.Path)
				out = append(out, subc)
			}
		}

		cleanA.RemoveNodeLink(l.Name)
		cleanB.RemoveNodeLink(l.Name)
	}

	for _, lnk := range cleanA.Links() {
		out = append(out, &Change{
			Type:   Remove,
			Path:   lnk.Name,
			Before: lnk.Cid,
		})
	}
	for _, lnk := range cleanB.Links() {
		out = append(out, &Change{
			Type:  Add,
			Path:  lnk.Name,
			After: lnk.Cid,
		})
	}

	return out, nil
}
// Conflict pairs two changes that touch the same path and therefore cannot
// both be applied. MergeDiffs returns one Conflict per clashing path.
type Conflict struct {
	// A is the change from the first diff and B the clashing change from the
	// second diff.
	A, B *Change
}
// MergeDiffs combines two slices of changes into one.
//
// When a Change from b targets the same path as a change in a, a Conflict
// holding pointers to both is recorded and the change from b is left out of
// the merged slice; the change from a is kept.
//
// The merged slice lists the non-conflicting changes from b first (in b's
// order) followed by the changes from a (in a's order). If a contains several
// changes for the same path only the last of them is kept.
func MergeDiffs(a, b []*Change) ([]*Change, []Conflict) {
	var out []*Change
	var conflicts []Conflict

	// Index a by path; a later change for the same path overrides an earlier one.
	paths := make(map[string]*Change, len(a))
	for _, c := range a {
		paths[c.Path] = c
	}

	for _, c := range b {
		if ca, ok := paths[c.Path]; ok {
			conflicts = append(conflicts, Conflict{
				A: ca,
				B: c,
			})
			continue
		}
		out = append(out, c)
	}

	// Walk a itself rather than the map so the output order is deterministic
	// (map iteration order is unspecified). Only the entry that won its path
	// is emitted, and it is removed from the index so it is emitted once.
	for _, c := range a {
		if paths[c.Path] == c {
			out = append(out, c)
			delete(paths, c.Path)
		}
	}

	return out, conflicts
}
package dagutils
import (
"context"
"fmt"
mdag "github.com/ipfs/go-ipfs/merkledag"
cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
)
// DiffEnumerate fetches every object in the graph rooted at 'to' that is not
// part of the graph rooted at 'from'. It is a cheaper way of fetching a graph
// when the caller can guarantee that all of 'from' is already available.
//
// Both roots are fetched through dserv. Replaced links are handled by
// recursing on the (old, new) pair; newly added links are enumerated in full.
func DiffEnumerate(ctx context.Context, dserv ipld.NodeGetter, from, to *cid.Cid) error {
	fromNode, err := dserv.Get(ctx, from)
	if err != nil {
		return fmt.Errorf("get %s: %s", from, err)
	}

	toNode, err := dserv.Get(ctx, to)
	if err != nil {
		return fmt.Errorf("get %s: %s", to, err)
	}

	pairs := getLinkDiff(fromNode, toNode)

	// Everything under 'from' is assumed to be present already, so seed the
	// visited set with the replaced CIDs to avoid enumerating them later.
	seen := cid.NewSet()
	for _, p := range pairs {
		if p.bef != nil {
			seen.Add(p.bef)
		}
	}

	for _, p := range pairs {
		switch {
		case p.bef != nil:
			// Replacement: only the difference between old and new is needed.
			if err := DiffEnumerate(ctx, dserv, p.bef, p.aft); err != nil {
				return err
			}
		case seen.Has(p.aft):
			// Addition that is already known; nothing to fetch.
		default:
			// Genuinely new subtree: walk all of it.
			if err := mdag.EnumerateChildrenAsync(ctx, mdag.GetLinksDirect(dserv), p.aft, seen.Visit); err != nil {
				return err
			}
		}
	}

	return nil
}
// diffpair records how a single link changed between two nodes:
//   - bef and aft both set: bef was replaced with aft
//   - only aft set: aft was newly added
//   - only bef set: bef was deleted
type diffpair struct {
	bef *cid.Cid
	aft *cid.Cid
}
// getLinkDiff returns a changeset between nodes 'a' and 'b'.
//
// Links are compared by CID. Each link found only in 'b' is paired, in order,
// with the next unused link found only in 'a' (a replacement); once those run
// out the remaining links of 'b' are reported as additions. Deletions are not
// reported, as the current use case does not need them.
func getLinkDiff(a, b ipld.Node) []diffpair {
	bLinks := b.Links()

	inA := make(map[string]*ipld.Link)
	inB := make(map[string]*ipld.Link)
	for _, lnk := range bLinks {
		inB[lnk.Cid.KeyString()] = lnk
	}

	// Collect the CIDs that only 'a' links to, preserving link order.
	var onlyInA []*cid.Cid
	for _, lnk := range a.Links() {
		k := lnk.Cid.KeyString()
		inA[k] = lnk
		if inB[k] == nil {
			onlyInA = append(onlyInA, lnk.Cid)
		}
	}

	var pairs []diffpair
	next := 0
	for _, lnk := range bLinks {
		if inA[lnk.Cid.KeyString()] != nil {
			// Present on both sides: unchanged.
			continue
		}

		pair := diffpair{aft: lnk.Cid}
		if next < len(onlyInA) {
			pair.bef = onlyInA[next]
			next++
		}
		pairs = append(pairs, pair)
	}
	return pairs
}
package dagutils
import (
"context"
"fmt"
"testing"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
)
// buildNode creates the node called name from desc, using the name as the
// node's data. Children are built recursively on first use and memoised in
// out; the node returned here is NOT stored in out by this function.
// Panics if a link cannot be added.
func buildNode(name string, desc map[string]ndesc, out map[string]ipld.Node) ipld.Node {
	nd := new(dag.ProtoNode)
	nd.SetData([]byte(name))

	for linkName, target := range desc[name] {
		child, built := out[target]
		if !built {
			child = buildNode(target, desc, out)
			out[target] = child
		}

		err := nd.AddNodeLink(linkName, child)
		if err != nil {
			panic(err)
		}
	}
	return nd
}
// ndesc describes one test node: each key is a link name and each value is the
// name of the node that link points to.
type ndesc map[string]string
// mkGraph builds every node named in desc and returns them keyed by name.
// Nodes already produced as somebody's child are reused rather than rebuilt.
func mkGraph(desc map[string]ndesc) map[string]ipld.Node {
	graph := make(map[string]ipld.Node)
	for name := range desc {
		if _, done := graph[name]; !done {
			graph[name] = buildNode(name, desc, graph)
		}
	}
	return graph
}
// Test graph fixtures. Each maps a node name to its links (link name ->
// target node name). A target with no entry of its own is built as a node
// without links, because looking it up yields an empty description.
var (
	// tg1: a2 has everything a1 has plus one extra leaf "c".
	tg1 = map[string]ndesc{
		"a1": {
			"foo": "b",
		},
		"b": {},
		"a2": {
			"foo": "b",
			"bar": "c",
		},
		"c": {},
	}

	// tg2: like tg1, but the extra node "c" has a child "d" of its own.
	tg2 = map[string]ndesc{
		"a1": {
			"foo": "b",
		},
		"b": {},
		"a2": {
			"foo": "b",
			"bar": "c",
		},
		"c": {"baz": "d"},
		"d": {},
	}

	// tg3: a1 and a2 share "foo" but point "bar" at different leaves.
	tg3 = map[string]ndesc{
		"a1": {
			"foo": "b",
			"bar": "c",
		},
		"b": {},
		"a2": {
			"foo": "b",
			"bar": "d",
		},
		"c": {},
		"d": {},
	}

	// tg4: one of the two links differs between a1 and a2.
	tg4 = map[string]ndesc{
		"a1": {
			"key1": "b",
			"key2": "c",
		},
		"a2": {
			"key1": "b",
			"key2": "d",
		},
	}

	// tg5: both links differ between a1 and a2.
	tg5 = map[string]ndesc{
		"a1": {
			"key1": "a",
			"key2": "b",
		},
		"a2": {
			"key1": "c",
			"key2": "d",
		},
	}
)
// TestNameMatching checks that a single differing link between two nodes
// yields exactly one diff entry.
func TestNameMatching(t *testing.T) {
	graph := mkGraph(tg4)

	if got := getLinkDiff(graph["a1"], graph["a2"]); len(got) != 1 {
		t.Fatal(fmt.Errorf("node diff didn't match by name"))
	}
}
// TestNameMatching2 checks that two differing links yield two diff entries and
// that the first entry pairs the first link of each node.
func TestNameMatching2(t *testing.T) {
	graph := mkGraph(tg5)
	pairs := getLinkDiff(graph["a1"], graph["a2"])

	if len(pairs) != 2 {
		t.Fatal(fmt.Errorf("incorrect number of link diff elements"))
	}

	first := pairs[0]
	if !first.bef.Equals(graph["a1"].Links()[0].Cid) || !first.aft.Equals(graph["a2"].Links()[0].Cid) {
		t.Fatal(fmt.Errorf("node diff didn't match by name"))
	}
}
// TestDiffEnumBasic stores all of tg1 and verifies that enumerating the
// difference from a1 to a2 fetches exactly a1, a2 and the new leaf c, in that
// order.
func TestDiffEnumBasic(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	graph := mkGraph(tg1)
	store := mdtest.Mock()
	logged := &getLogger{ds: store}

	for _, n := range graph {
		if err := store.Add(ctx, n); err != nil {
			t.Fatal(err)
		}
	}

	from, to := graph["a1"].Cid(), graph["a2"].Cid()
	if err := DiffEnumerate(ctx, logged, from, to); err != nil {
		t.Fatal(err)
	}

	want := []*cid.Cid{from, to, graph["c"].Cid()}
	if err := assertCidList(logged.log, want); err != nil {
		t.Fatal(err)
	}
}
// getLogger is a NodeGetter wrapper for tests that records the CID of every
// node it successfully retrieves.
type getLogger struct {
	// ds is the underlying getter that actually serves the nodes.
	ds ipld.NodeGetter
	// log holds the CIDs of successfully fetched nodes, in fetch order.
	log []*cid.Cid
}
// Get fetches c from the wrapped getter and, on success, appends c to the log.
// Failed lookups are not logged.
func (gl *getLogger) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
	fetched, err := gl.ds.Get(ctx, c)
	if err == nil {
		gl.log = append(gl.log, c)
		return fetched, nil
	}
	return nil, err
}
// GetMany fetches cids from the wrapped getter, logs the CID of every node
// that arrived without error, and returns a channel carrying all results.
//
// The upstream channel is drained completely before returning, so results are
// re-published on a buffered channel sized for one response per requested CID.
// Receiving more responses than that panics, since it indicates a misbehaving
// getter.
func (gl *getLogger) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *ipld.NodeOption {
	outCh := make(chan *ipld.NodeOption, len(cids))

	for no := range gl.ds.GetMany(ctx, cids) {
		if no.Err == nil {
			gl.log = append(gl.log, no.Node.Cid())
		}
		select {
		case outCh <- no:
		default:
			panic("too many responses")
		}
	}

	// Hand back the channel that actually holds the results (the upstream one
	// has been consumed above) and close it so that receivers terminate.
	close(outCh)
	return outCh
}
// assertCidList compares two CID lists element by element and returns an error
// describing the first difference, or nil when they match.
//
// The callers in this file pass the observed list as a and the expected list
// as b, and the error message is worded accordingly.
func assertCidList(a, b []*cid.Cid) error {
	if len(a) != len(b) {
		return fmt.Errorf("got different number of cids than expected")
	}
	for i, c := range a {
		if !c.Equals(b[i]) {
			// b holds the expectation, a the observation.
			return fmt.Errorf("expected %s, got %s", b[i], c)
		}
	}
	return nil
}
// TestDiffEnumFail stores tg2 without node "d" and verifies that enumerating
// the difference from a1 to a2 fails with ipld.ErrNotFound after having
// fetched a1, a2 and c.
func TestDiffEnumFail(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	graph := mkGraph(tg2)
	store := mdtest.Mock()
	logged := &getLogger{ds: store}

	// "d" is deliberately left out of the store.
	for _, name := range []string{"a1", "a2", "b", "c"} {
		if err := store.Add(ctx, graph[name]); err != nil {
			t.Fatal(err)
		}
	}

	from, to := graph["a1"].Cid(), graph["a2"].Cid()
	if err := DiffEnumerate(ctx, logged, from, to); err != ipld.ErrNotFound {
		t.Fatal("expected err not found")
	}

	want := []*cid.Cid{from, to, graph["c"].Cid()}
	if err := assertCidList(logged.log, want); err != nil {
		t.Fatal(err)
	}
}
// TestDiffEnumRecurse stores all of tg3 and verifies that a replaced link makes
// DiffEnumerate recurse, fetching a1, a2, c and d in that order.
func TestDiffEnumRecurse(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	graph := mkGraph(tg3)
	store := mdtest.Mock()
	logged := &getLogger{ds: store}

	for _, name := range []string{"a1", "a2", "b", "c", "d"} {
		if err := store.Add(ctx, graph[name]); err != nil {
			t.Fatal(err)
		}
	}

	from, to := graph["a1"].Cid(), graph["a2"].Cid()
	if err := DiffEnumerate(ctx, logged, from, to); err != nil {
		t.Fatal(err)
	}

	want := []*cid.Cid{from, to, graph["c"].Cid(), graph["d"].Cid()}
	if err := assertCidList(logged.log, want); err != nil {
		t.Fatal(err)
	}
}
package dagutils
import (
"context"
"errors"
bserv "github.com/ipfs/go-ipfs/blockservice"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
offline "gx/ipfs/QmS6mo1dPpHdYsVkm27BRZDLxpKBCiJKUH8fHX15XFfMez/go-ipfs-exchange-offline"
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
bstore "gx/ipfs/QmadMhXJLHMFjpRmh85XjpmVDkEtQpNYEZNRpWRvYVLrvb/go-ipfs-blockstore"
ds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore"
syncds "gx/ipfs/QmeiCcJfDW1GJnWUArudsv5rQsihpi4oyddPhdqo3CfX6i/go-datastore/sync"
)
// Editor edits a tree of ProtoNodes rooted at a single node and exposes
// methods for inserting and removing links by path.
type Editor struct {
	// root is the current root of the tree being edited.
	root *dag.ProtoNode

	// tmp is a scratch dagstore (in memory, for now) that receives every
	// intermediate node produced while editing.
	tmp ipld.DAGService

	// src is the dagstore holding *all* of the data; nodes that need to be
	// modified are pulled from it. It may be nil.
	src ipld.DAGService
}
// NewMemoryDagService returns a fresh in-memory DAGService. It is backed by a
// mutex-wrapped map datastore and an offline exchange.
func NewMemoryDagService() ipld.DAGService {
	// In-memory storage for the editor's intermediary nodes.
	memStore := syncds.MutexWrap(ds.NewMapDatastore())
	blocks := bstore.NewBlockstore(memStore)
	service := bserv.New(blocks, offline.Exchange(blocks))
	return dag.NewDAGService(service)
}
// NewDagEditor returns a ProtoNode editor with its own in-memory scratch
// DAGService.
//
// * root is the node to be modified
// * source is the dagstore to pull nodes from (optional, may be nil)
func NewDagEditor(root *dag.ProtoNode, source ipld.DAGService) *Editor {
	ed := new(Editor)
	ed.root = root
	ed.tmp = NewMemoryDagService()
	ed.src = source
	return ed
}
// GetNode returns a copy of the root node currently being edited.
func (e *Editor) GetNode() *dag.ProtoNode {
	clone := e.root.Copy()
	return clone.(*dag.ProtoNode)
}
// GetDagService returns the editor's temporary DAGService, i.e. the one that
// holds the intermediate nodes created while editing.
func (e *Editor) GetDagService() ipld.DAGService {
	scratch := e.tmp
	return scratch
}
// addLink points the link childname of root at childnd, replacing any link of
// that name, and stores both childnd and the updated root in ds. root is
// modified in place and returned. An empty childname is rejected.
func addLink(ctx context.Context, ds ipld.DAGService, root *dag.ProtoNode, childname string, childnd ipld.Node) (*dag.ProtoNode, error) {
	if childname == "" {
		return nil, errors.New("cannot create link with no name")
	}

	// Make sure the node being linked is present in the dagservice.
	if err := ds.Add(ctx, childnd); err != nil {
		return nil, err
	}

	// Best effort: drop the soon-to-be-outdated root from the dagservice.
	_ = ds.Remove(ctx, root.Cid())

	// Clear any existing link with this name; the returned error is ignored
	// because the only option is ErrNotFound.
	_ = root.RemoveNodeLink(childname)

	if err := root.AddNodeLink(childname, childnd); err != nil {
		return nil, err
	}

	if err := ds.Add(ctx, root); err != nil {
		return nil, err
	}
	return root, nil
}
// InsertNodeAtPath links toinsert at pth below the editor's root and makes the
// resulting node the new root. When create is non-nil it is called to produce
// intermediate nodes that are missing along the path. On error the root is
// not reassigned.
func (e *Editor) InsertNodeAtPath(ctx context.Context, pth string, toinsert ipld.Node, create func() *dag.ProtoNode) error {
	segments := path.SplitList(pth)

	updated, err := e.insertNodeAtPath(ctx, e.root, segments, toinsert, create)
	if err == nil {
		e.root = updated
	}
	return err
}
// insertNodeAtPath recursively walks path below root and links toinsert under
// the final path element. Every node along the way is updated in place,
// re-added to the temporary dagstore and returned.
//
// Intermediate nodes are looked up in the temporary dagstore first. If the
// link itself is missing and create is non-nil, create() supplies a new node.
// If the link exists but its node is not in the temporary dagstore, the source
// dagstore is consulted, provided one was configured.
func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path []string, toinsert ipld.Node, create func() *dag.ProtoNode) (*dag.ProtoNode, error) {
	if len(path) == 1 {
		return addLink(ctx, e.tmp, root, path[0], toinsert)
	}

	nd, err := root.GetLinkedProtoNode(ctx, e.tmp, path[0])
	if err != nil {
		switch {
		case err == dag.ErrLinkNotFound && create != nil:
			// A create callback was given: make directories on the way down.
			nd = create()
			err = nil // no longer an error case
		case err == ipld.ErrNotFound && e.src != nil:
			// Try finding it in our source dagstore. src is optional, so
			// never hand a nil DAGService to the lookup.
			nd, err = root.GetLinkedProtoNode(ctx, e.src, path[0])
		}

		// Either no fallback applied or the fallback failed as well.
		if err != nil {
			return nil, err
		}
	}

	ndprime, err := e.insertNodeAtPath(ctx, nd, path[1:], toinsert, create)
	if err != nil {
		return nil, err
	}

	// root is about to change; best-effort removal of the stale version.
	_ = e.tmp.Remove(ctx, root.Cid())

	// Ignore the error: the link may legitimately not exist yet.
	_ = root.RemoveNodeLink(path[0])
	if err := root.AddNodeLink(path[0], ndprime); err != nil {
		return nil, err
	}

	if err := e.tmp.Add(ctx, root); err != nil {
		return nil, err
	}

	return root, nil
}
// RmLink removes the link found at pth below the editor's root and makes the
// resulting node the new root. On error the root is not reassigned.
func (e *Editor) RmLink(ctx context.Context, pth string) error {
	updated, err := e.rmLink(ctx, e.root, path.SplitList(pth))
	if err == nil {
		e.root = updated
	}
	return err
}
// rmLink recursively walks path below root and removes the link named by the
// final path element. Every node along the way is updated in place, re-added
// to the temporary dagstore and returned.
//
// Intermediate nodes are looked up in the temporary dagstore first and then,
// if one was configured, in the source dagstore.
func (e *Editor) rmLink(ctx context.Context, root *dag.ProtoNode, path []string) (*dag.ProtoNode, error) {
	if len(path) == 1 {
		// base case, remove node in question
		if err := root.RemoveNodeLink(path[0]); err != nil {
			return nil, err
		}

		if err := e.tmp.Add(ctx, root); err != nil {
			return nil, err
		}

		return root, nil
	}

	// search for node in both tmp dagstore and source dagstore; src is
	// optional, so never hand a nil DAGService to the lookup
	nd, err := root.GetLinkedProtoNode(ctx, e.tmp, path[0])
	if err == ipld.ErrNotFound && e.src != nil {
		nd, err = root.GetLinkedProtoNode(ctx, e.src, path[0])
	}
	if err != nil {
		return nil, err
	}

	nnode, err := e.rmLink(ctx, nd, path[1:])
	if err != nil {
		return nil, err
	}

	// root is about to change; best-effort removal of the stale version.
	_ = e.tmp.Remove(ctx, root.Cid())

	// Ignore the error: re-linking below replaces the entry either way.
	_ = root.RemoveNodeLink(path[0])
	if err := root.AddNodeLink(path[0], nnode); err != nil {
		return nil, err
	}

	if err := e.tmp.Add(ctx, root); err != nil {
		return nil, err
	}

	return root, nil
}
// Finalize copies the edited DAG from the editor's temporary dagstore into ds
// and returns a copy of the edited root together with any copy error.
func (e *Editor) Finalize(ctx context.Context, ds ipld.DAGService) (*dag.ProtoNode, error) {
	root := e.GetNode()
	if err := copyDag(ctx, root, e.tmp, ds); err != nil {
		return root, err
	}
	return root, nil
}
// copyDag adds nd to 'to' and then recursively does the same for every child
// of nd that can be fetched from 'from'. Children missing from 'from' are
// skipped; any other error aborts the copy.
func copyDag(ctx context.Context, nd ipld.Node, from, to ipld.DAGService) error {
	// TODO(#4609): make this batch.
	if err := to.Add(ctx, nd); err != nil {
		return err
	}

	for _, lnk := range nd.Links() {
		child, err := lnk.GetNode(ctx, from)
		switch {
		case err == ipld.ErrNotFound:
			// Not found means we didn't modify it, and it should already be
			// in the target datastore.
			continue
		case err != nil:
			return err
		}

		if err := copyDag(ctx, child, from, to); err != nil {
			return err
		}
	}
	return nil
}
package dagutils
import (
"context"
"testing"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
path "github.com/ipfs/go-ipfs/path"
cid "gx/ipfs/QmYVNvtQkeZ6AKSwDrjQTs432QtL6umrrK41EBq3cu7iSP/go-cid"
ipld "gx/ipfs/QmZtNq8dArGfnpCZfx2pUNY7UcjGhVp5qqwQ4hH6mpTMRQ/go-ipld-format"
)
// TestAddLink links a stored node under the name "fish" on an empty node and
// verifies that following that link yields the same node.
func TestAddLink(t *testing.T) {
	// Name the cancel func 'cancel' so it does not shadow the context package.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ds := mdtest.Mock()
	fishnode := dag.NodeWithData([]byte("fishcakes!"))

	if err := ds.Add(ctx, fishnode); err != nil {
		t.Fatal(err)
	}

	nd := new(dag.ProtoNode)
	nnode, err := addLink(ctx, ds, nd, "fish", fishnode)
	if err != nil {
		t.Fatal(err)
	}

	fnprime, err := nnode.GetLinkedNode(ctx, ds, "fish")
	if err != nil {
		t.Fatal(err)
	}

	fnpkey := fnprime.Cid()
	if !fnpkey.Equals(fishnode.Cid()) {
		t.Fatal("wrong child node found!")
	}
}
// assertNodeAtPath follows pth link by link from root, fetching nodes from ds,
// and fails the test unless the node reached has CID exp.
func assertNodeAtPath(t *testing.T, ds ipld.DAGService, root *dag.ProtoNode, pth string, exp *cid.Cid) {
	node := root
	for _, segment := range path.SplitList(pth) {
		next, err := node.GetLinkedProtoNode(context.Background(), ds, segment)
		if err != nil {
			t.Fatal(err)
		}
		node = next
	}

	if got := node.Cid(); !got.Equals(exp) {
		t.Fatal("node not as expected at end of path")
	}
}
// TestInsertNode runs a fixed sequence of insertions against one editor, with
// and without node creation, and checks the CID of the final root.
func TestInsertNode(t *testing.T) {
	root := new(dag.ProtoNode)
	e := NewDagEditor(root, nil)

	// The steps build on each other, so their order matters.
	steps := []struct {
		path, data string
		create     bool
		experr     string
	}{
		{"a", "anodefortesting", false, ""},
		{"a/b", "data", false, ""},
		{"a/b/c/d/e", "blah", false, "no link by that name"},
		{"a/b/c/d/e", "foo", true, ""},
		{"a/b/c/d/f", "baz", true, ""},
		{"a/b/c/d/f", "bar", true, ""},
		{"", "bar", true, "cannot create link with no name"},
		{"////", "slashes", true, "cannot create link with no name"},
	}
	for _, s := range steps {
		testInsert(t, e, s.path, s.data, s.create, s.experr)
	}

	c := e.GetNode().Cid()
	if c.String() != "QmZ8yeT9uD6ouJPNAYt62XffYuXBT6b4mP4obRSE9cJrSt" {
		t.Fatal("output was different than expected: ", c)
	}
}
// testInsert inserts a node holding data at path through e and checks the
// outcome. If experr is non-empty the insertion must fail with exactly that
// message; otherwise it must succeed and the node must be reachable at path.
// When create is true, missing intermediate nodes are created as empty
// ProtoNodes.
func testInsert(t *testing.T, e *Editor, path, data string, create bool, experr string) {
	ctx := context.Background()

	child := dag.NodeWithData([]byte(data))
	if err := e.tmp.Add(ctx, child); err != nil {
		t.Fatal(err)
	}

	var mk func() *dag.ProtoNode
	if create {
		mk = func() *dag.ProtoNode {
			return &dag.ProtoNode{}
		}
	}

	err := e.InsertNodeAtPath(ctx, path, child, mk)
	switch {
	case experr != "":
		got := ""
		if err != nil {
			got = err.Error()
		}
		if got != experr {
			t.Fatalf("expected '%s' but got '%s'", experr, got)
		}
	case err != nil:
		t.Fatal(err, path, data, create, experr)
	default:
		assertNodeAtPath(t, e.tmp, e.root, path, child.Cid())
	}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment