From 19fbe75ad3037d256affca05f700d4f5b44913c1 Mon Sep 17 00:00:00 2001 From: Steven Allen <steven@stebalien.com> Date: Thu, 25 Jan 2018 12:21:51 -0800 Subject: [PATCH] merkledag: switch to new dag interface Also: * Update the blockstore/blockservice methods to match. * Construct a new temporary offline dag instead of having a GetOfflineLinkService method. License: MIT Signed-off-by: Steven Allen <steven@stebalien.com> --- gc/gc.go | 17 ++++++++++------- pin.go | 26 +++++++++++++++----------- pin_test.go | 36 ++++++++++++++++++++---------------- set.go | 15 ++++++++------- 4 files changed, 53 insertions(+), 41 deletions(-) diff --git a/gc/gc.go b/gc/gc.go index 7a19b69..c3ab3db 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -6,6 +6,8 @@ import ( "fmt" bstore "github.com/ipfs/go-ipfs/blocks/blockstore" + bserv "github.com/ipfs/go-ipfs/blockservice" + offline "github.com/ipfs/go-ipfs/exchange/offline" dag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" @@ -33,7 +35,7 @@ type Result struct { // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. // -func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) <-chan Result { +func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) <-chan Result { elock := log.EventBegin(ctx, "GC.lockWait") unlocker := bs.GCLock() @@ -41,7 +43,8 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. elock = log.EventBegin(ctx, "GC.locked") emark := log.EventBegin(ctx, "GC.mark") - ls = ls.GetOfflineLinkService() + bsrv := bserv.New(bs, offline.Exchange(bs)) + ds := dag.NewDAGService(bsrv) output := make(chan Result, 128) @@ -50,7 +53,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. defer unlocker.Unlock() defer elock.Done() - gcs, err := ColoredSet(ctx, pn, ls, bestEffortRoots, output) + gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots, output) if err != nil { output <- Result{Error: err} return @@ -125,13 +128,13 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots // ColoredSet computes the set of nodes in the graph that are pinned by the // pins in the given pinner. -func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid, output chan<- Result) (*cid.Set, error) { +func ColoredSet(ctx context.Context, pn pin.Pinner, ng node.NodeGetter, bestEffortRoots []*cid.Cid, output chan<- Result) (*cid.Set, error) { // KeySet currently implemented in memory, in the future, may be bloom filter or // disk backed to conserve memory. errors := false gcs := cid.NewSet() getLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) { - links, err := ls.GetLinks(ctx, cid) + links, err := node.GetLinks(ctx, ng, cid) if err != nil { errors = true output <- Result{Error: &CannotFetchLinksError{cid, err}} @@ -145,8 +148,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffo } bestEffortGetLinks := func(ctx context.Context, cid *cid.Cid) ([]*node.Link, error) { - links, err := ls.GetLinks(ctx, cid) - if err != nil && err != dag.ErrNotFound { + links, err := node.GetLinks(ctx, ng, cid) + if err != nil && err != node.ErrNotFound { errors = true output <- Result{Error: &CannotFetchLinksError{cid, err}} } diff --git a/pin.go b/pin.go index f148053..9387439 100644 --- a/pin.go +++ b/pin.go @@ -178,13 +178,13 @@ type pinner struct { // Track the keys used for storing the pinning state, so gc does // not delete them. internalPin *cid.Set - dserv mdag.DAGService - internal mdag.DAGService // dagservice used to store internal objects + dserv node.DAGService + internal node.DAGService // dagservice used to store internal objects dstore ds.Datastore } // NewPinner creates a new pinner using the given datastore as a backend -func NewPinner(dstore ds.Datastore, serv, internal mdag.DAGService) Pinner { +func NewPinner(dstore ds.Datastore, serv, internal node.DAGService) Pinner { rcset := cid.NewSet() dirset := cid.NewSet() @@ -203,11 +203,13 @@ func NewPinner(dstore ds.Datastore, serv, internal mdag.DAGService) Pinner { func (p *pinner) Pin(ctx context.Context, node node.Node, recurse bool) error { p.lock.Lock() defer p.lock.Unlock() - c, err := p.dserv.Add(node) + err := p.dserv.Add(ctx, node) if err != nil { return err } + c := node.Cid() + if recurse { if p.recursePin.Has(c) { return nil @@ -356,7 +358,7 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { // Now walk all recursive pins to check for indirect pins var checkChildren func(*cid.Cid, *cid.Cid) error checkChildren = func(rk, parentKey *cid.Cid) error { - links, err := p.dserv.GetLinks(context.Background(), parentKey) + links, err := node.GetLinks(context.TODO(), p.dserv, parentKey) if err != nil { return err } @@ -425,7 +427,7 @@ func cidSetWithValues(cids []*cid.Cid) *cid.Set { } // LoadPinner loads a pinner and its keysets from the given datastore -func LoadPinner(d ds.Datastore, dserv, internal mdag.DAGService) (Pinner, error) { +func LoadPinner(d ds.Datastore, dserv, internal node.DAGService) (Pinner, error) { p := new(pinner) rootKeyI, err := d.Get(pinDatastoreKey) @@ -550,16 +552,18 @@ func (p *pinner) Flush() error { } // add the empty node, its referenced by the pin sets but never created - _, err := p.internal.Add(new(mdag.ProtoNode)) + err := p.internal.Add(ctx, new(mdag.ProtoNode)) if err != nil { return err } - k, err := p.internal.Add(root) + err = p.internal.Add(ctx, root) if err != nil { return err } + k := root.Cid() + internalset.Add(k) if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil { return fmt.Errorf("cannot store pin state: %v", err) @@ -593,8 +597,8 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { // hasChild recursively looks for a Cid among the children of a root Cid. // The visit function can be used to shortcut already-visited branches. -func hasChild(ds mdag.LinkService, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) { - links, err := ds.GetLinks(context.Background(), root) +func hasChild(ng node.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) { + links, err := node.GetLinks(context.TODO(), ng, root) if err != nil { return false, err } @@ -604,7 +608,7 @@ func hasChild(ds mdag.LinkService, root *cid.Cid, child *cid.Cid, visit func(*ci return true, nil } if visit(c) { - has, err := hasChild(ds, c, child, visit) + has, err := hasChild(ng, c, child, visit) if err != nil { return false, err } diff --git a/pin_test.go b/pin_test.go index 7980c8d..3652ef8 100644 --- a/pin_test.go +++ b/pin_test.go @@ -59,7 +59,7 @@ func TestPinnerBasic(t *testing.T) { p := NewPinner(dstore, dserv, dserv) a, ak := randNode() - _, err := dserv.Add(a) + err := dserv.Add(ctx, a) if err != nil { t.Fatal(err) } @@ -74,10 +74,11 @@ func TestPinnerBasic(t *testing.T) { // create new node c, to be indirectly pinned through b c, _ := randNode() - ck, err := dserv.Add(c) + err = dserv.Add(ctx, c) if err != nil { t.Fatal(err) } + ck := c.Cid() // Create new node b, to be parent to a and c b, _ := randNode() @@ -91,10 +92,11 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } - _, err = dserv.Add(b) + err = dserv.Add(ctx, b) if err != nil { t.Fatal(err) } + bk := b.Cid() // recursively pin B{A,C} err = p.Pin(ctx, b, true) @@ -104,7 +106,6 @@ func TestPinnerBasic(t *testing.T) { assertPinned(t, p, ck, "child of recursively pinned node not found") - bk := b.Cid() assertPinned(t, p, bk, "Recursively pinned node not found..") d, _ := randNode() @@ -115,11 +116,11 @@ func TestPinnerBasic(t *testing.T) { d.AddNodeLink("e", e) // Must be in dagserv for unpin to work - _, err = dserv.Add(e) + err = dserv.Add(ctx, e) if err != nil { t.Fatal(err) } - _, err = dserv.Add(d) + err = dserv.Add(ctx, d) if err != nil { t.Fatal(err) } @@ -194,13 +195,13 @@ func TestIsPinnedLookup(t *testing.T) { } } - ak, err := dserv.Add(a) + err := dserv.Add(ctx, a) if err != nil { t.Fatal(err) } //t.Logf("a[%d] is %s", i, ak) aNodes[i] = a - aKeys[i] = ak + aKeys[i] = a.Cid() } // Pin A5 recursively @@ -222,20 +223,22 @@ func TestIsPinnedLookup(t *testing.T) { } // Add C - ck, err := dserv.Add(c) + err := dserv.Add(ctx, c) if err != nil { t.Fatal(err) } + ck := c.Cid() //t.Logf("C is %s", ck) // Add C to B and Add B if err := b.AddNodeLink("myotherchild", c); err != nil { t.Fatal(err) } - bk, err := dserv.Add(b) + err = dserv.Add(ctx, b) if err != nil { t.Fatal(err) } + bk := b.Cid() //t.Logf("B is %s", bk) // Pin C recursively @@ -284,7 +287,7 @@ func TestDuplicateSemantics(t *testing.T) { p := NewPinner(dstore, dserv, dserv) a, _ := randNode() - _, err := dserv.Add(a) + err := dserv.Add(ctx, a) if err != nil { t.Fatal(err) } @@ -349,12 +352,12 @@ func TestPinRecursiveFail(t *testing.T) { t.Fatal("should have failed to pin here") } - _, err = dserv.Add(b) + err = dserv.Add(ctx, b) if err != nil { t.Fatal(err) } - _, err = dserv.Add(a) + err = dserv.Add(ctx, a) if err != nil { t.Fatal(err) } @@ -369,6 +372,8 @@ func TestPinRecursiveFail(t *testing.T) { } func TestPinUpdate(t *testing.T) { + ctx := context.Background() + dstore := dssync.MutexWrap(ds.NewMapDatastore()) bstore := blockstore.NewBlockstore(dstore) bserv := bs.New(bstore, offline.Exchange(bstore)) @@ -378,10 +383,9 @@ func TestPinUpdate(t *testing.T) { n1, c1 := randNode() n2, c2 := randNode() - dserv.Add(n1) - dserv.Add(n2) + dserv.Add(ctx, n1) + dserv.Add(ctx, n2) - ctx := context.Background() if err := p.Pin(ctx, n1, true); err != nil { t.Fatal(err) } diff --git a/set.go b/set.go index 6d4bff5..8778df9 100644 --- a/set.go +++ b/set.go @@ -54,7 +54,7 @@ func (s sortByHash) Swap(a, b int) { s.links[a], s.links[b] = s.links[b], s.links[a] } -func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) { +func storeItems(ctx context.Context, dag node.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) { links := make([]*node.Link, 0, defaultFanout+maxItems) for i := 0; i < defaultFanout; i++ { links = append(links, &node.Link{Cid: emptyKey}) @@ -139,10 +139,11 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint return nil, err } - childKey, err := dag.Add(child) + err = dag.Add(ctx, child) if err != nil { return nil, err } + childKey := child.Cid() internalKeys(childKey) @@ -202,7 +203,7 @@ func writeHdr(n *merkledag.ProtoNode, hdr *pb.Set) error { type walkerFunc func(idx int, link *node.Link) error -func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.ProtoNode, fn walkerFunc, children keyObserver) error { +func walkItems(ctx context.Context, dag node.DAGService, n *merkledag.ProtoNode, fn walkerFunc, children keyObserver) error { hdr, err := readHdr(n) if err != nil { return err @@ -237,7 +238,7 @@ func walkItems(ctx context.Context, dag merkledag.DAGService, n *merkledag.Proto return nil } -func loadSet(ctx context.Context, dag merkledag.DAGService, root *merkledag.ProtoNode, name string, internalKeys keyObserver) ([]*cid.Cid, error) { +func loadSet(ctx context.Context, dag node.DAGService, root *merkledag.ProtoNode, name string, internalKeys keyObserver) ([]*cid.Cid, error) { l, err := root.GetNodeLink(name) if err != nil { return nil, err @@ -280,17 +281,17 @@ func getCidListIterator(cids []*cid.Cid) itemIterator { } } -func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) { +func storeSet(ctx context.Context, dag node.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) { iter := getCidListIterator(cids) n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys) if err != nil { return nil, err } - c, err := dag.Add(n) + err = dag.Add(ctx, n) if err != nil { return nil, err } - internalKeys(c) + internalKeys(n.Cid()) return n, nil } -- GitLab