Commit 19fbe75a authored by Steven Allen's avatar Steven Allen

merkledag: switch to new dag interface

Also:

* Update the blockstore/blockservice methods to match.
* Construct a new temporary offline dag instead of having a
  GetOfflineLinkService method.

License: MIT
Signed-off-by: default avatarSteven Allen <steven@stebalien.com>
parent b6e449bf
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"fmt" "fmt"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore" bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag" dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin" pin "github.com/ipfs/go-ipfs/pin"
...@@ -33,7 +35,7 @@ type Result struct { ...@@ -33,7 +35,7 @@ type Result struct {
// The routine then iterates over every block in the blockstore and // The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set. // deletes any block that is not found in the marked set.
// //
func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) <-chan Result { func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) <-chan Result {
elock := log.EventBegin(ctx, "GC.lockWait") elock := log.EventBegin(ctx, "GC.lockWait")
unlocker := bs.GCLock() unlocker := bs.GCLock()
...@@ -41,7 +43,8 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. ...@@ -41,7 +43,8 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
elock = log.EventBegin(ctx, "GC.locked") elock = log.EventBegin(ctx, "GC.locked")
emark := log.EventBegin(ctx, "GC.mark") emark := log.EventBegin(ctx, "GC.mark")
ls = ls.GetOfflineLinkService() bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
output := make(chan Result, 128) output := make(chan Result, 128)
...@@ -50,7 +53,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. ...@@ -50,7 +53,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.
defer unlocker.Unlock() defer unlocker.Unlock()
defer elock.Done() defer elock.Done()
gcs, err := ColoredSet(ctx, pn, ls, bestEffortRoots, output) gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots, output)
if err != nil { if err != nil {
output <- Result{Error: err} output <- Result{Error: err}
return return
...@@ -125,13 +128,13 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots ...@@ -125,13 +128,13 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots
// ColoredSet computes the set of nodes in the graph that are pinned by the // ColoredSet computes the set of nodes in the graph that are pinned by the
// pins in the given pinner. // pins in the given pinner.
func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid, output chan<- Result) (*cid.Set, error) { func ColoredSet(ctx context.Context, pn pin.Pinner, ng node.NodeGetter, bestEffortRoots []*cid.Cid, output chan<- Result) (*cid.Set, error) {
// KeySet currently implemented in memory, in the future, may be bloom filter or // KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory. // disk backed to conserve memory.
errors := false errors := false
gcs := cid.NewSet() gcs := cid.NewSet()
getLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) { getLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) {
links, err := ls.GetLinks(ctx, cid) links, err := node.GetLinks(ctx, ng, cid)
if err != nil { if err != nil {
errors = true errors = true
output <- Result{Error: &CannotFetchLinksError{cid, err}} output <- Result{Error: &CannotFetchLinksError{cid, err}}
...@@ -145,8 +148,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo ...@@ -145,8 +148,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo
} }
bestEffortGetLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) { bestEffortGetLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) {
links, err := ls.GetLinks(ctx, cid) links, err := node.GetLinks(ctx, ng, cid)
if err != nil && err != dag.ErrNotFound { if err != nil && err != node.ErrNotFound {
errors = true errors = true
output <- Result{Error: &CannotFetchLinksError{cid, err}} output <- Result{Error: &CannotFetchLinksError{cid, err}}
} }
......
...@@ -178,13 +178,13 @@ type pinner struct { ...@@ -178,13 +178,13 @@ type pinner struct {
// Track the keys used for storing the pinning state, so gc does // Track the keys used for storing the pinning state, so gc does
// not delete them. // not delete them.
internalPin *cid.Set internalPin *cid.Set
dserv mdag.DAGService dserv node.DAGService
internal mdag.DAGService // dagservice used to store internal objects internal node.DAGService // dagservice used to store internal objects
dstore ds.Datastore dstore ds.Datastore
} }
// NewPinner creates a new pinner using the given datastore as a backend // NewPinner creates a new pinner using the given datastore as a backend
func NewPinner(dstore ds.Datastore, serv, internal mdag.DAGService) Pinner { func NewPinner(dstore ds.Datastore, serv, internal node.DAGService) Pinner {
rcset := cid.NewSet() rcset := cid.NewSet()
dirset := cid.NewSet() dirset := cid.NewSet()
...@@ -203,11 +203,13 @@ func NewPinner(dstore ds.Datastore, serv, internal mdag.DAGService) Pinner { ...@@ -203,11 +203,13 @@ func NewPinner(dstore ds.Datastore, serv, internal mdag.DAGService) Pinner {
func (p *pinner) Pin(ctx context.Context, node node.Node, recurse bool) error { func (p *pinner) Pin(ctx context.Context, node node.Node, recurse bool) error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
c, err := p.dserv.Add(node) err := p.dserv.Add(ctx, node)
if err != nil { if err != nil {
return err return err
} }
c := node.Cid()
if recurse { if recurse {
if p.recursePin.Has(c) { if p.recursePin.Has(c) {
return nil return nil
...@@ -356,7 +358,7 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { ...@@ -356,7 +358,7 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) {
// Now walk all recursive pins to check for indirect pins // Now walk all recursive pins to check for indirect pins
var checkChildren func(*cid.Cid, *cid.Cid) error var checkChildren func(*cid.Cid, *cid.Cid) error
checkChildren = func(rk, parentKey *cid.Cid) error { checkChildren = func(rk, parentKey *cid.Cid) error {
links, err := p.dserv.GetLinks(context.Background(), parentKey) links, err := node.GetLinks(context.TODO(), p.dserv, parentKey)
if err != nil { if err != nil {
return err return err
} }
...@@ -425,7 +427,7 @@ func cidSetWithValues(cids []*cid.Cid) *cid.Set { ...@@ -425,7 +427,7 @@ func cidSetWithValues(cids []*cid.Cid) *cid.Set {
} }
// LoadPinner loads a pinner and its keysets from the given datastore // LoadPinner loads a pinner and its keysets from the given datastore
func LoadPinner(d ds.Datastore, dserv, internal mdag.DAGService) (Pinner, error) { func LoadPinner(d ds.Datastore, dserv, internal node.DAGService) (Pinner, error) {
p := new(pinner) p := new(pinner)
rootKeyI, err := d.Get(pinDatastoreKey) rootKeyI, err := d.Get(pinDatastoreKey)
...@@ -550,16 +552,18 @@ func (p *pinner) Flush() error { ...@@ -550,16 +552,18 @@ func (p *pinner) Flush() error {
} }
// add the empty node, its referenced by the pin sets but never created // add the empty node, its referenced by the pin sets but never created
_, err := p.internal.Add(new(mdag.ProtoNode)) err := p.internal.Add(ctx, new(mdag.ProtoNode))
if err != nil { if err != nil {
return err return err
} }
k, err := p.internal.Add(root) err = p.internal.Add(ctx, root)
if err != nil { if err != nil {
return err return err
} }
k := root.Cid()
internalset.Add(k) internalset.Add(k)
if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil { if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil {
return fmt.Errorf("cannot store pin state: %v", err) return fmt.Errorf("cannot store pin state: %v", err)
...@@ -593,8 +597,8 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { ...@@ -593,8 +597,8 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) {
// hasChild recursively looks for a Cid among the children of a root Cid. // hasChild recursively looks for a Cid among the children of a root Cid.
// The visit function can be used to shortcut already-visited branches. // The visit function can be used to shortcut already-visited branches.
func hasChild(ds mdag.LinkService, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) { func hasChild(ng node.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) {
links, err := ds.GetLinks(context.Background(), root) links, err := node.GetLinks(context.TODO(), ng, root)
if err != nil { if err != nil {
return false, err return false, err
} }
...@@ -604,7 +608,7 @@ func hasChild(ds mdag.LinkService, root *cid.Cid, child *cid.Cid, visit func(*ci ...@@ -604,7 +608,7 @@ func hasChild(ds mdag.LinkService, root *cid.Cid, child *cid.Cid, visit func(*ci
return true, nil return true, nil
} }
if visit(c) { if visit(c) {
has, err := hasChild(ds, c, child, visit) has, err := hasChild(ng, c, child, visit)
if err != nil { if err != nil {
return false, err return false, err
} }
......
...@@ -59,7 +59,7 @@ func TestPinnerBasic(t *testing.T) { ...@@ -59,7 +59,7 @@ func TestPinnerBasic(t *testing.T) {
p := NewPinner(dstore, dserv, dserv) p := NewPinner(dstore, dserv, dserv)
a, ak := randNode() a, ak := randNode()
_, err := dserv.Add(a) err := dserv.Add(ctx, a)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -74,10 +74,11 @@ func TestPinnerBasic(t *testing.T) { ...@@ -74,10 +74,11 @@ func TestPinnerBasic(t *testing.T) {
// create new node c, to be indirectly pinned through b // create new node c, to be indirectly pinned through b
c, _ := randNode() c, _ := randNode()
ck, err := dserv.Add(c) err = dserv.Add(ctx, c)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ck := c.Cid()
// Create new node b, to be parent to a and c // Create new node b, to be parent to a and c
b, _ := randNode() b, _ := randNode()
...@@ -91,10 +92,11 @@ func TestPinnerBasic(t *testing.T) { ...@@ -91,10 +92,11 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
_, err = dserv.Add(b) err = dserv.Add(ctx, b)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
bk := b.Cid()
// recursively pin B{A,C} // recursively pin B{A,C}
err = p.Pin(ctx, b, true) err = p.Pin(ctx, b, true)
...@@ -104,7 +106,6 @@ func TestPinnerBasic(t *testing.T) { ...@@ -104,7 +106,6 @@ func TestPinnerBasic(t *testing.T) {
assertPinned(t, p, ck, "child of recursively pinned node not found") assertPinned(t, p, ck, "child of recursively pinned node not found")
bk := b.Cid()
assertPinned(t, p, bk, "Recursively pinned node not found..") assertPinned(t, p, bk, "Recursively pinned node not found..")
d, _ := randNode() d, _ := randNode()
...@@ -115,11 +116,11 @@ func TestPinnerBasic(t *testing.T) { ...@@ -115,11 +116,11 @@ func TestPinnerBasic(t *testing.T) {
d.AddNodeLink("e", e) d.AddNodeLink("e", e)
// Must be in dagserv for unpin to work // Must be in dagserv for unpin to work
_, err = dserv.Add(e) err = dserv.Add(ctx, e)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = dserv.Add(d) err = dserv.Add(ctx, d)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -194,13 +195,13 @@ func TestIsPinnedLookup(t *testing.T) { ...@@ -194,13 +195,13 @@ func TestIsPinnedLookup(t *testing.T) {
} }
} }
ak, err := dserv.Add(a) err := dserv.Add(ctx, a)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
//t.Logf("a[%d] is %s", i, ak) //t.Logf("a[%d] is %s", i, ak)
aNodes[i] = a aNodes[i] = a
aKeys[i] = ak aKeys[i] = a.Cid()
} }
// Pin A5 recursively // Pin A5 recursively
...@@ -222,20 +223,22 @@ func TestIsPinnedLookup(t *testing.T) { ...@@ -222,20 +223,22 @@ func TestIsPinnedLookup(t *testing.T) {
} }
// Add C // Add C
ck, err := dserv.Add(c) err := dserv.Add(ctx, c)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ck := c.Cid()
//t.Logf("C is %s", ck) //t.Logf("C is %s", ck)
// Add C to B and Add B // Add C to B and Add B
if err := b.AddNodeLink("myotherchild", c); err != nil { if err := b.AddNodeLink("myotherchild", c); err != nil {
t.Fatal(err) t.Fatal(err)
} }
bk, err := dserv.Add(b) err = dserv.Add(ctx, b)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
bk := b.Cid()
//t.Logf("B is %s", bk) //t.Logf("B is %s", bk)
// Pin C recursively // Pin C recursively
...@@ -284,7 +287,7 @@ func TestDuplicateSemantics(t *testing.T) { ...@@ -284,7 +287,7 @@ func TestDuplicateSemantics(t *testing.T) {
p := NewPinner(dstore, dserv, dserv) p := NewPinner(dstore, dserv, dserv)
a, _ := randNode() a, _ := randNode()
_, err := dserv.Add(a) err := dserv.Add(ctx, a)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -349,12 +352,12 @@ func TestPinRecursiveFail(t *testing.T) { ...@@ -349,12 +352,12 @@ func TestPinRecursiveFail(t *testing.T) {
t.Fatal("should have failed to pin here") t.Fatal("should have failed to pin here")
} }
_, err = dserv.Add(b) err = dserv.Add(ctx, b)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = dserv.Add(a) err = dserv.Add(ctx, a)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -369,6 +372,8 @@ func TestPinRecursiveFail(t *testing.T) { ...@@ -369,6 +372,8 @@ func TestPinRecursiveFail(t *testing.T) {
} }
func TestPinUpdate(t *testing.T) { func TestPinUpdate(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore()) dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore) bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore)) bserv := bs.New(bstore, offline.Exchange(bstore))
...@@ -378,10 +383,9 @@ func TestPinUpdate(t *testing.T) { ...@@ -378,10 +383,9 @@ func TestPinUpdate(t *testing.T) {
n1, c1 := randNode() n1, c1 := randNode()
n2, c2 := randNode() n2, c2 := randNode()
dserv.Add(n1) dserv.Add(ctx, n1)
dserv.Add(n2) dserv.Add(ctx, n2)
ctx := context.Background()
if err := p.Pin(ctx, n1, true); err != nil { if err := p.Pin(ctx, n1, true); err != nil {
t.Fatal(err) t.Fatal(err)
} }
......
...@@ -54,7 +54,7 @@ func (s sortByHash) Swap(a, b int) { ...@@ -54,7 +54,7 @@ func (s sortByHash) Swap(a, b int) {
s.links[a], s.links[b] = s.links[b], s.links[a] s.links[a], s.links[b] = s.links[b], s.links[a]
} }
func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) { func storeItems(ctx context.Context, dag node.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
links := make([]*node.Link, 0, defaultFanout+maxItems) links := make([]*node.Link, 0, defaultFanout+maxItems)
for i := 0; i < defaultFanout; i++ { for i := 0; i < defaultFanout; i++ {
links = append(links, &node.Link{Cid: emptyKey}) links = append(links, &node.Link{Cid: emptyKey})
...@@ -139,10 +139,11 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint ...@@ -139,10 +139,11 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
return nil, err return nil, err
} }
childKey, err := dag.Add(child) err = dag.Add(ctx, child)
if err != nil { if err != nil {
return nil, err return nil, err
} }
childKey := child.Cid()
internalKeys(childKey) internalKeys(childKey)
...@@ -202,7 +203,7 @@ func writeHdr(n *merkledag.ProtoNode, hdr *pb.Set) error { ...@@ -202,7 +203,7 @@ func writeHdr(n *merkledag.ProtoNode, hdr *pb.Set) error {
type walkerFunc func(idx int, link *node.Link) error type walkerFunc func(idx int, link *node.Link) error
func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.ProtoNode, fn walkerFunc, children keyObserver) error { func walkItems(ctx context.Context, dag node.DAGService, n *merkledag.ProtoNode, fn walkerFunc, children keyObserver) error {
hdr, err := readHdr(n) hdr, err := readHdr(n)
if err != nil { if err != nil {
return err return err
...@@ -237,7 +238,7 @@ func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Proto ...@@ -237,7 +238,7 @@ func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Proto
return nil return nil
} }
func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.ProtoNode, name string, internalKeys keyObserver) ([]*cid.Cid, error) { func loadSet(ctx context.Context, dag node.DAGService, root *merkledag.ProtoNode, name string, internalKeys keyObserver) ([]*cid.Cid, error) {
l, err := root.GetNodeLink(name) l, err := root.GetNodeLink(name)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -280,17 +281,17 @@ func getCidListIterator(cids []*cid.Cid) itemIterator { ...@@ -280,17 +281,17 @@ func getCidListIterator(cids []*cid.Cid) itemIterator {
} }
} }
func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) { func storeSet(ctx context.Context, dag node.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
iter := getCidListIterator(cids) iter := getCidListIterator(cids)
n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys) n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
c, err := dag.Add(n) err = dag.Add(ctx, n)
if err != nil { if err != nil {
return nil, err return nil, err
} }
internalKeys(c) internalKeys(n.Cid())
return n, nil return n, nil
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment