Commit 2fbf40a0 authored by Steven Allen's avatar Steven Allen

merkledag: switch to new dag interface

Also:

* Update the blockstore/blockservice methods to match.
* Construct a new temporary offline dag instead of having a
  GetOfflineLinkService method.

License: MIT
Signed-off-by: default avatarSteven Allen <steven@stebalien.com>
parent 2b1de3e6
package merkledag
import (
"runtime"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
)
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(#4299): Experiment with multiple datastores, storage devices, and CPUs to find
// the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2
// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds *dagService
activeCommits int
commitError error
commitResults chan error
blocks []blocks.Block
size int
MaxSize int
MaxBlocks int
}
func (t *Batch) processResults() {
for t.activeCommits > 0 && t.commitError == nil {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.commitError = err
}
default:
return
}
}
}
func (t *Batch) asyncCommit() {
numBlocks := len(t.blocks)
if numBlocks == 0 || t.commitError != nil {
return
}
if t.activeCommits >= ParallelBatchCommits {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
return
}
}
go func(b []blocks.Block) {
_, err := t.ds.Blocks.AddBlocks(b)
t.commitResults <- err
}(t.blocks)
t.activeCommits++
t.blocks = make([]blocks.Block, 0, numBlocks)
t.size = 0
return
}
// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.commitError != nil {
return nil, t.commitError
}
t.blocks = append(t.blocks, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
t.asyncCommit()
}
return nd.Cid(), t.commitError
}
// Commit commits batched nodes.
func (t *Batch) Commit() error {
t.asyncCommit()
for t.activeCommits > 0 && t.commitError == nil {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
}
}
return t.commitError
}
......@@ -7,11 +7,11 @@ import (
"sync"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
ipldcbor "gx/ipfs/QmNRz7BDWfdFNVLt7AVvmRefkrURD25EeoipcXqo6yoXU1/go-ipld-cbor"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format"
)
// TODO: We should move these registrations elsewhere. Really, most of the IPLD
......@@ -23,37 +23,7 @@ func init() {
node.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}
var ErrNotFound = fmt.Errorf("merkledag: not found")
// DAGService is an IPFS Merkle DAG service.
type DAGService interface {
// Add adds the node to the DAGService
Add(node.Node) (*cid.Cid, error)
// Get gets the node the from the DAGService
Get(context.Context, *cid.Cid) (node.Node, error)
// Remove removes the node from the DAGService
Remove(node.Node) error
// GetMany returns a channel of NodeOption given
// a set of CIDs.
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
// Batch is a buffer for batching adds to a dag.
Batch() *Batch
LinkService
}
type LinkService interface {
// GetLinks return all links for a node. The complete node does not
// necessarily have to exist locally, or at all. For example, raw
// leaves cannot possibly have links so there is no need to look
// at the node.
GetLinks(context.Context, *cid.Cid) ([]*node.Link, error)
GetOfflineLinkService() LinkService
}
// NewDAGService constructs a new DAGService (using the default implementation).
func NewDAGService(bs bserv.BlockService) *dagService {
return &dagService{Blocks: bs}
}
......@@ -68,25 +38,20 @@ type dagService struct {
}
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {
func (n *dagService) Add(ctx context.Context, nd node.Node) error {
if n == nil { // FIXME remove this assertion. protect with constructor invariant
return nil, fmt.Errorf("dagService is nil")
return fmt.Errorf("dagService is nil")
}
return n.Blocks.AddBlock(nd)
}
func (n *dagService) Batch() *Batch {
return &Batch{
ds: n,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,
// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
// descriptors at the same time for the optimized batch write.
MaxBlocks: 128,
func (n *dagService) AddMany(ctx context.Context, nds []node.Node) error {
blks := make([]blocks.Block, len(nds))
for i, nd := range nds {
blks[i] = nd
}
return n.Blocks.AddBlocks(blks)
}
// Get retrieves a node from the dagService, fetching the block in the BlockService
......@@ -101,7 +66,7 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
b, err := n.Blocks.GetBlock(ctx, c)
if err != nil {
if err == bserv.ErrNotFound {
return nil, ErrNotFound
return nil, node.ErrNotFound
}
return nil, fmt.Errorf("Failed to get block for %s: %v", c, err)
}
......@@ -122,17 +87,23 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, er
return node.Links(), nil
}
func (n *dagService) GetOfflineLinkService() LinkService {
if n.Blocks.Exchange().IsOnline() {
bsrv := bserv.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
return NewDAGService(bsrv)
} else {
return n
}
func (n *dagService) Remove(ctx context.Context, c *cid.Cid) error {
return n.Blocks.DeleteBlock(c)
}
func (n *dagService) Remove(nd node.Node) error {
return n.Blocks.DeleteBlock(nd)
// RemoveMany removes multiple nodes from the DAG. It will likely be faster than
// removing them individually.
//
// This operation is not atomic. If it returns an error, some nodes may or may
// not have been removed.
func (n *dagService) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
// TODO(#4608): make this batch all the way down.
for _, c := range cids {
if err := n.Blocks.DeleteBlock(c); err != nil {
return err
}
}
return nil
}
// GetLinksDirect creates a function to get the links for a node, from
......@@ -140,14 +111,14 @@ func (n *dagService) Remove(nd node.Node) error {
// locally (and can not be retrieved) an error will be returned.
func GetLinksDirect(serv node.NodeGetter) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
node, err := serv.Get(ctx, c)
nd, err := serv.Get(ctx, c)
if err != nil {
if err == bserv.ErrNotFound {
err = ErrNotFound
err = node.ErrNotFound
}
return nil, err
}
return node.Links(), nil
return nd.Links(), nil
}
}
......@@ -155,11 +126,12 @@ type sesGetter struct {
bs *bserv.Session
}
// Get gets a single node from the DAG.
func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
blk, err := sg.bs.GetBlock(ctx, c)
switch err {
case bserv.ErrNotFound:
return nil, ErrNotFound
return nil, node.ErrNotFound
default:
return nil, err
case nil:
......@@ -169,8 +141,13 @@ func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
return node.Decode(blk)
}
// GetMany gets many nodes at once, batching the request if possible.
func (sg *sesGetter) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *node.NodeOption {
return getNodesFromBG(ctx, sg.bs, keys)
}
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
func FetchGraph(ctx context.Context, root *cid.Cid, serv node.DAGService) error {
var ng node.NodeGetter = serv
ds, ok := serv.(*dagService)
if ok {
......@@ -205,14 +182,18 @@ func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int {
return out
}
type NodeOption struct {
Node node.Node
Err error
// GetMany gets many nodes from the DAG at once.
//
// This method may not return all requested nodes (and may or may not return an
// error indicating that it failed to do so. It is up to the caller to verify
// that it received all nodes.
func (n *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *node.NodeOption {
return getNodesFromBG(ctx, n.Blocks, keys)
}
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
out := make(chan *NodeOption, len(keys))
blocks := ds.Blocks.GetBlocks(ctx, keys)
func getNodesFromBG(ctx context.Context, bs bserv.BlockGetter, keys []*cid.Cid) <-chan *node.NodeOption {
out := make(chan *node.NodeOption, len(keys))
blocks := bs.GetBlocks(ctx, keys)
var count int
go func() {
......@@ -222,182 +203,43 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node
case b, ok := <-blocks:
if !ok {
if count != len(keys) {
out <- &NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
out <- &node.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
}
return
}
nd, err := node.Decode(b)
if err != nil {
out <- &NodeOption{Err: err}
out <- &node.NodeOption{Err: err}
return
}
out <- &NodeOption{Node: nd}
out <- &node.NodeOption{Node: nd}
count++
case <-ctx.Done():
out <- &NodeOption{Err: ctx.Err()}
return
}
}
}()
return out
}
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
var cids []*cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
}
return GetNodes(ctx, ds, cids)
}
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
// Early out if no work to do
if len(keys) == 0 {
return nil
}
promises := make([]NodeGetter, len(keys))
for i := range keys {
promises[i] = newNodePromise(ctx)
}
dedupedKeys := dedupeKeys(keys)
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
nodechan := ds.GetMany(ctx, dedupedKeys)
for count := 0; count < len(keys); {
select {
case opt, ok := <-nodechan:
if !ok {
for _, p := range promises {
p.Fail(ErrNotFound)
}
return
}
if opt.Err != nil {
for _, p := range promises {
p.Fail(opt.Err)
}
return
}
nd := opt.Node
is := FindLinks(keys, nd.Cid(), 0)
for _, i := range is {
count++
promises[i].Send(nd)
}
case <-ctx.Done():
out <- &node.NodeOption{Err: ctx.Err()}
return
}
}
}()
return promises
}
// Remove duplicates from a list of keys
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
out := make([]*cid.Cid, 0, len(cids))
set := cid.NewSet()
for _, c := range cids {
if set.Visit(c) {
out = append(out, c)
}
}
return out
}
func newNodePromise(ctx context.Context) NodeGetter {
return &nodePromise{
recv: make(chan node.Node, 1),
ctx: ctx,
err: make(chan error, 1),
}
}
type nodePromise struct {
cache node.Node
clk sync.Mutex
recv chan node.Node
ctx context.Context
err chan error
}
// NodeGetter provides a promise like interface for a dag Node
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
type NodeGetter interface {
Get(context.Context) (node.Node, error)
Fail(err error)
Send(node.Node)
}
func (np *nodePromise) Fail(err error) {
np.clk.Lock()
v := np.cache
np.clk.Unlock()
// if promise has a value, don't fail it
if v != nil {
return
}
np.err <- err
}
func (np *nodePromise) Send(nd node.Node) {
var already bool
np.clk.Lock()
if np.cache != nil {
already = true
}
np.cache = nd
np.clk.Unlock()
if already {
panic("sending twice to the same promise is an error!")
}
np.recv <- nd
}
func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
np.clk.Lock()
c := np.cache
np.clk.Unlock()
if c != nil {
return c, nil
}
// GetLinks is the type of function passed to the EnumerateChildren function(s)
// for getting the children of an IPLD node.
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)
select {
case nd := <-np.recv:
return nd, nil
case <-np.ctx.Done():
return nil, np.ctx.Err()
case <-ctx.Done():
return nil, ctx.Err()
case err := <-np.err:
return nil, err
// GetLinksWithDAG returns a GetLinks function that tries to use the given
// NodeGetter as a LinkGetter to get the children of a given IPLD node. This may
// allow us to traverse the DAG without actually loading and parsing the node in
// question (if we already have the links cached).
func GetLinksWithDAG(ng node.NodeGetter) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
return node.GetLinks(ctx, ng, c)
}
}
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
......@@ -443,6 +285,10 @@ func (p *ProgressTracker) Value() int {
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 8
// EnumerateChildrenAsync is equivalent to EnumerateChildren *except* that it
// fetches children in parallel.
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
feed := make(chan *cid.Cid)
out := make(chan []*node.Link)
......@@ -523,3 +369,8 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
}
}
var _ node.LinkGetter = &dagService{}
var _ node.NodeGetter = &dagService{}
var _ node.NodeGetter = &sesGetter{}
var _ node.DAGService = &dagService{}
......@@ -131,7 +131,7 @@ func TestBatchFetchDupBlock(t *testing.T) {
func runBatchFetchTest(t *testing.T, read io.Reader) {
ctx := context.Background()
var dagservs []DAGService
var dagservs []node.DAGService
for _, bsi := range bstest.Mocks(5) {
dagservs = append(dagservs, NewDAGService(bsi))
}
......@@ -155,7 +155,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
t.Fatal(err)
}
_, err = dagservs[0].Add(root)
err = dagservs[0].Add(ctx, root)
if err != nil {
t.Fatal(err)
}
......@@ -221,7 +221,7 @@ func TestCantGet(t *testing.T) {
}
func TestFetchGraph(t *testing.T) {
var dservs []DAGService
var dservs []node.DAGService
bsis := bstest.Mocks(2)
for _, bsi := range bsis {
dservs = append(dservs, NewDAGService(bsi))
......@@ -285,13 +285,15 @@ func TestEnumerateChildren(t *testing.T) {
}
func TestFetchFailure(t *testing.T) {
ctx := context.Background()
ds := dstest.Mock()
ds_bad := dstest.Mock()
top := new(ProtoNode)
for i := 0; i < 10; i++ {
nd := NodeWithData([]byte{byte('a' + i)})
_, err := ds.Add(nd)
err := ds.Add(ctx, nd)
if err != nil {
t.Fatal(err)
}
......@@ -304,7 +306,7 @@ func TestFetchFailure(t *testing.T) {
for i := 0; i < 10; i++ {
nd := NodeWithData([]byte{'f', 'a' + byte(i)})
_, err := ds_bad.Add(nd)
err := ds_bad.Add(ctx, nd)
if err != nil {
t.Fatal(err)
}
......@@ -315,9 +317,9 @@ func TestFetchFailure(t *testing.T) {
}
}
getters := GetDAG(context.Background(), ds, top)
getters := node.GetDAG(ctx, ds, top)
for i, getter := range getters {
_, err := getter.Get(context.Background())
_, err := getter.Get(ctx)
if err != nil && i < 10 {
t.Fatal(err)
}
......@@ -352,15 +354,17 @@ func TestUnmarshalFailure(t *testing.T) {
}
func TestBasicAddGet(t *testing.T) {
ctx := context.Background()
ds := dstest.Mock()
nd := new(ProtoNode)
c, err := ds.Add(nd)
err := ds.Add(ctx, nd)
if err != nil {
t.Fatal(err)
}
out, err := ds.Get(context.Background(), c)
out, err := ds.Get(ctx, nd.Cid())
if err != nil {
t.Fatal(err)
}
......@@ -371,20 +375,22 @@ func TestBasicAddGet(t *testing.T) {
}
func TestGetRawNodes(t *testing.T) {
ctx := context.Background()
rn := NewRawNode([]byte("test"))
ds := dstest.Mock()
c, err := ds.Add(rn)
err := ds.Add(ctx, rn)
if err != nil {
t.Fatal(err)
}
if !c.Equals(rn.Cid()) {
if !rn.Cid().Equals(rn.Cid()) {
t.Fatal("output cids didnt match")
}
out, err := ds.Get(context.TODO(), c)
out, err := ds.Get(ctx, rn.Cid())
if err != nil {
t.Fatal(err)
}
......@@ -449,6 +455,8 @@ func TestProtoNodeResolve(t *testing.T) {
}
func TestCidRetention(t *testing.T) {
ctx := context.Background()
nd := new(ProtoNode)
nd.SetData([]byte("fooooo"))
......@@ -466,13 +474,13 @@ func TestCidRetention(t *testing.T) {
}
bs := dstest.Bserv()
_, err = bs.AddBlock(blk)
err = bs.AddBlock(blk)
if err != nil {
t.Fatal(err)
}
ds := NewDAGService(bs)
out, err := ds.Get(context.Background(), c2)
out, err := ds.Get(ctx, c2)
if err != nil {
t.Fatal(err)
}
......@@ -501,6 +509,8 @@ func TestCidRawDoesnNeedData(t *testing.T) {
}
func TestEnumerateAsyncFailsNotFound(t *testing.T) {
ctx := context.Background()
a := NodeWithData([]byte("foo1"))
b := NodeWithData([]byte("foo2"))
c := NodeWithData([]byte("foo3"))
......@@ -508,7 +518,7 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
ds := dstest.Mock()
for _, n := range []node.Node{a, b, c} {
_, err := ds.Add(n)
err := ds.Add(ctx, n)
if err != nil {
t.Fatal(err)
}
......@@ -531,13 +541,13 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
t.Fatal(err)
}
pcid, err := ds.Add(parent)
err := ds.Add(ctx, parent)
if err != nil {
t.Fatal(err)
}
cset := cid.NewSet()
err = EnumerateChildrenAsync(context.Background(), GetLinksDirect(ds), pcid, cset.Visit)
err = EnumerateChildrenAsync(ctx, GetLinksDirect(ds), parent.Cid(), cset.Visit)
if err == nil {
t.Fatal("this should have failed")
}
......@@ -570,7 +580,9 @@ func testProgressIndicator(t *testing.T, depth int) {
}
}
func mkDag(ds DAGService, depth int) (*cid.Cid, int) {
func mkDag(ds node.DAGService, depth int) (*cid.Cid, int) {
ctx := context.Background()
totalChildren := 0
f := func() *ProtoNode {
p := new(ProtoNode)
......@@ -578,7 +590,7 @@ func mkDag(ds DAGService, depth int) (*cid.Cid, int) {
rand.Read(buf)
p.SetData(buf)
_, err := ds.Add(p)
err := ds.Add(ctx, p)
if err != nil {
panic(err)
}
......@@ -589,7 +601,7 @@ func mkDag(ds DAGService, depth int) (*cid.Cid, int) {
thisf := f
f = func() *ProtoNode {
pn := mkNodeWithChildren(thisf, 10)
_, err := ds.Add(pn)
err := ds.Add(ctx, pn)
if err != nil {
panic(err)
}
......@@ -599,12 +611,12 @@ func mkDag(ds DAGService, depth int) (*cid.Cid, int) {
}
nd := f()
c, err := ds.Add(nd)
err := ds.Add(ctx, nd)
if err != nil {
panic(err)
}
return c, totalChildren
return nd.Cid(), totalChildren
}
func mkNodeWithChildren(getChild func() *ProtoNode, width int) *ProtoNode {
......
......@@ -140,7 +140,7 @@ func (n *ProtoNode) RemoveNodeLink(name string) error {
n.links = good
if !found {
return ErrNotFound
return node.ErrNotFound
}
return nil
......@@ -160,7 +160,7 @@ func (n *ProtoNode) GetNodeLink(name string) (*node.Link, error) {
return nil, ErrLinkNotFound
}
func (n *ProtoNode) GetLinkedProtoNode(ctx context.Context, ds DAGService, name string) (*ProtoNode, error) {
func (n *ProtoNode) GetLinkedProtoNode(ctx context.Context, ds node.DAGService, name string) (*ProtoNode, error) {
nd, err := n.GetLinkedNode(ctx, ds, name)
if err != nil {
return nil, err
......@@ -174,7 +174,7 @@ func (n *ProtoNode) GetLinkedProtoNode(ctx context.Context, ds DAGService, name
return pbnd, nil
}
func (n *ProtoNode) GetLinkedNode(ctx context.Context, ds DAGService, name string) (node.Node, error) {
func (n *ProtoNode) GetLinkedNode(ctx context.Context, ds node.DAGService, name string) (node.Node, error) {
lnk, err := n.GetNodeLink(name)
if err != nil {
return nil, err
......
......@@ -41,7 +41,7 @@ func TestRemoveLink(t *testing.T) {
// should fail
err = nd.RemoveNodeLink("a")
if err != ErrNotFound {
if err != node.ErrNotFound {
t.Fatal("should have failed to remove link")
}
......@@ -60,20 +60,25 @@ func TestRemoveLink(t *testing.T) {
}
func TestFindLink(t *testing.T) {
ctx := context.Background()
ds := mdtest.Mock()
k, err := ds.Add(new(ProtoNode))
ndEmpty := new(ProtoNode)
err := ds.Add(ctx, ndEmpty)
if err != nil {
t.Fatal(err)
}
kEmpty := ndEmpty.Cid()
nd := &ProtoNode{}
nd.SetLinks([]*node.Link{
{Name: "a", Cid: k},
{Name: "c", Cid: k},
{Name: "b", Cid: k},
{Name: "a", Cid: kEmpty},
{Name: "c", Cid: kEmpty},
{Name: "b", Cid: kEmpty},
})
_, err = ds.Add(nd)
err = ds.Add(ctx, nd)
if err != nil {
t.Fatal(err)
}
......@@ -107,7 +112,7 @@ func TestFindLink(t *testing.T) {
t.Fatal(err)
}
if olnk.Cid.String() == k.String() {
if olnk.Cid.String() == kEmpty.String() {
t.Fatal("new link should have different hash")
}
}
......
......@@ -5,11 +5,13 @@ import (
bsrv "github.com/ipfs/go-ipfs/blockservice"
"github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
ds "gx/ipfs/QmPpegoMqhAEqjncrzArm7KVWAkCm78rqL2DPuNjhPrshg/go-datastore"
dssync "gx/ipfs/QmPpegoMqhAEqjncrzArm7KVWAkCm78rqL2DPuNjhPrshg/go-datastore/sync"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)
func Mock() dag.DAGService {
func Mock() node.DAGService {
return dag.NewDAGService(Bserv())
}
......
......@@ -2,6 +2,7 @@ package traverse
import (
"bytes"
"context"
"fmt"
"testing"
......@@ -350,7 +351,7 @@ func testWalkOutputs(t *testing.T, root node.Node, opts Options, expect []byte)
}
}
func newFan(t *testing.T, ds mdag.DAGService) node.Node {
func newFan(t *testing.T, ds node.DAGService) node.Node {
a := mdag.NodeWithData([]byte("/a"))
addLink(t, ds, a, child(t, ds, a, "aa"))
addLink(t, ds, a, child(t, ds, a, "ab"))
......@@ -359,7 +360,7 @@ func newFan(t *testing.T, ds mdag.DAGService) node.Node {
return a
}
func newLinkedList(t *testing.T, ds mdag.DAGService) node.Node {
func newLinkedList(t *testing.T, ds node.DAGService) node.Node {
a := mdag.NodeWithData([]byte("/a"))
aa := child(t, ds, a, "aa")
aaa := child(t, ds, aa, "aaa")
......@@ -372,7 +373,7 @@ func newLinkedList(t *testing.T, ds mdag.DAGService) node.Node {
return a
}
func newBinaryTree(t *testing.T, ds mdag.DAGService) node.Node {
func newBinaryTree(t *testing.T, ds node.DAGService) node.Node {
a := mdag.NodeWithData([]byte("/a"))
aa := child(t, ds, a, "aa")
ab := child(t, ds, a, "ab")
......@@ -385,7 +386,7 @@ func newBinaryTree(t *testing.T, ds mdag.DAGService) node.Node {
return a
}
func newBinaryDAG(t *testing.T, ds mdag.DAGService) node.Node {
func newBinaryDAG(t *testing.T, ds node.DAGService) node.Node {
a := mdag.NodeWithData([]byte("/a"))
aa := child(t, ds, a, "aa")
aaa := child(t, ds, aa, "aaa")
......@@ -402,9 +403,9 @@ func newBinaryDAG(t *testing.T, ds mdag.DAGService) node.Node {
return a
}
func addLink(t *testing.T, ds mdag.DAGService, a, b node.Node) {
func addLink(t *testing.T, ds node.DAGService, a, b node.Node) {
to := string(a.(*mdag.ProtoNode).Data()) + "2" + string(b.(*mdag.ProtoNode).Data())
if _, err := ds.Add(b); err != nil {
if err := ds.Add(context.Background(), b); err != nil {
t.Error(err)
}
if err := a.(*mdag.ProtoNode).AddNodeLink(to, b.(*mdag.ProtoNode)); err != nil {
......@@ -412,6 +413,6 @@ func addLink(t *testing.T, ds mdag.DAGService, a, b node.Node) {
}
}
func child(t *testing.T, ds mdag.DAGService, a node.Node, name string) node.Node {
func child(t *testing.T, ds node.DAGService, a node.Node, name string) node.Node {
return mdag.NodeWithData([]byte(string(a.(*mdag.ProtoNode).Data()) + "/" + name))
}
......@@ -37,7 +37,7 @@ func (c *Change) String() string {
}
}
func ApplyChange(ctx context.Context, ds dag.DAGService, nd *dag.ProtoNode, cs []*Change) (*dag.ProtoNode, error) {
func ApplyChange(ctx context.Context, ds node.DAGService, nd *dag.ProtoNode, cs []*Change) (*dag.ProtoNode, error) {
e := NewDagEditor(nd, ds)
for _, c := range cs {
switch c.Type {
......@@ -85,11 +85,11 @@ func ApplyChange(ctx context.Context, ds dag.DAGService, nd *dag.ProtoNode, cs [
}
}
return e.Finalize(ds)
return e.Finalize(ctx, ds)
}
// Diff returns a set of changes that transform node 'a' into node 'b'
func Diff(ctx context.Context, ds dag.DAGService, a, b node.Node) ([]*Change, error) {
func Diff(ctx context.Context, ds node.DAGService, a, b node.Node) ([]*Change, error) {
if len(a.Links()) == 0 && len(b.Links()) == 0 {
return []*Change{
&Change{
......
......@@ -136,7 +136,7 @@ func TestDiffEnumBasic(t *testing.T) {
lgds := &getLogger{ds: ds}
for _, nd := range nds {
_, err := ds.Add(nd)
err := ds.Add(ctx, nd)
if err != nil {
t.Fatal(err)
}
......@@ -167,6 +167,22 @@ func (gl *getLogger) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
return nd, nil
}
func (gl *getLogger) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *node.NodeOption {
outCh := make(chan *node.NodeOption, len(cids))
nds := gl.ds.GetMany(ctx, cids)
for no := range nds {
if no.Err == nil {
gl.log = append(gl.log, no.Node.Cid())
}
select {
case outCh <- no:
default:
panic("too many responses")
}
}
return nds
}
func assertCidList(a, b []*cid.Cid) error {
if len(a) != len(b) {
return fmt.Errorf("got different number of cids than expected")
......@@ -188,14 +204,14 @@ func TestDiffEnumFail(t *testing.T) {
lgds := &getLogger{ds: ds}
for _, s := range []string{"a1", "a2", "b", "c"} {
_, err := ds.Add(nds[s])
err := ds.Add(ctx, nds[s])
if err != nil {
t.Fatal(err)
}
}
err := DiffEnumerate(ctx, lgds, nds["a1"].Cid(), nds["a2"].Cid())
if err != dag.ErrNotFound {
if err != node.ErrNotFound {
t.Fatal("expected err not found")
}
......@@ -215,7 +231,7 @@ func TestDiffEnumRecurse(t *testing.T) {
lgds := &getLogger{ds: ds}
for _, s := range []string{"a1", "a2", "b", "c", "d"} {
_, err := ds.Add(nds[s])
err := ds.Add(ctx, nds[s])
if err != nil {
t.Fatal(err)
}
......
......@@ -20,14 +20,14 @@ type Editor struct {
// tmp is a temporary in memory (for now) dagstore for all of the
// intermediary nodes to be stored in
tmp dag.DAGService
tmp node.DAGService
// src is the dagstore with *all* of the data on it, it is used to pull
// nodes from for modification (nil is a valid value)
src dag.DAGService
src node.DAGService
}
func NewMemoryDagService() dag.DAGService {
func NewMemoryDagService() node.DAGService {
// build mem-datastore for editor's intermediary nodes
bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
bsrv := bserv.New(bs, offline.Exchange(bs))
......@@ -35,7 +35,7 @@ func NewMemoryDagService() dag.DAGService {
}
// root is the node to be modified, source is the dagstore to pull nodes from (optional)
func NewDagEditor(root *dag.ProtoNode, source dag.DAGService) *Editor {
func NewDagEditor(root *dag.ProtoNode, source node.DAGService) *Editor {
return &Editor{
root: root,
tmp: NewMemoryDagService(),
......@@ -47,22 +47,22 @@ func (e *Editor) GetNode() *dag.ProtoNode {
return e.root.Copy().(*dag.ProtoNode)
}
func (e *Editor) GetDagService() dag.DAGService {
func (e *Editor) GetDagService() node.DAGService {
return e.tmp
}
func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childname string, childnd node.Node) (*dag.ProtoNode, error) {
func addLink(ctx context.Context, ds node.DAGService, root *dag.ProtoNode, childname string, childnd node.Node) (*dag.ProtoNode, error) {
if childname == "" {
return nil, errors.New("cannot create link with no name!")
}
// ensure that the node we are adding is in the dagservice
_, err := ds.Add(childnd)
err := ds.Add(ctx, childnd)
if err != nil {
return nil, err
}
_ = ds.Remove(root)
_ = ds.Remove(ctx, root.Cid())
// ensure no link with that name already exists
_ = root.RemoveNodeLink(childname) // ignore error, only option is ErrNotFound
......@@ -71,7 +71,7 @@ func addLink(ctx context.Context, ds dag.DAGService, root *dag.ProtoNode, childn
return nil, err
}
if _, err := ds.Add(root); err != nil {
if err := ds.Add(ctx, root); err != nil {
return nil, err
}
return root, nil
......@@ -98,7 +98,7 @@ func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path
if err == dag.ErrLinkNotFound && create != nil {
nd = create()
err = nil // no longer an error case
} else if err == dag.ErrNotFound {
} else if err == node.ErrNotFound {
// try finding it in our source dagstore
nd, err = root.GetLinkedProtoNode(ctx, e.src, path[0])
}
......@@ -115,7 +115,7 @@ func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path
return nil, err
}
_ = e.tmp.Remove(root)
_ = e.tmp.Remove(ctx, root.Cid())
_ = root.RemoveNodeLink(path[0])
err = root.AddNodeLinkClean(path[0], ndprime)
......@@ -123,7 +123,7 @@ func (e *Editor) insertNodeAtPath(ctx context.Context, root *dag.ProtoNode, path
return nil, err
}
_, err = e.tmp.Add(root)
err = e.tmp.Add(ctx, root)
if err != nil {
return nil, err
}
......@@ -149,7 +149,7 @@ func (e *Editor) rmLink(ctx context.Context, root *dag.ProtoNode, path []string)
return nil, err
}
_, err = e.tmp.Add(root)
err = e.tmp.Add(ctx, root)
if err != nil {
return nil, err
}
......@@ -159,7 +159,7 @@ func (e *Editor) rmLink(ctx context.Context, root *dag.ProtoNode, path []string)
// search for node in both tmp dagstore and source dagstore
nd, err := root.GetLinkedProtoNode(ctx, e.tmp, path[0])
if err == dag.ErrNotFound {
if err == node.ErrNotFound {
nd, err = root.GetLinkedProtoNode(ctx, e.src, path[0])
}
......@@ -172,7 +172,7 @@ func (e *Editor) rmLink(ctx context.Context, root *dag.ProtoNode, path []string)
return nil, err
}
_ = e.tmp.Remove(root)
e.tmp.Remove(ctx, root.Cid())
_ = root.RemoveNodeLink(path[0])
err = root.AddNodeLinkClean(path[0], nnode)
......@@ -180,7 +180,7 @@ func (e *Editor) rmLink(ctx context.Context, root *dag.ProtoNode, path []string)
return nil, err
}
_, err = e.tmp.Add(root)
err = e.tmp.Add(ctx, root)
if err != nil {
return nil, err
}
......@@ -188,22 +188,23 @@ func (e *Editor) rmLink(ctx context.Context, root *dag.ProtoNode, path []string)
return root, nil
}
func (e *Editor) Finalize(ds dag.DAGService) (*dag.ProtoNode, error) {
func (e *Editor) Finalize(ctx context.Context, ds node.DAGService) (*dag.ProtoNode, error) {
nd := e.GetNode()
err := copyDag(nd, e.tmp, ds)
err := copyDag(ctx, nd, e.tmp, ds)
return nd, err
}
func copyDag(nd node.Node, from, to dag.DAGService) error {
_, err := to.Add(nd)
func copyDag(ctx context.Context, nd node.Node, from, to node.DAGService) error {
// TODO(#4609): make this batch.
err := to.Add(ctx, nd)
if err != nil {
return err
}
for _, lnk := range nd.Links() {
child, err := lnk.GetNode(context.Background(), from)
child, err := lnk.GetNode(ctx, from)
if err != nil {
if err == dag.ErrNotFound {
if err == node.ErrNotFound {
// not found means we didnt modify it, and it should
// already be in the target datastore
continue
......@@ -211,7 +212,7 @@ func copyDag(nd node.Node, from, to dag.DAGService) error {
return err
}
err = copyDag(child, from, to)
err = copyDag(ctx, child, from, to)
if err != nil {
return err
}
......
......@@ -9,35 +9,39 @@ import (
path "github.com/ipfs/go-ipfs/path"
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid"
node "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)
func TestAddLink(t *testing.T) {
ctx, context := context.WithCancel(context.Background())
defer context()
ds := mdtest.Mock()
fishnode := dag.NodeWithData([]byte("fishcakes!"))
fk, err := ds.Add(fishnode)
err := ds.Add(ctx, fishnode)
if err != nil {
t.Fatal(err)
}
nd := new(dag.ProtoNode)
nnode, err := addLink(context.Background(), ds, nd, "fish", fishnode)
nnode, err := addLink(ctx, ds, nd, "fish", fishnode)
if err != nil {
t.Fatal(err)
}
fnprime, err := nnode.GetLinkedNode(context.Background(), ds, "fish")
fnprime, err := nnode.GetLinkedNode(ctx, ds, "fish")
if err != nil {
t.Fatal(err)
}
fnpkey := fnprime.Cid()
if !fnpkey.Equals(fk) {
if !fnpkey.Equals(fishnode.Cid()) {
t.Fatal("wrong child node found!")
}
}
func assertNodeAtPath(t *testing.T, ds dag.DAGService, root *dag.ProtoNode, pth string, exp *cid.Cid) {
func assertNodeAtPath(t *testing.T, ds node.DAGService, root *dag.ProtoNode, pth string, exp *cid.Cid) {
parts := path.SplitList(pth)
cur := root
for _, e := range parts {
......@@ -78,7 +82,7 @@ func TestInsertNode(t *testing.T) {
func testInsert(t *testing.T, e *Editor, path, data string, create bool, experr string) {
child := dag.NodeWithData([]byte(data))
ck, err := e.tmp.Add(child)
err := e.tmp.Add(context.Background(), child)
if err != nil {
t.Fatal(err)
}
......@@ -106,5 +110,5 @@ func testInsert(t *testing.T, e *Editor, path, data string, create bool, experr
t.Fatal(err, path, data, create, experr)
}
assertNodeAtPath(t, e.tmp, e.root, path, ck)
assertNodeAtPath(t, e.tmp, e.root, path, child.Cid())
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment