Unverified Commit 4c920717 authored by Andrew Gillis's avatar Andrew Gillis Committed by GitHub

Datastore based pinner (#4)

feat: store pins in datastore instead of a DAG

Adds a new `/pins` namespace to the given datastore and uses that to store pins as cbor binary, keyed by unique pin ID.

The new datastore pinner stores pins in the datastore as individual key-value items. This is faster than the dag pinner, which stored all pins in a single dag that had to be rewritten every time a pin was added or removed.

The new pinner provides a secondary indexing mechanism that can be used to index any data that a pin has. Secondary indexing logic is provided by the `dsindex` package. The new pinner currently includes indexing by CID.

Both the new datastore pinner (`dspinner` package) and the old dag pinner (`ipldpinner` package) implementations are included to support migration between the two.  Migration logic is provided by the `pinconv` package.

Other features in new pinner:
- Benchmarks are provided to compare performance between the old and new pinners
- New pinner does not keep in-memory set of pinned CIDs, instead it relies on the datastore
- Separate recursive and direct CID indexes allow searching for pins without having to load pin data to check the mode
- New pinner can rebuild indexes on load, if saved pins appear out of sync with the indexes
parent 9e800d13
*~
*.log
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool
*.out
package dsindex
import "errors"
var (
	// ErrEmptyKey is returned when an Indexer operation is given an empty key.
	ErrEmptyKey = errors.New("key is empty")
	// ErrEmptyValue is returned when an Indexer operation is given an empty value.
	ErrEmptyValue = errors.New("value is empty")
)
// Package dsindex provides secondary indexing functionality for a datastore.
package dsindex
import (
"context"
"fmt"
"path"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-multibase"
)
// Indexer maintains a secondary index. An index is a collection of key-value
// mappings where the key is the secondary index that maps to one or more
// values, where each value is a unique key being indexed.
type Indexer interface {
	// Add adds the specified value to the key. Both key and value must be
	// non-empty.
	Add(ctx context.Context, key, value string) error

	// Delete deletes the specified value from the key. If the value is not in
	// the datastore, this method returns no error.
	Delete(ctx context.Context, key, value string) error

	// DeleteKey deletes all values in the given key. If a key is not in the
	// datastore, this method returns no error. Returns a count of values that
	// were deleted.
	DeleteKey(ctx context.Context, key string) (count int, err error)

	// DeleteAll deletes all keys managed by this Indexer. Returns a count of
	// the values that were deleted.
	DeleteAll(ctx context.Context) (count int, err error)

	// ForEach calls the function for each value in the specified key, until
	// there are no more values, or until the function returns false. If key
	// is empty string, then all keys are iterated.
	ForEach(ctx context.Context, key string, fn func(key, value string) bool) error

	// HasValue determines if the key contains the specified value.
	HasValue(ctx context.Context, key, value string) (bool, error)

	// HasAny determines if any value is in the specified key. If key is
	// empty string, then all values are searched.
	HasAny(ctx context.Context, key string) (bool, error)

	// Search returns all values for the given key.
	Search(ctx context.Context, key string) (values []string, err error)
}
// indexer is a simple implementation of Indexer. This implementation relies
// on the underlying data store to support efficient querying by prefix.
//
// Entries are stored as /<encoded-key>/<encoded-value> keys with empty
// values; all information lives in the datastore key itself.
//
// TODO: Consider adding caching
type indexer struct {
	// dstore is the (namespaced) datastore holding the index entries.
	dstore ds.Datastore
}
// New creates a new datastore-backed index. Every entry the returned Indexer
// writes lives under the given name prefix of dstore.
//
// To persist the actions of calling Indexer functions, it is necessary to
// call dstore.Sync.
func New(dstore ds.Datastore, name ds.Key) Indexer {
	wrapped := namespace.Wrap(dstore, name)
	return &indexer{dstore: wrapped}
}
// Add records value under key. Both key and value must be non-empty.
func (x *indexer) Add(ctx context.Context, key, value string) error {
	switch {
	case key == "":
		return ErrEmptyKey
	case value == "":
		return ErrEmptyValue
	}
	// The entry is encoded entirely in the datastore key; the stored value
	// is an empty payload.
	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Put(entryKey, []byte{})
}
// Delete removes value from key. Deleting an absent entry is not an error.
func (x *indexer) Delete(ctx context.Context, key, value string) error {
	switch {
	case key == "":
		return ErrEmptyKey
	case value == "":
		return ErrEmptyValue
	}
	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Delete(entryKey)
}
// DeleteKey removes every value stored under key and reports how many
// entries were removed.
func (x *indexer) DeleteKey(ctx context.Context, key string) (int, error) {
	if key == "" {
		return 0, ErrEmptyKey
	}
	// Deleting a key is a prefix-delete over its encoded form.
	return x.deletePrefix(ctx, encode(key))
}
// DeleteAll removes every entry managed by this Indexer and reports how many
// entries were removed. An empty prefix matches everything in the namespace.
func (x *indexer) DeleteAll(ctx context.Context) (int, error) {
	return x.deletePrefix(ctx, "")
}
// ForEach calls fn for each (key, value) entry stored under key, stopping
// when there are no more entries or when fn returns false. If key is the
// empty string, every key in the index is iterated.
//
// Fix: the original declared the decode errors with `:=`, shadowing the
// outer err; decode failures then broke out of the loop and the function
// returned the outer (nil) err, silently discarding the error.
func (x *indexer) ForEach(ctx context.Context, key string, fn func(key, value string) bool) error {
	if key != "" {
		key = encode(key)
	}

	q := query.Query{
		Prefix:   key,
		KeysOnly: true,
	}
	results, err := x.dstore.Query(q)
	if err != nil {
		return err
	}
	defer results.Close()

	for {
		r, ok := results.NextSync()
		if !ok {
			break
		}
		if r.Error != nil {
			return r.Error
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
		ent := r.Entry

		// Entry keys have the form .../<encodedKey>/<encodedValue>;
		// decode both path components before handing them to fn.
		decIdx, decErr := decode(path.Base(path.Dir(ent.Key)))
		if decErr != nil {
			return fmt.Errorf("cannot decode index: %v", decErr)
		}
		decKey, decErr := decode(path.Base(ent.Key))
		if decErr != nil {
			return fmt.Errorf("cannot decode key: %v", decErr)
		}
		if !fn(decIdx, decKey) {
			break
		}
	}
	return nil
}
// HasValue reports whether value is recorded under key.
func (x *indexer) HasValue(ctx context.Context, key, value string) (bool, error) {
	if key == "" {
		return false, ErrEmptyKey
	}
	if value == "" {
		return false, ErrEmptyValue
	}
	entryKey := ds.NewKey(encode(key)).ChildString(encode(value))
	return x.dstore.Has(entryKey)
}
// HasAny reports whether at least one value exists under key. An empty key
// searches the entire index.
func (x *indexer) HasAny(ctx context.Context, key string) (bool, error) {
	found := false
	// Stop iterating as soon as the first entry is seen.
	err := x.ForEach(ctx, key, func(_, _ string) bool {
		found = true
		return false
	})
	return found, err
}
// Search returns every value recorded under key, or nil when the key has no
// entries.
func (x *indexer) Search(ctx context.Context, key string) ([]string, error) {
	if key == "" {
		return nil, ErrEmptyKey
	}
	ents, err := x.queryPrefix(ctx, encode(key))
	if err != nil {
		return nil, err
	}
	if len(ents) == 0 {
		return nil, nil
	}

	values := make([]string, 0, len(ents))
	for _, ent := range ents {
		// The value is the last path component of the entry key.
		v, err := decode(path.Base(ent.Key))
		if err != nil {
			return nil, fmt.Errorf("cannot decode value: %v", err)
		}
		values = append(values, v)
	}
	return values, nil
}
// SyncIndex synchronizes the keys in the target Indexer to match those of the
// ref Indexer. This function does not change this indexer's key root (name
// passed into New). It returns true if the target was modified.
//
// Fix: the original returned early when the ref index was empty, which left
// stale entries in the target instead of deleting them; an empty ref must
// produce an empty target.
func SyncIndex(ctx context.Context, ref, target Indexer) (bool, error) {
	// Build reference index map: value -> key for every entry in ref.
	refs := map[string]string{}
	err := ref.ForEach(ctx, "", func(key, value string) bool {
		refs[value] = key
		return true
	})
	if err != nil {
		return false, err
	}

	// Compare current indexes. Entries identical in both are dropped from
	// refs (nothing to do); entries only in the target are queued in dels.
	dels := map[string]string{}
	err = target.ForEach(ctx, "", func(key, value string) bool {
		refKey, ok := refs[value]
		if ok && refKey == key {
			// same in both; delete from refs, do not add to dels
			delete(refs, value)
		} else {
			dels[value] = key
		}
		return true
	})
	if err != nil {
		return false, err
	}

	// Items in dels are keys that no longer exist in ref.
	for value, key := range dels {
		err = target.Delete(ctx, key, value)
		if err != nil {
			return false, err
		}
	}

	// What remains in refs are keys that need to be added.
	for value, key := range refs {
		err = target.Add(ctx, key, value)
		if err != nil {
			return false, err
		}
	}

	return len(refs) != 0 || len(dels) != 0, nil
}
// deletePrefix removes every entry whose key starts with prefix and returns
// the number of entries removed.
func (x *indexer) deletePrefix(ctx context.Context, prefix string) (int, error) {
	ents, err := x.queryPrefix(ctx, prefix)
	if err != nil {
		return 0, err
	}
	for _, ent := range ents {
		if err = x.dstore.Delete(ds.NewKey(ent.Key)); err != nil {
			return 0, err
		}
	}
	return len(ents), nil
}
// queryPrefix collects all entries whose key starts with prefix. Only keys
// are fetched, since index entries carry no payload.
func (x *indexer) queryPrefix(ctx context.Context, prefix string) ([]query.Entry, error) {
	results, err := x.dstore.Query(query.Query{
		Prefix:   prefix,
		KeysOnly: true,
	})
	if err != nil {
		return nil, err
	}
	return results.Rest()
}
// encode converts data into a datastore-key-safe string using base64url
// multibase encoding.
func encode(data string) string {
	enc, err := multibase.Encode(multibase.Base64url, []byte(data))
	if err != nil {
		// Encode only fails for an unsupported base constant, which would
		// be a programming error here.
		panic(err.Error())
	}
	return enc
}
// decode reverses encode, returning the original string stored in a
// multibase-encoded key component.
func decode(data string) (string, error) {
	_, raw, err := multibase.Decode(data)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}
package dsindex
import (
"context"
"testing"
ds "github.com/ipfs/go-datastore"
)
// createIndexer builds an in-memory Indexer pre-populated with four entries
// (alice->a1, bob->b1, bob->b2, cathy->c1) for use by the tests.
//
// NOTE(review): Add errors are ignored; the map datastore is assumed not to
// fail on Put — confirm if the fixture is reused with other datastores.
func createIndexer() Indexer {
	dstore := ds.NewMapDatastore()
	nameIndex := New(dstore, ds.NewKey("/data/nameindex"))

	ctx := context.Background()
	nameIndex.Add(ctx, "alice", "a1")
	nameIndex.Add(ctx, "bob", "b1")
	nameIndex.Add(ctx, "bob", "b2")
	nameIndex.Add(ctx, "cathy", "c1")

	return nameIndex
}
// TestAdd verifies that Add succeeds, is idempotent for a repeated pair, and
// rejects empty keys and values with the sentinel errors.
func TestAdd(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	nameIndex := createIndexer()

	err := nameIndex.Add(ctx, "someone", "s1")
	if err != nil {
		t.Fatal(err)
	}
	// Adding the same (key, value) pair again must not error.
	err = nameIndex.Add(ctx, "someone", "s1")
	if err != nil {
		t.Fatal(err)
	}

	err = nameIndex.Add(ctx, "", "noindex")
	if err != ErrEmptyKey {
		t.Fatal("unexpected error:", err)
	}

	err = nameIndex.Add(ctx, "nokey", "")
	if err != ErrEmptyValue {
		t.Fatal("unexpected error:", err)
	}
}
// TestHasValue verifies membership checks for present and absent values, and
// the empty-key/empty-value sentinel errors.
func TestHasValue(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	nameIndex := createIndexer()

	ok, err := nameIndex.HasValue(ctx, "bob", "b1")
	if err != nil {
		t.Fatal(err)
	}
	if !ok {
		t.Fatal("missing index")
	}

	// b3 was never added under bob.
	ok, err = nameIndex.HasValue(ctx, "bob", "b3")
	if err != nil {
		t.Fatal(err)
	}
	if ok {
		t.Fatal("should not have index")
	}

	_, err = nameIndex.HasValue(ctx, "", "b1")
	if err != ErrEmptyKey {
		t.Fatal("unexpected error:", err)
	}

	_, err = nameIndex.HasValue(ctx, "bob", "")
	if err != ErrEmptyValue {
		t.Fatal("unexpected error:", err)
	}
}
// TestHasAny verifies HasAny for a missing key, for present keys, for the
// empty-string "all keys" form, and that DeleteAll empties the index.
func TestHasAny(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	nameIndex := createIndexer()

	ok, err := nameIndex.HasAny(ctx, "nothere")
	if err != nil {
		t.Fatal(err)
	}
	if ok {
		t.Fatal("should return false")
	}

	for _, idx := range []string{"alice", "bob", ""} {
		ok, err = nameIndex.HasAny(ctx, idx)
		if err != nil {
			t.Fatal(err)
		}
		if !ok {
			t.Fatal("missing index", idx)
		}
	}

	// The fixture contains exactly 4 entries (a1, b1, b2, c1).
	count, err := nameIndex.DeleteAll(ctx)
	if err != nil {
		t.Fatal(err)
	}
	if count != 4 {
		t.Fatal("expected 4 deletions")
	}

	ok, err = nameIndex.HasAny(ctx, "")
	if err != nil {
		t.Fatal(err)
	}
	if ok {
		t.Fatal("should return false")
	}
}
// TestForEach verifies per-key iteration (both values under "bob") and the
// empty-string form that iterates every entry in the index.
func TestForEach(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	nameIndex := createIndexer()

	found := make(map[string]struct{})
	err := nameIndex.ForEach(ctx, "bob", func(key, value string) bool {
		found[value] = struct{}{}
		return true
	})
	if err != nil {
		t.Fatal(err)
	}
	for _, value := range []string{"b1", "b2"} {
		_, ok := found[value]
		if !ok {
			t.Fatal("missing key for value", value)
		}
	}

	// Empty key iterates the whole index; collect value -> key.
	values := map[string]string{}
	err = nameIndex.ForEach(ctx, "", func(key, value string) bool {
		values[value] = key
		return true
	})
	if err != nil {
		t.Fatal(err)
	}
	if len(values) != 4 {
		t.Fatal("expected 4 keys")
	}
	if values["a1"] != "alice" {
		t.Error("expected a1: alice")
	}
	if values["b1"] != "bob" {
		t.Error("expected b1: bob")
	}
	if values["b2"] != "bob" {
		t.Error("expected b2: bob")
	}
	if values["c1"] != "cathy" {
		t.Error("expected c1: cathy")
	}
}
// TestSearch verifies Search returns all values for a multi-value key, a
// single-value key, and nothing for an absent key.
func TestSearch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	nameIndex := createIndexer()

	ids, err := nameIndex.Search(ctx, "bob")
	if err != nil {
		t.Fatal(err)
	}
	if len(ids) != 2 {
		t.Fatal("wrong number of ids - expected 2 got", ids)
	}
	for _, id := range ids {
		if id != "b1" && id != "b2" {
			t.Fatal("wrong value in id set")
		}
	}
	// Both of bob's values must be distinct.
	if ids[0] == ids[1] {
		t.Fatal("duplicate id")
	}

	ids, err = nameIndex.Search(ctx, "cathy")
	if err != nil {
		t.Fatal(err)
	}
	if len(ids) != 1 || ids[0] != "c1" {
		t.Fatal("wrong ids")
	}

	ids, err = nameIndex.Search(ctx, "amit")
	if err != nil {
		t.Fatal(err)
	}
	if len(ids) != 0 {
		t.Fatal("unexpected ids returned")
	}
}
// TestDelete verifies that deleting a nonexistent value is a no-op, that
// Delete removes a single entry, and that DeleteKey removes every value
// under a key with an accurate count.
func TestDelete(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	nameIndex := createIndexer()

	// b3 does not exist; Delete must still return no error.
	err := nameIndex.Delete(ctx, "bob", "b3")
	if err != nil {
		t.Fatal(err)
	}

	err = nameIndex.Delete(ctx, "alice", "a1")
	if err != nil {
		t.Fatal(err)
	}

	ok, err := nameIndex.HasValue(ctx, "alice", "a1")
	if err != nil {
		t.Fatal(err)
	}
	if ok {
		t.Fatal("index key should have been deleted")
	}

	// bob holds two values: b1 and b2.
	count, err := nameIndex.DeleteKey(ctx, "bob")
	if err != nil {
		t.Fatal(err)
	}
	if count != 2 {
		t.Fatal("wrong deleted count")
	}
	ok, _ = nameIndex.HasValue(ctx, "bob", "b1")
	if ok {
		t.Fatal("index not deleted")
	}
}
// TestSyncIndex verifies that SyncIndex makes the target index identical to
// the reference index: entries only in the target are removed, entries only
// in the reference are added, and the change flag is reported.
func TestSyncIndex(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	nameIndex := createIndexer()

	// Reference index shares one entry (alice->a1) with the fixture and
	// introduces two new ones.
	dstore := ds.NewMapDatastore()
	refIndex := New(dstore, ds.NewKey("/ref"))
	refIndex.Add(ctx, "alice", "a1")
	refIndex.Add(ctx, "cathy", "zz")
	refIndex.Add(ctx, "dennis", "d1")

	changed, err := SyncIndex(ctx, refIndex, nameIndex)
	if err != nil {
		t.Fatal(err)
	}
	if !changed {
		t.Error("change was not indicated")
	}

	// Create map of id->index in sync target
	syncs := map[string]string{}
	err = nameIndex.ForEach(ctx, "", func(key, value string) bool {
		syncs[value] = key
		return true
	})
	if err != nil {
		t.Fatal(err)
	}

	// Iterate items in sync source and make sure they appear in target
	var itemCount int
	err = refIndex.ForEach(ctx, "", func(key, value string) bool {
		itemCount++
		syncKey, ok := syncs[value]
		if !ok || key != syncKey {
			t.Fatal("key", key, "-->", value, "was not synced")
		}
		return true
	})
	if err != nil {
		t.Fatal(err)
	}
	if itemCount != len(syncs) {
		t.Fatal("different number of items in sync source and target")
	}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// Package ipldpinner implements structures and methods to keep track of
// which objects a user wants to keep stored locally. This implementation
// stores pin information in a mdag structure.
package ipldpinner
import (
"context"
"fmt"
"os"
"sync"
"time"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
mdag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
)
// loadTimeout bounds how long New waits to load the pin-state DAG.
const loadTimeout = 5 * time.Second

var log = logging.Logger("pin")

// pinDatastoreKey is the datastore key under which the root CID of the
// pin-set DAG is stored.
var pinDatastoreKey = ds.NewKey("/local/pins")

// emptyKey and the link names below are populated in init().
var emptyKey cid.Cid

var linkDirect, linkRecursive, linkInternal string
// init decodes the well-known empty-node CID and resolves the string names
// for the Direct, Recursive, and Internal pin modes. Any failure here is a
// programming/build error, so the process exits or panics.
func init() {
	e, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
	if err != nil {
		log.Error("failed to decode empty key constant")
		os.Exit(1)
	}
	emptyKey = e

	directStr, ok := ipfspinner.ModeToString(ipfspinner.Direct)
	if !ok {
		panic("could not find Direct pin enum")
	}
	linkDirect = directStr

	recursiveStr, ok := ipfspinner.ModeToString(ipfspinner.Recursive)
	if !ok {
		panic("could not find Recursive pin enum")
	}
	linkRecursive = recursiveStr

	internalStr, ok := ipfspinner.ModeToString(ipfspinner.Internal)
	if !ok {
		panic("could not find Internal pin enum")
	}
	linkInternal = internalStr
}
// pinner implements the Pinner interface, keeping the pin state in
// in-memory CID sets that Flush persists to a DAG.
type pinner struct {
	lock sync.RWMutex // guards the fields below

	recursePin *cid.Set // CIDs pinned recursively
	directPin  *cid.Set // CIDs pinned directly

	// Track the keys used for storing the pinning state, so gc does
	// not delete them.
	internalPin *cid.Set
	dserv       ipld.DAGService
	internal    ipld.DAGService // dagservice used to store internal objects
	dstore      ds.Datastore
}

// Compile-time check that *pinner satisfies the Pinner interface.
var _ ipfspinner.Pinner = (*pinner)(nil)
// syncDAGService is implemented by DAG services that can flush buffered
// writes to durable storage; Flush uses it when available.
type syncDAGService interface {
	ipld.DAGService
	Sync() error
}
// New creates a new pinner using the given datastore as a backend, and loads
// the pinner's keysets from the datastore. If no pin state has been saved
// yet (pinDatastoreKey absent), a pinner with empty sets is returned.
func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error) {
	rootKey, err := dstore.Get(pinDatastoreKey)
	if err != nil {
		if err == ds.ErrNotFound {
			// No saved pin state: start fresh.
			return &pinner{
				recursePin:  cid.NewSet(),
				directPin:   cid.NewSet(),
				internalPin: cid.NewSet(),
				dserv:       dserv,
				internal:    internal,
				dstore:      dstore,
			}, nil
		}
		return nil, err
	}
	rootCid, err := cid.Cast(rootKey)
	if err != nil {
		return nil, err
	}

	// Bound the time spent fetching the saved pin-state DAG.
	ctx, cancel := context.WithTimeout(context.TODO(), loadTimeout)
	defer cancel()

	root, err := internal.Get(ctx, rootCid)
	if err != nil {
		return nil, fmt.Errorf("cannot find pinning root object: %v", err)
	}

	rootpb, ok := root.(*mdag.ProtoNode)
	if !ok {
		return nil, mdag.ErrNotProtobuf
	}

	// Every node touched while loading the pin sets is recorded as an
	// internal pin so GC does not collect the pin-state DAG itself.
	internalset := cid.NewSet()
	internalset.Add(rootCid)
	recordInternal := internalset.Add

	// load recursive set
	recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal)
	if err != nil {
		return nil, fmt.Errorf("cannot load recursive pins: %v", err)
	}

	// load direct set
	directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal)
	if err != nil {
		return nil, fmt.Errorf("cannot load direct pins: %v", err)
	}

	return &pinner{
		// assign pinsets
		recursePin:  cidSetWithValues(recurseKeys),
		directPin:   cidSetWithValues(directKeys),
		internalPin: internalset,
		// assign services
		dserv:    dserv,
		dstore:   dstore,
		internal: internal,
	}, nil
}
// Pin the given node, optionally recursive. For a recursive pin the entire
// graph under the node is fetched first; a recursive pin supersedes any
// existing direct pin on the same CID. Pinning a CID that is already pinned
// recursively with recurse=false is an error.
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
	err := p.dserv.Add(ctx, node)
	if err != nil {
		return err
	}

	c := node.Cid()

	p.lock.Lock()
	defer p.lock.Unlock()

	if recurse {
		if p.recursePin.Has(c) {
			return nil
		}

		// Temporarily release the lock so the (potentially long) graph
		// fetch does not block other pinner operations; the deferred
		// Unlock pairs with the re-Lock below.
		p.lock.Unlock()
		// temporary unlock to fetch the entire graph
		err := mdag.FetchGraph(ctx, c, p.dserv)
		p.lock.Lock()
		if err != nil {
			return err
		}

		// Re-check after reacquiring the lock: another goroutine may have
		// pinned c while the lock was released.
		if p.recursePin.Has(c) {
			return nil
		}

		// The recursive pin replaces any direct pin on the same CID.
		if p.directPin.Has(c) {
			p.directPin.Remove(c)
		}

		p.recursePin.Add(c)
	} else {
		if p.recursePin.Has(c) {
			return fmt.Errorf("%s already pinned recursively", c.String())
		}

		p.directPin.Add(c)
	}
	return nil
}
// ErrNotPinned is returned when trying to unpin items which are not pinned.
var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly")

// Unpin a given key. A recursively pinned key can only be removed when
// recursive is true; a CID that is neither directly nor recursively pinned
// yields ErrNotPinned.
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.recursePin.Has(c) {
		if !recursive {
			// Refuse to silently drop a recursive pin via a direct unpin.
			return fmt.Errorf("%s is pinned recursively", c)
		}
		p.recursePin.Remove(c)
		return nil
	}
	if p.directPin.Has(c) {
		p.directPin.Remove(c)
		return nil
	}
	return ErrNotPinned
}
// isInternalPin reports whether c is one of the CIDs used to store the
// pinner's own state. Callers in this file invoke it with p.lock held.
func (p *pinner) isInternalPin(c cid.Cid) bool {
	return p.internalPin.Has(c)
}
// IsPinned returns whether or not the given key is pinned
// and an explanation of why its pinned. It is equivalent to
// IsPinnedWithType with mode Any.
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.isPinnedWithType(ctx, c, ipfspinner.Any)
}
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.isPinnedWithType(ctx, c, mode)
}
// isPinnedWithType is the implementation of IsPinnedWithType that does not lock.
// intended for use by other pinned methods that already take locks.
// Modes are checked in order recursive, direct, internal, and only when
// none match (and mode permits) does it fall through to the expensive
// indirect-graph search.
func (p *pinner) isPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) {
	switch mode {
	case ipfspinner.Any, ipfspinner.Direct, ipfspinner.Indirect, ipfspinner.Recursive, ipfspinner.Internal:
	default:
		err := fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}",
			mode, ipfspinner.Direct, ipfspinner.Indirect, ipfspinner.Recursive, ipfspinner.Internal, ipfspinner.Any)
		return "", false, err
	}

	// Each "mode == X" early return below short-circuits once the requested
	// specific mode has been ruled out.
	if (mode == ipfspinner.Recursive || mode == ipfspinner.Any) && p.recursePin.Has(c) {
		return linkRecursive, true, nil
	}
	if mode == ipfspinner.Recursive {
		return "", false, nil
	}

	if (mode == ipfspinner.Direct || mode == ipfspinner.Any) && p.directPin.Has(c) {
		return linkDirect, true, nil
	}
	if mode == ipfspinner.Direct {
		return "", false, nil
	}

	if (mode == ipfspinner.Internal || mode == ipfspinner.Any) && p.isInternalPin(c) {
		return linkInternal, true, nil
	}
	if mode == ipfspinner.Internal {
		return "", false, nil
	}

	// Default is Indirect: walk the graph under every recursive pin looking
	// for c, sharing one visited set across all roots to avoid re-walking
	// shared subgraphs.
	visitedSet := cid.NewSet()
	for _, rc := range p.recursePin.Keys() {
		has, err := hasChild(ctx, p.dserv, rc, c, visitedSet.Visit)
		if err != nil {
			return "", false, err
		}
		if has {
			return rc.String(), true, nil
		}
	}
	return "", false, nil
}
// CheckIfPinned Checks if a set of keys are pinned, more efficient than
// calling IsPinned for each key, returns the pinned status of cid(s).
// Direct, recursive, and internal pins are resolved from the in-memory
// sets first; only the remainder triggers a graph walk for indirect pins.
func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinner.Pinned, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	pinned := make([]ipfspinner.Pinned, 0, len(cids))
	toCheck := cid.NewSet()

	// First check for non-Indirect pins directly
	for _, c := range cids {
		if p.recursePin.Has(c) {
			pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Recursive})
		} else if p.directPin.Has(c) {
			pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Direct})
		} else if p.isInternalPin(c) {
			pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Internal})
		} else {
			toCheck.Add(c)
		}
	}

	// Now walk all recursive pins to check for indirect pins.
	// checkChildren does a depth-first walk under parentKey, recording any
	// candidate CID it finds as indirectly pinned via root rk.
	var checkChildren func(cid.Cid, cid.Cid) error
	checkChildren = func(rk, parentKey cid.Cid) error {
		links, err := ipld.GetLinks(ctx, p.dserv, parentKey)
		if err != nil {
			return err
		}
		for _, lnk := range links {
			c := lnk.Cid

			if toCheck.Has(c) {
				pinned = append(pinned,
					ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
				toCheck.Remove(c)
			}

			err := checkChildren(rk, c)
			if err != nil {
				return err
			}

			// Stop early once every candidate is accounted for.
			if toCheck.Len() == 0 {
				return nil
			}
		}
		return nil
	}

	for _, rk := range p.recursePin.Keys() {
		err := checkChildren(rk, rk)
		if err != nil {
			return nil, err
		}
		if toCheck.Len() == 0 {
			break
		}
	}

	// Anything left in toCheck is not pinned
	for _, k := range toCheck.Keys() {
		pinned = append(pinned, ipfspinner.Pinned{Key: k, Mode: ipfspinner.NotPinned})
	}

	return pinned, nil
}
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful. Only Direct and Recursive modes are accepted; any
// other mode panics.
func (p *pinner) RemovePinWithMode(c cid.Cid, mode ipfspinner.Mode) {
	p.lock.Lock()
	defer p.lock.Unlock()
	switch mode {
	case ipfspinner.Direct:
		p.directPin.Remove(c)
	case ipfspinner.Recursive:
		p.recursePin.Remove(c)
	default:
		// programmer error, panic OK
		panic("unrecognized pin type")
	}
}
// cidSetWithValues builds a *cid.Set containing every CID in cids.
func cidSetWithValues(cids []cid.Cid) *cid.Set {
	set := cid.NewSet()
	for i := range cids {
		set.Add(cids[i])
	}
	return set
}
// DirectKeys returns a slice containing the directly pinned keys.
// Read-only access, so only the read lock is taken.
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.directPin.Keys(), nil
}
// RecursiveKeys returns a slice containing the recursively pinned keys.
// Read-only access, so only the read lock is taken.
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.recursePin.Keys(), nil
}
// Update updates a recursive pin from one cid to another
// this is more efficient than simply pinning the new one and unpinning the
// old one. The 'from' cid must already be recursively pinned; when unpin is
// true the old pin is removed after the new one is added.
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
	if from == to {
		// Nothing to do. Don't remove this check or we'll end up
		// _removing_ the pin.
		//
		// See #6648
		return nil
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	if !p.recursePin.Has(from) {
		return fmt.Errorf("'from' cid was not recursively pinned already")
	}

	// Temporarily unlock while we fetch the differences; the deferred
	// Unlock pairs with the re-Lock below.
	p.lock.Unlock()
	err := dagutils.DiffEnumerate(ctx, p.dserv, from, to)
	p.lock.Lock()

	if err != nil {
		return err
	}

	p.recursePin.Add(to)
	if unpin {
		p.recursePin.Remove(from)
	}
	return nil
}
// Flush encodes and writes pinner keysets to the datastore: it rebuilds the
// pin-state DAG (direct + recursive sets under a fresh root), syncs the DAG
// services that support it, then records and syncs the new root CID under
// pinDatastoreKey. On success the internal pin set is replaced with the
// CIDs of the newly written state.
func (p *pinner) Flush(ctx context.Context) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	internalset := cid.NewSet()
	recordInternal := internalset.Add

	root := &mdag.ProtoNode{}
	{
		n, err := storeSet(ctx, p.internal, p.directPin.Keys(), recordInternal)
		if err != nil {
			return err
		}
		if err := root.AddNodeLink(linkDirect, n); err != nil {
			return err
		}
	}
	{
		n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal)
		if err != nil {
			return err
		}
		if err := root.AddNodeLink(linkRecursive, n); err != nil {
			return err
		}
	}

	// add the empty node, its referenced by the pin sets but never created
	err := p.internal.Add(ctx, new(mdag.ProtoNode))
	if err != nil {
		return err
	}

	err = p.internal.Add(ctx, root)
	if err != nil {
		return err
	}

	k := root.Cid()
	internalset.Add(k)

	// Sync both DAG services (when supported) before recording the new
	// root, so the datastore never points at unwritten data.
	if syncDServ, ok := p.dserv.(syncDAGService); ok {
		if err := syncDServ.Sync(); err != nil {
			return fmt.Errorf("cannot sync pinned data: %v", err)
		}
	}
	if syncInternal, ok := p.internal.(syncDAGService); ok {
		if err := syncInternal.Sync(); err != nil {
			return fmt.Errorf("cannot sync pinning data: %v", err)
		}
	}

	if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil {
		return fmt.Errorf("cannot store pin state: %v", err)
	}
	if err := p.dstore.Sync(pinDatastoreKey); err != nil {
		return fmt.Errorf("cannot sync pin state: %v", err)
	}
	p.internalPin = internalset
	return nil
}
// InternalPins returns all cids kept pinned for the internal state of the
// pinner.
//
// Fix: this is a read-only accessor, so it takes the read lock for
// consistency with DirectKeys and RecursiveKeys instead of the exclusive
// write lock the original acquired.
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.internalPin.Keys(), nil
}
// PinWithMode allows the user to have fine grained control over pin
// counts. Only Recursive and Direct modes are handled.
//
// NOTE(review): unlike RemovePinWithMode, other modes are silently
// ignored here rather than panicking — confirm this asymmetry is
// intentional.
func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) {
	p.lock.Lock()
	defer p.lock.Unlock()
	switch mode {
	case ipfspinner.Recursive:
		p.recursePin.Add(c)
	case ipfspinner.Direct:
		p.directPin.Add(c)
	}
}
// hasChild recursively looks for a Cid among the children of a root Cid.
// The visit function can be used to shortcut already-visited branches.
func hasChild(ctx context.Context, ng ipld.NodeGetter, root cid.Cid, child cid.Cid, visit func(cid.Cid) bool) (bool, error) {
	links, err := ipld.GetLinks(ctx, ng, root)
	if err != nil {
		return false, err
	}
	for _, lnk := range links {
		c := lnk.Cid
		if c.Equals(child) {
			return true, nil
		}
		// Skip branches the visit callback reports as already seen.
		if !visit(c) {
			continue
		}
		found, err := hasChild(ctx, ng, c, child, visit)
		if err != nil {
			return false, err
		}
		if found {
			return true, nil
		}
	}
	return false, nil
}
package pin
package ipldpinner
import (
"context"
......@@ -14,6 +14,7 @@ import (
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
pin "github.com/ipfs/go-ipfs-pinner"
util "github.com/ipfs/go-ipfs-util"
)
......@@ -30,7 +31,7 @@ func randNode() (*mdag.ProtoNode, cid.Cid) {
return nd, k
}
func assertPinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
func assertPinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
......@@ -41,7 +42,7 @@ func assertPinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
}
}
func assertUnpinned(t *testing.T, p Pinner, c cid.Cid, failmsg string) {
func assertUnpinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
......@@ -62,10 +63,13 @@ func TestPinnerBasic(t *testing.T) {
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv, dserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
a, ak := randNode()
err := dserv.Add(ctx, a)
err = dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
......@@ -151,7 +155,7 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}
np, err := LoadPinner(dstore, dserv, dserv)
np, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
......@@ -188,7 +192,10 @@ func TestIsPinnedLookup(t *testing.T) {
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv, dserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
aNodes := make([]*mdag.ProtoNode, aBranchLen)
aKeys := make([]cid.Cid, aBranchLen)
......@@ -229,7 +236,7 @@ func TestIsPinnedLookup(t *testing.T) {
}
// Add C
err := dserv.Add(ctx, c)
err = dserv.Add(ctx, c)
if err != nil {
t.Fatal(err)
}
......@@ -289,11 +296,13 @@ func TestDuplicateSemantics(t *testing.T) {
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p := NewPinner(dstore, dserv, dserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
a, _ := randNode()
err := dserv.Add(ctx, a)
err = dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
......@@ -323,10 +332,13 @@ func TestFlush(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p := NewPinner(dstore, dserv, dserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
_, k := randNode()
p.PinWithMode(k, Recursive)
p.PinWithMode(k, pin.Recursive)
if err := p.Flush(context.Background()); err != nil {
t.Fatal(err)
}
......@@ -340,11 +352,14 @@ func TestPinRecursiveFail(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p := NewPinner(dstore, dserv, dserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
a, _ := randNode()
b, _ := randNode()
err := a.AddNodeLink("child", b)
err = a.AddNodeLink("child", b)
if err != nil {
t.Fatal(err)
}
......@@ -385,7 +400,10 @@ func TestPinUpdate(t *testing.T) {
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p := NewPinner(dstore, dserv, dserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
n1, c1 := randNode()
n2, c2 := randNode()
......
package pin
package ipldpinner
import (
"bytes"
......@@ -55,9 +55,14 @@ func (s sortByHash) Swap(a, b int) {
}
func storeItems(ctx context.Context, dag ipld.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
links := make([]*ipld.Link, 0, defaultFanout+maxItems)
// Each node wastes up to defaultFanout in empty links.
var leafLinks uint64
if estimatedLen < maxItems {
leafLinks = estimatedLen
}
links := make([]*ipld.Link, defaultFanout, defaultFanout+leafLinks)
for i := 0; i < defaultFanout; i++ {
links = append(links, &ipld.Link{Cid: emptyKey})
links[i] = &ipld.Link{Cid: emptyKey}
}
// add emptyKey to our set of internal pinset objects
......@@ -97,7 +102,7 @@ func storeItems(ctx context.Context, dag ipld.DAGService, estimatedLen uint64, d
sort.Stable(s)
}
hashed := make([][]cid.Cid, defaultFanout)
var hashed [][]cid.Cid
for {
// This loop essentially enumerates every single item in the set
// and maps them all into a set of buckets. Each bucket will be recursively
......@@ -116,6 +121,9 @@ func storeItems(ctx context.Context, dag ipld.DAGService, estimatedLen uint64, d
if !ok {
break
}
if hashed == nil {
hashed = make([][]cid.Cid, defaultFanout)
}
h := hash(depth, k) % defaultFanout
hashed[h] = append(hashed[h], k)
}
......
package pin
package ipldpinner
import (
"context"
......
......@@ -5,33 +5,14 @@ package pin
import (
"context"
"fmt"
"os"
"sync"
"time"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
mdag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
)
var log = logging.Logger("pin")
var pinDatastoreKey = ds.NewKey("/local/pins")
var emptyKey cid.Cid
func init() {
e, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
if err != nil {
log.Error("failed to decode empty key constant")
os.Exit(1)
}
emptyKey = e
}
const (
linkRecursive = "recursive"
linkDirect = "direct"
......@@ -177,482 +158,3 @@ func (p Pinned) String() string {
return fmt.Sprintf("pinned: %s", modeStr)
}
}
// pinner implements the Pinner interface
type pinner struct {
lock sync.RWMutex
recursePin *cid.Set
directPin *cid.Set
// Track the keys used for storing the pinning state, so gc does
// not delete them.
internalPin *cid.Set
dserv ipld.DAGService
internal ipld.DAGService // dagservice used to store internal objects
dstore ds.Datastore
}
type syncDAGService interface {
ipld.DAGService
Sync() error
}
// NewPinner creates a new pinner using the given datastore as a backend
func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner {
	// All three pin sets start empty; state is populated via Pin /
	// PinWithMode or by loading from the datastore (see LoadPinner).
	return &pinner{
		recursePin:  cid.NewSet(),
		directPin:   cid.NewSet(),
		internalPin: cid.NewSet(),
		dserv:       serv,
		internal:    internal,
		dstore:      dstore,
	}
}
// Pin the given node, optionally recursive. For a recursive pin the
// node's entire graph is fetched first; a direct pin on the same CID is
// promoted to recursive. Pinning a CID directly when it is already
// recursively pinned is an error.
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
	err := p.dserv.Add(ctx, node)
	if err != nil {
		return err
	}

	c := node.Cid()

	p.lock.Lock()
	defer p.lock.Unlock()

	if recurse {
		if p.recursePin.Has(c) {
			return nil
		}

		// Temporarily release the lock while fetching the entire graph:
		// the fetch may be slow and must not block other pin operations.
		p.lock.Unlock()
		err := mdag.FetchGraph(ctx, c, p.dserv)
		p.lock.Lock()
		if err != nil {
			return err
		}

		// Re-check after reacquiring the lock: another goroutine may
		// have recursively pinned c while the lock was released.
		if p.recursePin.Has(c) {
			return nil
		}

		// Promote any direct pin to a recursive one. Remove is a no-op
		// when c is not in the set, so no Has check is needed.
		p.directPin.Remove(c)
		p.recursePin.Add(c)
	} else {
		if p.recursePin.Has(c) {
			return fmt.Errorf("%s already pinned recursively", c.String())
		}

		p.directPin.Add(c)
	}
	return nil
}
// ErrNotPinned is returned when trying to unpin items which are not pinned.
var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly")
// Unpin a given key. A recursive pin is only removed when recursive is
// true; direct pins are always removable. Returns ErrNotPinned when the
// key is neither recursively nor directly pinned.
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	switch {
	case p.recursePin.Has(c):
		if !recursive {
			return fmt.Errorf("%s is pinned recursively", c)
		}
		p.recursePin.Remove(c)
		return nil
	case p.directPin.Has(c):
		p.directPin.Remove(c)
		return nil
	default:
		// Indirect pins (reachable from a recursive root) cannot be
		// unpinned individually.
		return ErrNotPinned
	}
}
// isInternalPin reports whether c belongs to the pinner's own
// bookkeeping DAG. Callers hold p.lock.
func (p *pinner) isInternalPin(c cid.Cid) bool {
	return p.internalPin.Has(c)
}
// IsPinned returns whether or not the given key is pinned
// and an explanation of why its pinned
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	// Equivalent to IsPinnedWithType with mode Any.
	return p.isPinnedWithType(ctx, c, Any)
}
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.isPinnedWithType(ctx, c, mode)
}
// isPinnedWithType is the implementation of IsPinnedWithType that does not lock.
// intended for use by other pinned methods that already take locks.
// Checks are ordered cheapest-first: recursive, direct, internal, then the
// indirect DAG walk, which only runs for Indirect or Any.
func (p *pinner) isPinnedWithType(ctx context.Context, c cid.Cid, mode Mode) (string, bool, error) {
	switch mode {
	case Any, Direct, Indirect, Recursive, Internal:
	default:
		return "", false, fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}",
			mode, Direct, Indirect, Recursive, Internal, Any)
	}

	if mode == Recursive || mode == Any {
		if p.recursePin.Has(c) {
			return linkRecursive, true, nil
		}
		if mode == Recursive {
			return "", false, nil
		}
	}

	if mode == Direct || mode == Any {
		if p.directPin.Has(c) {
			return linkDirect, true, nil
		}
		if mode == Direct {
			return "", false, nil
		}
	}

	if mode == Internal || mode == Any {
		if p.isInternalPin(c) {
			return linkInternal, true, nil
		}
		if mode == Internal {
			return "", false, nil
		}
	}

	// Default is Indirect: walk every recursively pinned DAG looking for
	// c. The visited set short-cuts branches shared between roots.
	visited := cid.NewSet()
	for _, root := range p.recursePin.Keys() {
		found, err := hasChild(ctx, p.dserv, root, c, visited.Visit)
		if err != nil {
			return "", false, err
		}
		if found {
			return root.String(), true, nil
		}
	}
	return "", false, nil
}
// CheckIfPinned Checks if a set of keys are pinned, more efficient than
// calling IsPinned for each key, returns the pinned status of cid(s)
func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]Pinned, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	pinned := make([]Pinned, 0, len(cids))
	toCheck := cid.NewSet()

	// First check for non-Indirect pins directly
	for _, c := range cids {
		if p.recursePin.Has(c) {
			pinned = append(pinned, Pinned{Key: c, Mode: Recursive})
		} else if p.directPin.Has(c) {
			pinned = append(pinned, Pinned{Key: c, Mode: Direct})
		} else if p.isInternalPin(c) {
			pinned = append(pinned, Pinned{Key: c, Mode: Internal})
		} else {
			// Not explicitly pinned; may still be reachable from a
			// recursive root — resolved by the DAG walk below.
			toCheck.Add(c)
		}
	}

	// Now walk all recursive pins to check for indirect pins
	var checkChildren func(cid.Cid, cid.Cid) error
	// rk is the recursive root this walk started from (reported as Via);
	// parentKey is the node whose links are currently being examined.
	checkChildren = func(rk, parentKey cid.Cid) error {
		links, err := ipld.GetLinks(ctx, p.dserv, parentKey)
		if err != nil {
			return err
		}
		for _, lnk := range links {
			c := lnk.Cid
			if toCheck.Has(c) {
				pinned = append(pinned,
					Pinned{Key: c, Mode: Indirect, Via: rk})
				toCheck.Remove(c)
			}
			err := checkChildren(rk, c)
			if err != nil {
				return err
			}
			// Stop the walk early once every candidate is resolved.
			if toCheck.Len() == 0 {
				return nil
			}
		}
		return nil
	}
	for _, rk := range p.recursePin.Keys() {
		err := checkChildren(rk, rk)
		if err != nil {
			return nil, err
		}
		if toCheck.Len() == 0 {
			break
		}
	}

	// Anything left in toCheck is not pinned
	for _, k := range toCheck.Keys() {
		pinned = append(pinned, Pinned{Key: k, Mode: NotPinned})
	}
	return pinned, nil
}
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
func (p *pinner) RemovePinWithMode(c cid.Cid, mode Mode) {
	p.lock.Lock()
	defer p.lock.Unlock()

	var set *cid.Set
	switch mode {
	case Direct:
		set = p.directPin
	case Recursive:
		set = p.recursePin
	default:
		// programmer error, panic OK
		panic("unrecognized pin type")
	}
	set.Remove(c)
}
// cidSetWithValues builds a cid.Set containing every CID in cids.
func cidSetWithValues(cids []cid.Cid) *cid.Set {
	set := cid.NewSet()
	for i := range cids {
		set.Add(cids[i])
	}
	return set
}
// LoadPinner loads a pinner and its keysets from the given datastore
func LoadPinner(d ds.Datastore, dserv, internal ipld.DAGService) (Pinner, error) {
	p := new(pinner)

	// The datastore holds only the root CID of the pin-set DAG; the sets
	// themselves live in the internal DAGService.
	rootKey, err := d.Get(pinDatastoreKey)
	if err != nil {
		return nil, fmt.Errorf("cannot load pin state: %v", err)
	}
	rootCid, err := cid.Cast(rootKey)
	if err != nil {
		return nil, err
	}

	// Bound the entire load with a fixed timeout.
	ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5)
	defer cancel()

	root, err := internal.Get(ctx, rootCid)
	if err != nil {
		return nil, fmt.Errorf("cannot find pinning root object: %v", err)
	}

	rootpb, ok := root.(*mdag.ProtoNode)
	if !ok {
		return nil, mdag.ErrNotProtobuf
	}

	// Record every bookkeeping CID touched while loading so GC keeps the
	// pin-set structures alive.
	internalset := cid.NewSet()
	internalset.Add(rootCid)
	recordInternal := internalset.Add

	{ // load recursive set
		recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal)
		if err != nil {
			return nil, fmt.Errorf("cannot load recursive pins: %v", err)
		}
		p.recursePin = cidSetWithValues(recurseKeys)
	}

	{ // load direct set
		directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal)
		if err != nil {
			return nil, fmt.Errorf("cannot load direct pins: %v", err)
		}
		p.directPin = cidSetWithValues(directKeys)
	}

	p.internalPin = internalset

	// assign services
	p.dserv = dserv
	p.dstore = d
	p.internal = internal

	return p, nil
}
// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.directPin.Keys(), nil
}
// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.recursePin.Keys(), nil
}
// Update updates a recursive pin from one cid to another
// this is more efficient than simply pinning the new one and unpinning the
// old one
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
	if from == to {
		// Nothing to do. Don't remove this check or we'll end up
		// _removing_ the pin.
		//
		// See #6648
		return nil
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	if !p.recursePin.Has(from) {
		return fmt.Errorf("'from' cid was not recursively pinned already")
	}

	// Temporarily unlock while we fetch the differences.
	p.lock.Unlock()
	err := dagutils.DiffEnumerate(ctx, p.dserv, from, to)
	p.lock.Lock()

	if err != nil {
		return err
	}

	// NOTE(review): unlike Pin, 'from' is not re-checked after the lock
	// is reacquired; a concurrent Unpin(from) during DiffEnumerate would
	// be silently overridden here — confirm this is acceptable.
	p.recursePin.Add(to)
	if unpin {
		p.recursePin.Remove(from)
	}
	return nil
}
// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush(ctx context.Context) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Record every bookkeeping CID created while storing the pin sets so
	// GC will not collect them; installed as p.internalPin on success.
	internalset := cid.NewSet()
	recordInternal := internalset.Add

	root := &mdag.ProtoNode{}
	{
		n, err := storeSet(ctx, p.internal, p.directPin.Keys(), recordInternal)
		if err != nil {
			return err
		}
		if err := root.AddNodeLink(linkDirect, n); err != nil {
			return err
		}
	}
	{
		n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal)
		if err != nil {
			return err
		}
		if err := root.AddNodeLink(linkRecursive, n); err != nil {
			return err
		}
	}

	// add the empty node, its referenced by the pin sets but never created
	err := p.internal.Add(ctx, new(mdag.ProtoNode))
	if err != nil {
		return err
	}

	err = p.internal.Add(ctx, root)
	if err != nil {
		return err
	}

	k := root.Cid()
	internalset.Add(k)

	// Sync both DAG services (when supported) BEFORE persisting the root
	// key, so the data a stored root points at is already durable.
	if syncDServ, ok := p.dserv.(syncDAGService); ok {
		if err := syncDServ.Sync(); err != nil {
			return fmt.Errorf("cannot sync pinned data: %v", err)
		}
	}
	if syncInternal, ok := p.internal.(syncDAGService); ok {
		if err := syncInternal.Sync(); err != nil {
			return fmt.Errorf("cannot sync pinning data: %v", err)
		}
	}

	if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil {
		return fmt.Errorf("cannot store pin state: %v", err)
	}
	if err := p.dstore.Sync(pinDatastoreKey); err != nil {
		return fmt.Errorf("cannot sync pin state: %v", err)
	}
	p.internalPin = internalset
	return nil
}
// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
	// This method only reads state, so take the read lock — consistent
	// with DirectKeys and RecursiveKeys (the old code took the write
	// lock, needlessly blocking concurrent readers).
	p.lock.RLock()
	defer p.lock.RUnlock()

	// Keys returns a fresh slice (DirectKeys/RecursiveKeys hand it to
	// callers directly), so no defensive copy is needed here.
	return p.internalPin.Keys(), nil
}
// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(c cid.Cid, mode Mode) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if mode == Recursive {
		p.recursePin.Add(c)
	} else if mode == Direct {
		p.directPin.Add(c)
	}
	// Any other mode is silently ignored, matching historical behavior
	// (contrast with RemovePinWithMode, which panics).
}
// hasChild recursively looks for a Cid among the children of a root Cid.
// The visit function can be used to shortcut already-visited branches.
func hasChild(ctx context.Context, ng ipld.NodeGetter, root cid.Cid, child cid.Cid, visit func(cid.Cid) bool) (bool, error) {
	links, err := ipld.GetLinks(ctx, ng, root)
	if err != nil {
		return false, err
	}
	for _, lnk := range links {
		c := lnk.Cid
		if c.Equals(child) {
			return true, nil
		}
		// visit reports false for branches already seen, so shared
		// subtrees are only descended once.
		if !visit(c) {
			continue
		}
		found, err := hasChild(ctx, ng, c, child, visit)
		if err != nil || found {
			return found, err
		}
	}
	return false, nil
}
// Package pinconv converts pins between the dag-based ipldpinner and the
// datastore-based dspinner. Once conversion is complete, the pins from the
// source pinner are removed.
package pinconv
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dspinner"
"github.com/ipfs/go-ipfs-pinner/ipldpinner"
ipld "github.com/ipfs/go-ipld-format"
)
// ConvertPinsFromIPLDToDS converts pins stored in mdag based storage to pins
// stored in the datastore. Returns a dspinner loaded with the converted pins,
// and a count of the recursive and direct pins converted.
//
// After pins are stored in datastore, the root pin key is deleted to unlink
// the pin data in the DAGService.
func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
	// Key under which the old ipldpinner stored its root CID.
	const ipldPinPath = "/local/pins"

	ipldPinner, err := ipldpinner.New(dstore, dserv, internal)
	if err != nil {
		return nil, 0, err
	}

	dsPinner, err := dspinner.New(ctx, dstore, dserv)
	if err != nil {
		return nil, 0, err
	}

	seen := cid.NewSet()
	cids, err := ipldPinner.RecursiveKeys(ctx)
	if err != nil {
		return nil, 0, err
	}
	for i := range cids {
		seen.Add(cids[i])
		dsPinner.PinWithMode(cids[i], ipfspinner.Recursive)
	}
	convCount := len(cids)

	cids, err = ipldPinner.DirectKeys(ctx)
	if err != nil {
		return nil, 0, err
	}
	for i := range cids {
		if seen.Has(cids[i]) {
			// Pin was already pinned recursively; skip it and do not
			// count it (the old code over-counted such duplicates by
			// adding len(cids) unconditionally).
			continue
		}
		dsPinner.PinWithMode(cids[i], ipfspinner.Direct)
		convCount++
	}

	// Persist all converted pins before destroying the old state.
	err = dsPinner.Flush(ctx)
	if err != nil {
		return nil, 0, err
	}

	// Delete root mdag key from datastore to remove old pin storage.
	ipldPinDatastoreKey := ds.NewKey(ipldPinPath)
	if err = dstore.Delete(ipldPinDatastoreKey); err != nil {
		return nil, 0, fmt.Errorf("cannot delete old pin state: %v", err)
	}
	if err = dstore.Sync(ipldPinDatastoreKey); err != nil {
		return nil, 0, fmt.Errorf("cannot sync old pin state: %v", err)
	}

	return dsPinner, convCount, nil
}
// ConvertPinsFromDSToIPLD converts the pins stored in the datastore by
// dspinner, into pins stored in the given internal DAGService by ipldpinner.
// Returns an ipldpinner loaded with the converted pins, and a count of the
// recursive and direct pins converted.
//
// After the pins are stored in the DAGService, the pins and their indexes are
// removed from the dspinner.
func ConvertPinsFromDSToIPLD(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
	dsPinner, err := dspinner.New(ctx, dstore, dserv)
	if err != nil {
		return nil, 0, err
	}

	ipldPinner, err := ipldpinner.New(dstore, dserv, internal)
	if err != nil {
		return nil, 0, err
	}

	cids, err := dsPinner.RecursiveKeys(ctx)
	if err != nil {
		return nil, 0, err
	}
	for i := range cids {
		// Copy each recursive pin into the ipld pinner, then remove it
		// from the datastore pinner.
		ipldPinner.PinWithMode(cids[i], ipfspinner.Recursive)
		dsPinner.RemovePinWithMode(cids[i], ipfspinner.Recursive)
	}
	convCount := len(cids)

	cids, err = dsPinner.DirectKeys(ctx)
	if err != nil {
		return nil, 0, err
	}
	for i := range cids {
		ipldPinner.PinWithMode(cids[i], ipfspinner.Direct)
		dsPinner.RemovePinWithMode(cids[i], ipfspinner.Direct)
	}
	convCount += len(cids)

	// Save the ipldpinner pins
	err = ipldPinner.Flush(ctx)
	if err != nil {
		return nil, 0, err
	}

	// Flush the datastore pinner so the removals above are persisted.
	err = dsPinner.Flush(ctx)
	if err != nil {
		return nil, 0, err
	}

	return ipldPinner, convCount, nil
}
package pinconv
import (
"context"
"errors"
"io"
"testing"
bs "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
lds "github.com/ipfs/go-ds-leveldb"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipfspin "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dspinner"
util "github.com/ipfs/go-ipfs-util"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"
)
// rand deliberately shadows the math/rand package name with a
// time-seeded reader used to generate random node payloads.
var rand = util.NewTimeSeededRand()

// batchWrap adds a trivial Batch method to a plain Datastore so the
// wrapped store satisfies ds.Batching (see makeStore).
type batchWrap struct {
	ds.Datastore
}
// randNode returns a ProtoNode with 32 random bytes of data, along with
// its CID.
func randNode() (*mdag.ProtoNode, cid.Cid) {
	nd := new(mdag.ProtoNode)
	// Fill the buffer BEFORE handing it to SetData. The old code called
	// SetData first and then read into nd.Data(), which only worked
	// because Data() aliased the backing array — a fragile assumption.
	data := make([]byte, 32)
	if _, err := io.ReadFull(rand, data); err != nil {
		panic(err)
	}
	nd.SetData(data)
	return nd, nd.Cid()
}
// Batch returns a basic batch that applies each operation directly to
// the wrapped datastore.
func (d *batchWrap) Batch() (ds.Batch, error) {
	return ds.NewBasicBatch(d), nil
}
// makeStore builds an in-memory (path-less) leveldb-backed datastore and
// an offline DAGService on top of it, for use in tests.
func makeStore() (ds.Datastore, ipld.DAGService) {
	ldstore, err := lds.NewDatastore("", nil)
	if err != nil {
		panic(err)
	}
	// Wrap so the store satisfies ds.Batching, as required below.
	var dstore ds.Batching = &batchWrap{ldstore}
	bstore := blockstore.NewBlockstore(dstore)
	bserv := bs.New(bstore, offline.Exchange(bstore))
	return dstore, mdag.NewDAGService(bserv)
}
// TestConversions round-trips pins through both converters: dspinner ->
// ipldpinner -> dspinner, verifying the same direct/recursive/indirect
// pin statuses after each conversion.
func TestConversions(t *testing.T) {
	ctx := context.Background()
	dstore, dserv := makeStore()

	dsPinner, err := dspinner.New(ctx, dstore, dserv)
	if err != nil {
		t.Fatal(err)
	}

	a, ak := randNode()
	err = dsPinner.Pin(ctx, a, false)
	if err != nil {
		t.Fatal(err)
	}

	// create new node c, to be indirectly pinned through b
	c, ck := randNode()
	// NOTE(review): errors from Add and AddNodeLink below are ignored;
	// acceptable in this test but worth checking.
	dserv.Add(ctx, c)

	// Create new node b, to be parent to a and c
	b, _ := randNode()
	b.AddNodeLink("child", a)
	b.AddNodeLink("otherchild", c)
	bk := b.Cid() // CID changed after adding links

	// recursively pin B{A,C}
	err = dsPinner.Pin(ctx, b, true)
	if err != nil {
		t.Fatal(err)
	}

	err = dsPinner.Flush(ctx)
	if err != nil {
		t.Fatal(err)
	}

	// verifyPins checks that a direct (A), recursive (B), and indirect
	// via B (C) are reported with exactly those modes by the pinner.
	verifyPins := func(pinner ipfspin.Pinner) error {
		pinned, err := pinner.CheckIfPinned(ctx, ak, bk, ck)
		if err != nil {
			return err
		}
		if len(pinned) != 3 {
			return errors.New("incorrect number of results")
		}
		for _, pn := range pinned {
			switch pn.Key {
			case ak:
				if pn.Mode != ipfspin.Direct {
					return errors.New("A pinned with wrong mode")
				}
			case bk:
				if pn.Mode != ipfspin.Recursive {
					return errors.New("B pinned with wrong mode")
				}
			case ck:
				if pn.Mode != ipfspin.Indirect {
					return errors.New("C should be pinned indirectly")
				}
				if pn.Via != bk {
					return errors.New("C should be pinned via B")
				}
			}
		}
		return nil
	}

	err = verifyPins(dsPinner)
	if err != nil {
		t.Fatal(err)
	}

	ipldPinner, toIPLDCount, err := ConvertPinsFromDSToIPLD(ctx, dstore, dserv, dserv)
	if err != nil {
		t.Fatal(err)
	}
	// 2 explicit pins converted: A (direct) and B (recursive). C is
	// indirect and therefore not stored as a pin itself.
	if toIPLDCount != 2 {
		t.Fatal("expected 2 ds-to-ipld pins, got", toIPLDCount)
	}

	err = verifyPins(ipldPinner)
	if err != nil {
		t.Fatal(err)
	}

	toDSPinner, toDSCount, err := ConvertPinsFromIPLDToDS(ctx, dstore, dserv, dserv)
	if err != nil {
		t.Fatal(err)
	}
	if toDSCount != toIPLDCount {
		t.Fatal("ds-to-ipld pins", toIPLDCount, "not equal to ipld-to-ds-pins", toDSCount)
	}

	err = verifyPins(toDSPinner)
	if err != nil {
		t.Fatal(err)
	}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment