Unverified Commit 9ed588ac authored by Andrew Gillis's avatar Andrew Gillis Committed by GitHub

Avoid loading all pins into memory during migration (#5)

* Converting from IPLD to datastore-based pins no longer requires loading all dag-storage pins (including indirect pins) into memory.
* Increase test coverage.
parent 4c920717
......@@ -141,6 +141,41 @@ func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error)
}, nil
}
// LoadKeys streams the CIDs of the IPLD-stored pin set onto keyChan so that a
// caller can process pins without holding the whole set in memory.
//
// The pin root CID is read from dstore and the root node is fetched through
// internal. When recursive is true the recursive pin set is sent; otherwise
// the direct pin set is sent. If dstore holds no pin root, LoadKeys returns
// nil without sending anything. The dserv argument is not used here.
//
// LoadKeys does not close keyChan; closing it is left to the caller.
func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ipld.DAGService, recursive bool, keyChan chan<- cid.Cid) error {
	rootBytes, err := dstore.Get(pinDatastoreKey)
	switch {
	case err == nil:
		// Pin root is present; continue below.
	case err == ds.ErrNotFound:
		// No pin root stored, so there are no pins to report.
		return nil
	default:
		return err
	}

	rootCid, err := cid.Cast(rootBytes)
	if err != nil {
		return err
	}

	rootNode, err := internal.Get(ctx, rootCid)
	if err != nil {
		return fmt.Errorf("cannot find pinning root object: %v", err)
	}

	pbRoot, isProto := rootNode.(*mdag.ProtoNode)
	if !isProto {
		return mdag.ErrNotProtobuf
	}

	// Select the link under the root that holds the requested pin set.
	var setName string = linkDirect
	if recursive {
		setName = linkRecursive
	}
	return loadSetChan(ctx, internal, pbRoot, setName, keyChan)
}
// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
err := p.dserv.Add(ctx, node)
......
......@@ -54,7 +54,8 @@ func assertUnpinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) {
}
func TestPinnerBasic(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
......@@ -62,7 +63,6 @@ func TestPinnerBasic(t *testing.T) {
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
......@@ -165,6 +165,98 @@ func TestPinnerBasic(t *testing.T) {
// Test recursively pinned
assertPinned(t, np, bk, "could not find recursively pinned node")
// Test that LoadKeys returns the expected CIDs.
keyChan := make(chan cid.Cid)
go func() {
err = LoadKeys(ctx, dstore, dserv, dserv, true, keyChan)
close(keyChan)
}()
keys := map[cid.Cid]struct{}{}
for c := range keyChan {
keys[c] = struct{}{}
}
if err != nil {
t.Fatal(err)
}
recKeys, _ := np.RecursiveKeys(ctx)
if len(keys) != len(recKeys) {
t.Fatal("wrong number of recursive keys from LoadKeys")
}
for _, k := range recKeys {
if _, ok := keys[k]; !ok {
t.Fatal("LoadKeys did not return correct recursive keys")
}
}
keyChan = make(chan cid.Cid)
go func() {
err = LoadKeys(ctx, dstore, dserv, dserv, false, keyChan)
close(keyChan)
}()
keys = map[cid.Cid]struct{}{}
for c := range keyChan {
keys[c] = struct{}{}
}
if err != nil {
t.Fatal(err)
}
dirKeys, _ := np.DirectKeys(ctx)
if len(keys) != len(dirKeys) {
t.Fatal("wrong number of direct keys from LoadKeys")
}
for _, k := range dirKeys {
if _, ok := keys[k]; !ok {
t.Fatal("LoadKeys did not return correct direct keys")
}
}
cancel()
emptyDS := dssync.MutexWrap(ds.NewMapDatastore())
// Check key not in datastore
err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil)
if err != nil {
t.Fatal(err)
}
// Check error on bad key
if err = emptyDS.Put(pinDatastoreKey, []byte("bad-cid")); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil {
t.Fatal("expected error")
}
// Lookup dag that does not exist
noKey, err := cid.Decode("QmYff9iHR1Hz6wufVeJodzXqQm4pkK4QNS9ms8tyPKVWm1")
if err != nil {
panic(err)
}
if err = emptyDS.Put(pinDatastoreKey, noKey.Bytes()); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil)
if err == nil || err.Error() != "cannot find pinning root object: merkledag: not found" {
t.Fatal("did not get expected error")
}
// Check error when node has no links
if err = emptyDS.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil {
t.Fatal("expected error")
}
}
func TestIsPinnedLookup(t *testing.T) {
......
......@@ -219,13 +219,15 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
// readHdr guarantees fanout is a safe value
fanout := hdr.GetFanout()
for i, l := range n.Links()[fanout:] {
if err := fn(i, l); err != nil {
if err = fn(i, l); err != nil {
return err
}
}
for _, l := range n.Links()[:fanout] {
c := l.Cid
children(c)
if children != nil {
children(c)
}
if c.Equals(emptyKey) {
continue
}
......@@ -239,7 +241,7 @@ func walkItems(ctx context.Context, dag ipld.DAGService, n *merkledag.ProtoNode,
return merkledag.ErrNotProtobuf
}
if err := walkItems(ctx, dag, stpb, fn, children); err != nil {
if err = walkItems(ctx, dag, stpb, fn, children); err != nil {
return err
}
}
......@@ -277,6 +279,33 @@ func loadSet(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode
return res, nil
}
// loadSetChan walks the pin set stored under the link called name in root and
// sends the CID of every item in that set on keyChan.
//
// An error is returned if root has no link with the given name, if the linked
// node cannot be fetched from dag, if that node is not a protobuf node, or if
// walking the set fails. If ctx is done while a send is pending, the walk
// stops and ctx.Err() is returned, so a receiver that has gone away cannot
// leave this function (and the goroutine running it) blocked forever.
//
// keyChan is never closed by this function.
func loadSetChan(ctx context.Context, dag ipld.DAGService, root *merkledag.ProtoNode, name string, keyChan chan<- cid.Cid) error {
	l, err := root.GetNodeLink(name)
	if err != nil {
		return err
	}

	n, err := l.GetNode(ctx, dag)
	if err != nil {
		return err
	}

	pbn, ok := n.(*merkledag.ProtoNode)
	if !ok {
		return merkledag.ErrNotProtobuf
	}

	// Send each item, but give up as soon as the context is cancelled so an
	// abandoned receiver does not cause a permanent block.
	walk := func(idx int, link *ipld.Link) error {
		select {
		case keyChan <- link.Cid:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// Only the items themselves are reported, so no children callback is
	// passed.
	return walkItems(ctx, dag, pbn, walk, nil)
}
func getCidListIterator(cids []cid.Cid) itemIterator {
return func() (c cid.Cid, ok bool) {
if len(cids) == 0 {
......
......@@ -7,7 +7,7 @@ import (
"context"
"fmt"
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dspinner"
......@@ -24,39 +24,38 @@ import (
func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
const ipldPinPath = "/local/pins"
ipldPinner, err := ipldpinner.New(dstore, dserv, internal)
if err != nil {
return nil, 0, err
}
dsPinner, err := dspinner.New(ctx, dstore, dserv)
if err != nil {
return nil, 0, err
}
seen := cid.NewSet()
cids, err := ipldPinner.RecursiveKeys(ctx)
if err != nil {
return nil, 0, err
var convCount int
keyChan := make(chan cid.Cid)
go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, true, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Recursive)
convCount++
}
for i := range cids {
seen.Add(cids[i])
dsPinner.PinWithMode(cids[i], ipfspinner.Recursive)
if err != nil {
return nil, 0, fmt.Errorf("cannot load recursive keys: %s", err)
}
convCount := len(cids)
cids, err = ipldPinner.DirectKeys(ctx)
if err != nil {
return nil, 0, err
keyChan = make(chan cid.Cid)
go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, false, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Direct)
convCount++
}
for i := range cids {
if seen.Has(cids[i]) {
// Pin was already pinned recursively
continue
}
dsPinner.PinWithMode(cids[i], ipfspinner.Direct)
if err != nil {
return nil, 0, fmt.Errorf("cannot load direct keys: %s", err)
}
convCount += len(cids)
err = dsPinner.Flush(ctx)
if err != nil {
......
......@@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"strings"
"testing"
bs "github.com/ipfs/go-blockservice"
......@@ -55,7 +56,8 @@ func makeStore() (ds.Datastore, ipld.DAGService) {
}
func TestConversions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dstore, dserv := makeStore()
dsPinner, err := dspinner.New(ctx, dstore, dserv)
......@@ -151,3 +153,27 @@ func TestConversions(t *testing.T) {
t.Fatal(err)
}
}
// TestConvertLoadError checks that ConvertPinsFromIPLDToDS fails with a
// "cannot load recursive keys" error when the pin root key in the datastore
// refers to a node from which the pin sets cannot be loaded.
func TestConvertLoadError(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	dstore, dserv := makeStore()

	// Point /local/pins to empty node to cause failure loading pins.
	pinDatastoreKey := ds.NewKey("/local/pins")
	emptyKey, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
	if err != nil {
		// Report setup failures through the test framework instead of
		// panicking, so only this test fails and the cause is logged.
		t.Fatal(err)
	}
	if err = dstore.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil {
		t.Fatal(err)
	}
	if err = dstore.Sync(pinDatastoreKey); err != nil {
		t.Fatal(err)
	}

	_, _, err = ConvertPinsFromIPLDToDS(ctx, dstore, dserv, dserv)
	if err == nil || !strings.HasPrefix(err.Error(), "cannot load recursive keys") {
		// Include the error actually received to make a failure diagnosable.
		t.Fatalf("did not get expected error, got: %v", err)
	}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment