Commit 971dd820 authored by tavit ohanian's avatar tavit ohanian

Merge remote-tracking branch 'upstream/master' into reference

parents 06f81104 d86645f8
ratings:
paths:
- "**/*.go"
exclude_paths:
- test/
- Godeps/
- thirdparty/
- "**/*.pb.go"
engines:
fixme:
enabled: true
config:
strings:
- FIXME
- HACK
- XXX
- BUG
golint:
enabled: true
govet:
enabled: true
gofmt:
enabled: true
version: "2"
checks:
argument-count:
enabled: false
complex-logic:
enabled: false
file-lines:
enabled: false
method-complexity:
enabled: false
method-count:
enabled: false
method-lines:
enabled: false
nested-control-flow:
enabled: false
return-statements:
enabled: false
similar-code:
enabled: false
blank_issues_enabled: false
contact_links:
- name: Getting Help on IPFS
url: https://ipfs.io/help
about: All information about how and where to get help on IPFS.
- name: IPFS Official Forum
url: https://discuss.ipfs.io
about: Please post general questions, support requests, and discussions here.
---
name: Open an issue
about: Only for actionable issues relevant to this repository.
title: ''
labels: need/triage
assignees: ''
---
<!--
Hello! To ensure this issue is correctly addressed as soon as possible by the IPFS team, please try to make sure:
- This issue is relevant to this repository's topic or codebase.
- A clear description is provided. It should include as much relevant information as possible and a clear scope for the issue to be actionable.
FOR GENERAL DISCUSSION, HELP OR QUESTIONS, please see the options at https://ipfs.io/help or head directly to https://discuss.ipfs.io.
(you can delete this section after reading)
-->
*~
*.log
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool
*.out
os:
- linux
language: go
go:
- 1.11.x
env:
global:
- GOTFLAGS="-race"
matrix:
- BUILD_DEPTYPE=gomod
# disable travis install
install:
- true
script:
- bash <(curl -s https://raw.githubusercontent.com/ipfs/ci-helpers/master/travis-ci/run-standard-tests.sh)
cache:
directories:
- $GOPATH/src/gx
- $GOPATH/pkg/mod
- $HOME/.cache/go-build
notifications:
email: false
Copyright 2019. Protocol Labs, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright 2019. Protocol Labs, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# go-dms3-pinner
# go-ipfs-pinner
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Coverage Status](https://codecov.io/gh/ipfs/go-ipfs-pinner/branch/master/graph/badge.svg)](https://codecov.io/gh/ipfs/go-ipfs-pinner)
[![Travis CI](https://travis-ci.org/ipfs/go-ipfs-pinner.svg?branch=master)](https://travis-ci.org/ipfs/go-ipfs-pinner)
## Background
The pinner system is responsible for keeping track of which objects a user wants to keep stored locally.
## Install
Via `go get`:
```sh
$ go get github.com/ipfs/go-ipfs-pinner
```
> Requires Go 1.13
## Documentation
https://godoc.org/github.com/ipfs/go-ipfs-pinner
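Below is a minimal sketch of how a pinner might be wired up and used, based on the `ipldpinner` sub-package and the test setup in this repository; it is illustrative rather than canonical usage.

```go
package main

import (
	"context"
	"fmt"

	bs "github.com/ipfs/go-blockservice"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	offline "github.com/ipfs/go-ipfs-exchange-offline"
	"github.com/ipfs/go-ipfs-pinner/ipldpinner"
	mdag "github.com/ipfs/go-merkledag"
)

func main() {
	ctx := context.Background()

	// In-memory datastore, blockstore, and DAG service, wired the same way
	// as this repository's tests.
	dstore := dssync.MutexWrap(ds.NewMapDatastore())
	bstore := blockstore.NewBlockstore(dstore)
	dserv := mdag.NewDAGService(bs.New(bstore, offline.Exchange(bstore)))

	// Create a pinner that stores its state in the datastore.
	p, err := ipldpinner.New(dstore, dserv, dserv)
	if err != nil {
		panic(err)
	}

	// Add a node to the DAG service, pin it recursively, and persist the pin sets.
	nd := mdag.NodeWithData([]byte("hello"))
	if err := dserv.Add(ctx, nd); err != nil {
		panic(err)
	}
	if err := p.Pin(ctx, nd, true); err != nil {
		panic(err)
	}
	if err := p.Flush(ctx); err != nil {
		panic(err)
	}

	_, pinned, _ := p.IsPinned(ctx, nd.Cid())
	fmt.Println("pinned:", pinned) // pinned: true
}
```
The datastore-backed `dspinner` package (used by `pinconv` later in this commit) exposes the same `Pinner` interface.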
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
This library is dual-licensed under Apache 2.0 and MIT terms.
Copyright 2019. Protocol Labs, Inc.
package dsindex
import "errors"
var (
ErrEmptyKey = errors.New("key is empty")
ErrEmptyValue = errors.New("value is empty")
)
// Package dsindex provides secondary indexing functionality for a datastore.
package dsindex
import (
"context"
"fmt"
"path"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-multibase"
)
// Indexer maintains a secondary index. An index is a collection of key-value
// mappings, where each key is a secondary index that maps to one or more
// values, and each value is a unique key being indexed.
type Indexer interface {
// Add adds the specified value to the key
Add(ctx context.Context, key, value string) error
// Delete deletes the specified value from the key. If the value is not in
// the datastore, this method returns no error.
Delete(ctx context.Context, key, value string) error
// DeleteKey deletes all values in the given key. If a key is not in the
// datastore, this method returns no error. Returns a count of values that
// were deleted.
DeleteKey(ctx context.Context, key string) (count int, err error)
// DeleteAll deletes all keys managed by this Indexer. Returns a count of
// the values that were deleted.
DeleteAll(ctx context.Context) (count int, err error)
// ForEach calls the function for each value in the specified key, until
// there are no more values, or until the function returns false. If key
// is empty string, then all keys are iterated.
ForEach(ctx context.Context, key string, fn func(key, value string) bool) error
// HasValue determines if the key contains the specified value
HasValue(ctx context.Context, key, value string) (bool, error)
// HasAny determines if any value is in the specified key. If key is
// empty string, then all values are searched.
HasAny(ctx context.Context, key string) (bool, error)
// Search returns all values for the given key
Search(ctx context.Context, key string) (values []string, err error)
}
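// A minimal usage sketch (assumed setup; it mirrors the tests in this package,
// and ctx is any context.Context):
//
//	dstore := ds.NewMapDatastore()
//	nameIndex := New(dstore, ds.NewKey("/data/nameindex"))
//	_ = nameIndex.Add(ctx, "alice", "a1")
//	values, _ := nameIndex.Search(ctx, "alice") // values == []string{"a1"}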
// indexer is a simple implementation of Indexer. This implementation relies
// on the underlying data store to support efficient querying by prefix.
//
// TODO: Consider adding caching
type indexer struct {
dstore ds.Datastore
}
// New creates a new datastore index. All indexes are stored under the
// specified index name.
//
// To persist the actions of calling Indexer functions, it is necessary to call
// dstore.Sync.
func New(dstore ds.Datastore, name ds.Key) Indexer {
return &indexer{
dstore: namespace.Wrap(dstore, name),
}
}
func (x *indexer) Add(ctx context.Context, key, value string) error {
if key == "" {
return ErrEmptyKey
}
if value == "" {
return ErrEmptyValue
}
dsKey := ds.NewKey(encode(key)).ChildString(encode(value))
return x.dstore.Put(dsKey, []byte{})
}
func (x *indexer) Delete(ctx context.Context, key, value string) error {
if key == "" {
return ErrEmptyKey
}
if value == "" {
return ErrEmptyValue
}
return x.dstore.Delete(ds.NewKey(encode(key)).ChildString(encode(value)))
}
func (x *indexer) DeleteKey(ctx context.Context, key string) (int, error) {
if key == "" {
return 0, ErrEmptyKey
}
return x.deletePrefix(ctx, encode(key))
}
func (x *indexer) DeleteAll(ctx context.Context) (int, error) {
return x.deletePrefix(ctx, "")
}
func (x *indexer) ForEach(ctx context.Context, key string, fn func(key, value string) bool) error {
if key != "" {
key = encode(key)
}
q := query.Query{
Prefix: key,
KeysOnly: true,
}
results, err := x.dstore.Query(q)
if err != nil {
return err
}
for {
r, ok := results.NextSync()
if !ok {
break
}
if r.Error != nil {
err = r.Error
break
}
if ctx.Err() != nil {
err = ctx.Err()
break
}
ent := r.Entry
decIdx, err := decode(path.Base(path.Dir(ent.Key)))
if err != nil {
err = fmt.Errorf("cannot decode index: %v", err)
break
}
decKey, err := decode(path.Base(ent.Key))
if err != nil {
err = fmt.Errorf("cannot decode key: %v", err)
break
}
if !fn(decIdx, decKey) {
break
}
}
results.Close()
return err
}
func (x *indexer) HasValue(ctx context.Context, key, value string) (bool, error) {
if key == "" {
return false, ErrEmptyKey
}
if value == "" {
return false, ErrEmptyValue
}
return x.dstore.Has(ds.NewKey(encode(key)).ChildString(encode(value)))
}
func (x *indexer) HasAny(ctx context.Context, key string) (bool, error) {
var any bool
err := x.ForEach(ctx, key, func(key, value string) bool {
any = true
return false
})
return any, err
}
func (x *indexer) Search(ctx context.Context, key string) ([]string, error) {
if key == "" {
return nil, ErrEmptyKey
}
ents, err := x.queryPrefix(ctx, encode(key))
if err != nil {
return nil, err
}
if len(ents) == 0 {
return nil, nil
}
values := make([]string, len(ents))
for i := range ents {
values[i], err = decode(path.Base(ents[i].Key))
if err != nil {
return nil, fmt.Errorf("cannot decode value: %v", err)
}
}
return values, nil
}
// SyncIndex synchronizes the keys in the target Indexer to match those of the
// ref Indexer. This function does not change this indexer's key root (name
// passed into New).
func SyncIndex(ctx context.Context, ref, target Indexer) (bool, error) {
// Build reference index map
refs := map[string]string{}
err := ref.ForEach(ctx, "", func(key, value string) bool {
refs[value] = key
return true
})
if err != nil {
return false, err
}
if len(refs) == 0 {
return false, nil
}
// Compare current indexes
dels := map[string]string{}
err = target.ForEach(ctx, "", func(key, value string) bool {
refKey, ok := refs[value]
if ok && refKey == key {
// same in both; delete from refs, do not add to dels
delete(refs, value)
} else {
dels[value] = key
}
return true
})
if err != nil {
return false, err
}
// Items in dels are keys that no longer exist
for value, key := range dels {
err = target.Delete(ctx, key, value)
if err != nil {
return false, err
}
}
// What remains in refs are keys that need to be added
for value, key := range refs {
err = target.Add(ctx, key, value)
if err != nil {
return false, err
}
}
return len(refs) != 0 || len(dels) != 0, nil
}
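// A minimal usage sketch for SyncIndex (assumed setup; mirrors TestSyncIndex in
// this package, with targetIndex being an existing Indexer to reconcile):
//
//	refIndex := New(dstore, ds.NewKey("/ref"))
//	_ = refIndex.Add(ctx, "alice", "a1")
//	changed, err := SyncIndex(ctx, refIndex, targetIndex)
//	// changed reports whether targetIndex had to be modified to match refIndex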
func (x *indexer) deletePrefix(ctx context.Context, prefix string) (int, error) {
ents, err := x.queryPrefix(ctx, prefix)
if err != nil {
return 0, err
}
for i := range ents {
err = x.dstore.Delete(ds.NewKey(ents[i].Key))
if err != nil {
return 0, err
}
}
return len(ents), nil
}
func (x *indexer) queryPrefix(ctx context.Context, prefix string) ([]query.Entry, error) {
q := query.Query{
Prefix: prefix,
KeysOnly: true,
}
results, err := x.dstore.Query(q)
if err != nil {
return nil, err
}
return results.Rest()
}
func encode(data string) string {
encData, err := multibase.Encode(multibase.Base64url, []byte(data))
if err != nil {
// programming error; using unsupported encoding
panic(err.Error())
}
return encData
}
func decode(data string) (string, error) {
_, b, err := multibase.Decode(data)
if err != nil {
return "", err
}
return string(b), nil
}
package dsindex
import (
"context"
"testing"
ds "github.com/ipfs/go-datastore"
)
func createIndexer() Indexer {
dstore := ds.NewMapDatastore()
nameIndex := New(dstore, ds.NewKey("/data/nameindex"))
ctx := context.Background()
nameIndex.Add(ctx, "alice", "a1")
nameIndex.Add(ctx, "bob", "b1")
nameIndex.Add(ctx, "bob", "b2")
nameIndex.Add(ctx, "cathy", "c1")
return nameIndex
}
func TestAdd(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nameIndex := createIndexer()
err := nameIndex.Add(ctx, "someone", "s1")
if err != nil {
t.Fatal(err)
}
err = nameIndex.Add(ctx, "someone", "s1")
if err != nil {
t.Fatal(err)
}
err = nameIndex.Add(ctx, "", "noindex")
if err != ErrEmptyKey {
t.Fatal("unexpected error:", err)
}
err = nameIndex.Add(ctx, "nokey", "")
if err != ErrEmptyValue {
t.Fatal("unexpected error:", err)
}
}
func TestHasValue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nameIndex := createIndexer()
ok, err := nameIndex.HasValue(ctx, "bob", "b1")
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("missing index")
}
ok, err = nameIndex.HasValue(ctx, "bob", "b3")
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("should not have index")
}
_, err = nameIndex.HasValue(ctx, "", "b1")
if err != ErrEmptyKey {
t.Fatal("unexpected error:", err)
}
_, err = nameIndex.HasValue(ctx, "bob", "")
if err != ErrEmptyValue {
t.Fatal("unexpected error:", err)
}
}
func TestHasAny(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nameIndex := createIndexer()
ok, err := nameIndex.HasAny(ctx, "nothere")
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("should return false")
}
for _, idx := range []string{"alice", "bob", ""} {
ok, err = nameIndex.HasAny(ctx, idx)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("missing index", idx)
}
}
count, err := nameIndex.DeleteAll(ctx)
if err != nil {
t.Fatal(err)
}
if count != 4 {
t.Fatal("expected 4 deletions")
}
ok, err = nameIndex.HasAny(ctx, "")
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("should return false")
}
}
func TestForEach(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nameIndex := createIndexer()
found := make(map[string]struct{})
err := nameIndex.ForEach(ctx, "bob", func(key, value string) bool {
found[value] = struct{}{}
return true
})
if err != nil {
t.Fatal(err)
}
for _, value := range []string{"b1", "b2"} {
_, ok := found[value]
if !ok {
t.Fatal("missing key for value", value)
}
}
values := map[string]string{}
err = nameIndex.ForEach(ctx, "", func(key, value string) bool {
values[value] = key
return true
})
if err != nil {
t.Fatal(err)
}
if len(values) != 4 {
t.Fatal("expected 4 keys")
}
if values["a1"] != "alice" {
t.Error("expected a1: alice")
}
if values["b1"] != "bob" {
t.Error("expected b1: bob")
}
if values["b2"] != "bob" {
t.Error("expected b2: bob")
}
if values["c1"] != "cathy" {
t.Error("expected c1: cathy")
}
}
func TestSearch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nameIndex := createIndexer()
ids, err := nameIndex.Search(ctx, "bob")
if err != nil {
t.Fatal(err)
}
if len(ids) != 2 {
t.Fatal("wrong number of ids - expected 2 got", ids)
}
for _, id := range ids {
if id != "b1" && id != "b2" {
t.Fatal("wrong value in id set")
}
}
if ids[0] == ids[1] {
t.Fatal("duplicate id")
}
ids, err = nameIndex.Search(ctx, "cathy")
if err != nil {
t.Fatal(err)
}
if len(ids) != 1 || ids[0] != "c1" {
t.Fatal("wrong ids")
}
ids, err = nameIndex.Search(ctx, "amit")
if err != nil {
t.Fatal(err)
}
if len(ids) != 0 {
t.Fatal("unexpected ids returned")
}
}
func TestDelete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nameIndex := createIndexer()
err := nameIndex.Delete(ctx, "bob", "b3")
if err != nil {
t.Fatal(err)
}
err = nameIndex.Delete(ctx, "alice", "a1")
if err != nil {
t.Fatal(err)
}
ok, err := nameIndex.HasValue(ctx, "alice", "a1")
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("index key should have been deleted")
}
count, err := nameIndex.DeleteKey(ctx, "bob")
if err != nil {
t.Fatal(err)
}
if count != 2 {
t.Fatal("wrong deleted count")
}
ok, _ = nameIndex.HasValue(ctx, "bob", "b1")
if ok {
t.Fatal("index not deleted")
}
}
func TestSyncIndex(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nameIndex := createIndexer()
dstore := ds.NewMapDatastore()
refIndex := New(dstore, ds.NewKey("/ref"))
refIndex.Add(ctx, "alice", "a1")
refIndex.Add(ctx, "cathy", "zz")
refIndex.Add(ctx, "dennis", "d1")
changed, err := SyncIndex(ctx, refIndex, nameIndex)
if err != nil {
t.Fatal(err)
}
if !changed {
t.Error("change was not indicated")
}
// Create map of id->index in sync target
syncs := map[string]string{}
err = nameIndex.ForEach(ctx, "", func(key, value string) bool {
syncs[value] = key
return true
})
if err != nil {
t.Fatal(err)
}
// Iterate items in sync source and make sure they appear in target
var itemCount int
err = refIndex.ForEach(ctx, "", func(key, value string) bool {
itemCount++
syncKey, ok := syncs[value]
if !ok || key != syncKey {
t.Fatal("key", key, "-->", value, "was not synced")
}
return true
})
if err != nil {
t.Fatal(err)
}
if itemCount != len(syncs) {
t.Fatal("different number of items in sync source and target")
}
}
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $<
clean:
rm -f *.pb.go
rm -f *.go
package pb
//go:generate protoc --gogo_out=. header.proto
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: header.proto
package pb
import (
encoding_binary "encoding/binary"
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Set struct {
// 1 for now, library will refuse to handle entries with an unrecognized version.
Version uint32 `protobuf:"varint,1,opt,name=version" json:"version"`
// how many of the links are subtrees
Fanout uint32 `protobuf:"varint,2,opt,name=fanout" json:"fanout"`
// hash seed for subtree selection, a random number
Seed uint32 `protobuf:"fixed32,3,opt,name=seed" json:"seed"`
}
func (m *Set) Reset() { *m = Set{} }
func (m *Set) String() string { return proto.CompactTextString(m) }
func (*Set) ProtoMessage() {}
func (*Set) Descriptor() ([]byte, []int) {
return fileDescriptor_6398613e36d6c2ce, []int{0}
}
func (m *Set) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Set) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Set.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Set) XXX_Merge(src proto.Message) {
xxx_messageInfo_Set.Merge(m, src)
}
func (m *Set) XXX_Size() int {
return m.Size()
}
func (m *Set) XXX_DiscardUnknown() {
xxx_messageInfo_Set.DiscardUnknown(m)
}
var xxx_messageInfo_Set proto.InternalMessageInfo
func (m *Set) GetVersion() uint32 {
if m != nil {
return m.Version
}
return 0
}
func (m *Set) GetFanout() uint32 {
if m != nil {
return m.Fanout
}
return 0
}
func (m *Set) GetSeed() uint32 {
if m != nil {
return m.Seed
}
return 0
}
func init() {
proto.RegisterType((*Set)(nil), "ipfs.pin.Set")
}
func init() { proto.RegisterFile("header.proto", fileDescriptor_6398613e36d6c2ce) }
var fileDescriptor_6398613e36d6c2ce = []byte{
// 146 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xc9, 0x48, 0x4d, 0x4c,
0x49, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xc8, 0x2c, 0x48, 0x2b, 0xd6, 0x2b,
0xc8, 0xcc, 0x53, 0x8a, 0xe5, 0x62, 0x0e, 0x4e, 0x2d, 0x11, 0x92, 0xe3, 0x62, 0x2f, 0x4b, 0x2d,
0x2a, 0xce, 0xcc, 0xcf, 0x93, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x75, 0x62, 0x39, 0x71, 0x4f, 0x9e,
0x21, 0x08, 0x26, 0x28, 0x24, 0xc3, 0xc5, 0x96, 0x96, 0x98, 0x97, 0x5f, 0x5a, 0x22, 0xc1, 0x84,
0x24, 0x0d, 0x15, 0x13, 0x92, 0xe0, 0x62, 0x29, 0x4e, 0x4d, 0x4d, 0x91, 0x60, 0x56, 0x60, 0xd4,
0x60, 0x87, 0xca, 0x81, 0x45, 0x9c, 0x64, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1,
0xc1, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e,
0x21, 0x8a, 0xa9, 0x20, 0x09, 0x10, 0x00, 0x00, 0xff, 0xff, 0x3c, 0x49, 0x19, 0x51, 0x95, 0x00,
0x00, 0x00,
}
func (m *Set) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Set) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Set) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
i -= 4
encoding_binary.LittleEndian.PutUint32(dAtA[i:], uint32(m.Seed))
i--
dAtA[i] = 0x1d
i = encodeVarintHeader(dAtA, i, uint64(m.Fanout))
i--
dAtA[i] = 0x10
i = encodeVarintHeader(dAtA, i, uint64(m.Version))
i--
dAtA[i] = 0x8
return len(dAtA) - i, nil
}
func encodeVarintHeader(dAtA []byte, offset int, v uint64) int {
offset -= sovHeader(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Set) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
n += 1 + sovHeader(uint64(m.Version))
n += 1 + sovHeader(uint64(m.Fanout))
n += 5
return n
}
func sovHeader(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozHeader(x uint64) (n int) {
return sovHeader(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Set) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHeader
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Set: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Set: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
}
m.Version = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHeader
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Version |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Fanout", wireType)
}
m.Fanout = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHeader
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Fanout |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 5 {
return fmt.Errorf("proto: wrong wireType = %d for field Seed", wireType)
}
m.Seed = 0
if (iNdEx + 4) > l {
return io.ErrUnexpectedEOF
}
m.Seed = uint32(encoding_binary.LittleEndian.Uint32(dAtA[iNdEx:]))
iNdEx += 4
default:
iNdEx = preIndex
skippy, err := skipHeader(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthHeader
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthHeader
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipHeader(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHeader
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHeader
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHeader
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthHeader
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupHeader
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthHeader
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthHeader = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowHeader = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupHeader = fmt.Errorf("proto: unexpected end of group")
)
syntax = "proto2";
package ipfs.pin;
option go_package = "pb";
message Set {
// 1 for now, library will refuse to handle entries with an unrecognized version.
optional uint32 version = 1;
// how many of the links are subtrees
optional uint32 fanout = 2;
// hash seed for subtree selection, a random number
optional fixed32 seed = 3;
}
// Package ipldpinner implements structures and methods to keep track of
// which objects a user wants to keep stored locally. This implementation
// stores pin information in a mdag structure.
package ipldpinner
import (
"context"
"fmt"
"os"
"sync"
"time"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag"
mdag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
)
const loadTimeout = 5 * time.Second
var log = logging.Logger("pin")
var pinDatastoreKey = ds.NewKey("/local/pins")
var emptyKey cid.Cid
var linkDirect, linkRecursive, linkInternal string
func init() {
e, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
if err != nil {
log.Error("failed to decode empty key constant")
os.Exit(1)
}
emptyKey = e
directStr, ok := ipfspinner.ModeToString(ipfspinner.Direct)
if !ok {
panic("could not find Direct pin enum")
}
linkDirect = directStr
recursiveStr, ok := ipfspinner.ModeToString(ipfspinner.Recursive)
if !ok {
panic("could not find Recursive pin enum")
}
linkRecursive = recursiveStr
internalStr, ok := ipfspinner.ModeToString(ipfspinner.Internal)
if !ok {
panic("could not find Internal pin enum")
}
linkInternal = internalStr
}
// pinner implements the Pinner interface
type pinner struct {
lock sync.RWMutex
recursePin *cid.Set
directPin *cid.Set
// Track the keys used for storing the pinning state, so gc does
// not delete them.
internalPin *cid.Set
dserv ipld.DAGService
internal ipld.DAGService // dagservice used to store internal objects
dstore ds.Datastore
}
var _ ipfspinner.Pinner = (*pinner)(nil)
type syncDAGService interface {
ipld.DAGService
Sync() error
}
// New creates a new pinner using the given datastore as a backend, and loads
// the pinner's keysets from the datastore
func New(dstore ds.Datastore, dserv, internal ipld.DAGService) (*pinner, error) {
rootKey, err := dstore.Get(pinDatastoreKey)
if err != nil {
if err == ds.ErrNotFound {
return &pinner{
recursePin: cid.NewSet(),
directPin: cid.NewSet(),
internalPin: cid.NewSet(),
dserv: dserv,
internal: internal,
dstore: dstore,
}, nil
}
return nil, err
}
rootCid, err := cid.Cast(rootKey)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.TODO(), loadTimeout)
defer cancel()
root, err := internal.Get(ctx, rootCid)
if err != nil {
return nil, fmt.Errorf("cannot find pinning root object: %v", err)
}
rootpb, ok := root.(*mdag.ProtoNode)
if !ok {
return nil, mdag.ErrNotProtobuf
}
internalset := cid.NewSet()
internalset.Add(rootCid)
recordInternal := internalset.Add
// load recursive set
recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load recursive pins: %v", err)
}
// load direct set
directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load direct pins: %v", err)
}
return &pinner{
// assign pinsets
recursePin: cidSetWithValues(recurseKeys),
directPin: cidSetWithValues(directKeys),
internalPin: internalset,
// assign services
dserv: dserv,
dstore: dstore,
internal: internal,
}, nil
}
// LoadKeys reads the pinned CIDs and sends them on the given channel. This is
// used to read pins without loading them all into memory.
func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ipld.DAGService, recursive bool, keyChan chan<- cid.Cid) error {
rootKey, err := dstore.Get(pinDatastoreKey)
if err != nil {
if err == ds.ErrNotFound {
return nil
}
return err
}
rootCid, err := cid.Cast(rootKey)
if err != nil {
return err
}
root, err := internal.Get(ctx, rootCid)
if err != nil {
return fmt.Errorf("cannot find pinning root object: %v", err)
}
rootpb, ok := root.(*mdag.ProtoNode)
if !ok {
return mdag.ErrNotProtobuf
}
var linkName string
if recursive {
linkName = linkRecursive
} else {
linkName = linkDirect
}
return loadSetChan(ctx, internal, rootpb, linkName, keyChan)
}
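// A minimal usage sketch for LoadKeys (assumed wiring; mirrors TestPinnerBasic
// in this package's tests):
//
//	keyChan := make(chan cid.Cid)
//	go func() {
//		err = LoadKeys(ctx, dstore, dserv, dserv, true, keyChan)
//		close(keyChan)
//	}()
//	for c := range keyChan {
//		// each c is a recursively pinned CID
//	}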
// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
err := p.dserv.Add(ctx, node)
if err != nil {
return err
}
c := node.Cid()
p.lock.Lock()
defer p.lock.Unlock()
if recurse {
if p.recursePin.Has(c) {
return nil
}
p.lock.Unlock()
// temporary unlock to fetch the entire graph
err := mdag.FetchGraph(ctx, c, p.dserv)
p.lock.Lock()
if err != nil {
return err
}
if p.recursePin.Has(c) {
return nil
}
if p.directPin.Has(c) {
p.directPin.Remove(c)
}
p.recursePin.Add(c)
} else {
if p.recursePin.Has(c) {
return fmt.Errorf("%s already pinned recursively", c.String())
}
p.directPin.Add(c)
}
return nil
}
// ErrNotPinned is returned when trying to unpin items which are not pinned.
var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly")
// Unpin a given key
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.recursePin.Has(c) {
if !recursive {
return fmt.Errorf("%s is pinned recursively", c)
}
p.recursePin.Remove(c)
return nil
}
if p.directPin.Has(c) {
p.directPin.Remove(c)
return nil
}
return ErrNotPinned
}
func (p *pinner) isInternalPin(c cid.Cid) bool {
return p.internalPin.Has(c)
}
// IsPinned returns whether or not the given key is pinned
// and an explanation of why it's pinned
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, ipfspinner.Any)
}
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin it's pinned with.
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, mode)
}
// isPinnedWithType is the implementation of IsPinnedWithType that does not lock.
// It is intended for use by other pinned methods that already take locks.
func (p *pinner) isPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) {
switch mode {
case ipfspinner.Any, ipfspinner.Direct, ipfspinner.Indirect, ipfspinner.Recursive, ipfspinner.Internal:
default:
err := fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}",
mode, ipfspinner.Direct, ipfspinner.Indirect, ipfspinner.Recursive, ipfspinner.Internal, ipfspinner.Any)
return "", false, err
}
if (mode == ipfspinner.Recursive || mode == ipfspinner.Any) && p.recursePin.Has(c) {
return linkRecursive, true, nil
}
if mode == ipfspinner.Recursive {
return "", false, nil
}
if (mode == ipfspinner.Direct || mode == ipfspinner.Any) && p.directPin.Has(c) {
return linkDirect, true, nil
}
if mode == ipfspinner.Direct {
return "", false, nil
}
if (mode == ipfspinner.Internal || mode == ipfspinner.Any) && p.isInternalPin(c) {
return linkInternal, true, nil
}
if mode == ipfspinner.Internal {
return "", false, nil
}
// Default is Indirect
visitedSet := cid.NewSet()
for _, rc := range p.recursePin.Keys() {
has, err := hasChild(ctx, p.dserv, rc, c, visitedSet.Visit)
if err != nil {
return "", false, err
}
if has {
return rc.String(), true, nil
}
}
return "", false, nil
}
// CheckIfPinned checks if a set of keys are pinned. It is more efficient than
// calling IsPinned for each key, and returns the pinned status of the given cids.
func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinner.Pinned, error) {
p.lock.RLock()
defer p.lock.RUnlock()
pinned := make([]ipfspinner.Pinned, 0, len(cids))
toCheck := cid.NewSet()
// First check for non-Indirect pins directly
for _, c := range cids {
if p.recursePin.Has(c) {
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Recursive})
} else if p.directPin.Has(c) {
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Direct})
} else if p.isInternalPin(c) {
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Internal})
} else {
toCheck.Add(c)
}
}
// Now walk all recursive pins to check for indirect pins
visited := cid.NewSet()
for _, rk := range p.recursePin.Keys() {
err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
if toCheck.Len() == 0 || !visited.Visit(c) {
return false
}
if toCheck.Has(c) {
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
toCheck.Remove(c)
}
return true
}, merkledag.Concurrent())
if err != nil {
return nil, err
}
if toCheck.Len() == 0 {
break
}
}
// Anything left in toCheck is not pinned
for _, k := range toCheck.Keys() {
pinned = append(pinned, ipfspinner.Pinned{Key: k, Mode: ipfspinner.NotPinned})
}
return pinned, nil
}
// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
func (p *pinner) RemovePinWithMode(c cid.Cid, mode ipfspinner.Mode) {
p.lock.Lock()
defer p.lock.Unlock()
switch mode {
case ipfspinner.Direct:
p.directPin.Remove(c)
case ipfspinner.Recursive:
p.recursePin.Remove(c)
default:
// programmer error, panic OK
panic("unrecognized pin type")
}
}
func cidSetWithValues(cids []cid.Cid) *cid.Set {
out := cid.NewSet()
for _, c := range cids {
out.Add(c)
}
return out
}
// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.directPin.Keys(), nil
}
// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.recursePin.Keys(), nil
}
// Update updates a recursive pin from one cid to another. This is more
// efficient than simply pinning the new one and unpinning the old one.
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
if from == to {
// Nothing to do. Don't remove this check or we'll end up
// _removing_ the pin.
//
// See #6648
return nil
}
p.lock.Lock()
defer p.lock.Unlock()
if !p.recursePin.Has(from) {
return fmt.Errorf("'from' cid was not recursively pinned already")
}
// Temporarily unlock while we fetch the differences.
p.lock.Unlock()
err := dagutils.DiffEnumerate(ctx, p.dserv, from, to)
p.lock.Lock()
if err != nil {
return err
}
p.recursePin.Add(to)
if unpin {
p.recursePin.Remove(from)
}
return nil
}
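// Usage sketch (assumed variables; mirrors TestPinUpdate in this package's tests):
//
//	// c1 is already pinned recursively; move the pin to c2 and drop c1.
//	if err := p.Update(ctx, c1, c2, true); err != nil {
//		// handle error
//	}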
// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush(ctx context.Context) error {
p.lock.Lock()
defer p.lock.Unlock()
internalset := cid.NewSet()
recordInternal := internalset.Add
root := &mdag.ProtoNode{}
{
n, err := storeSet(ctx, p.internal, p.directPin.Keys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkDirect, n); err != nil {
return err
}
}
{
n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkRecursive, n); err != nil {
return err
}
}
// add the empty node; it's referenced by the pin sets but never created
err := p.internal.Add(ctx, new(mdag.ProtoNode))
if err != nil {
return err
}
err = p.internal.Add(ctx, root)
if err != nil {
return err
}
k := root.Cid()
internalset.Add(k)
if syncDServ, ok := p.dserv.(syncDAGService); ok {
if err := syncDServ.Sync(); err != nil {
return fmt.Errorf("cannot sync pinned data: %v", err)
}
}
if syncInternal, ok := p.internal.(syncDAGService); ok {
if err := syncInternal.Sync(); err != nil {
return fmt.Errorf("cannot sync pinning data: %v", err)
}
}
if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil {
return fmt.Errorf("cannot store pin state: %v", err)
}
if err := p.dstore.Sync(pinDatastoreKey); err != nil {
return fmt.Errorf("cannot sync pin state: %v", err)
}
p.internalPin = internalset
return nil
}
// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
p.lock.Lock()
defer p.lock.Unlock()
return p.internalPin.Keys(), nil
}
// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) {
p.lock.Lock()
defer p.lock.Unlock()
switch mode {
case ipfspinner.Recursive:
p.recursePin.Add(c)
case ipfspinner.Direct:
p.directPin.Add(c)
}
}
// hasChild recursively looks for a Cid among the children of a root Cid.
// The visit function can be used to shortcut already-visited branches.
func hasChild(ctx context.Context, ng ipld.NodeGetter, root cid.Cid, child cid.Cid, visit func(cid.Cid) bool) (bool, error) {
links, err := ipld.GetLinks(ctx, ng, root)
if err != nil {
return false, err
}
for _, lnk := range links {
c := lnk.Cid
if lnk.Cid.Equals(child) {
return true, nil
}
if visit(c) {
has, err := hasChild(ctx, ng, c, child, visit)
if err != nil {
return false, err
}
if has {
return has, nil
}
}
}
return false, nil
}
package ipldpinner
import (
"context"
"io"
"testing"
"time"
bs "github.com/ipfs/go-blockservice"
mdag "github.com/ipfs/go-merkledag"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
pin "github.com/ipfs/go-ipfs-pinner"
util "github.com/ipfs/go-ipfs-util"
)
var rand = util.NewTimeSeededRand()
func randNode() (*mdag.ProtoNode, cid.Cid) {
nd := new(mdag.ProtoNode)
nd.SetData(make([]byte, 32))
_, err := io.ReadFull(rand, nd.Data())
if err != nil {
panic(err)
}
k := nd.Cid()
return nd, k
}
func assertPinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
}
if !pinned {
t.Fatal(failmsg)
}
}
func assertUnpinned(t *testing.T, p pin.Pinner, c cid.Cid, failmsg string) {
_, pinned, err := p.IsPinned(context.Background(), c)
if err != nil {
t.Fatal(err)
}
if pinned {
t.Fatal(failmsg)
}
}
func TestPinnerBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
a, ak := randNode()
err = dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
// Pin A{}
err = p.Pin(ctx, a, false)
if err != nil {
t.Fatal(err)
}
assertPinned(t, p, ak, "Failed to find key")
// create new node c, to be indirectly pinned through b
c, _ := randNode()
err = dserv.Add(ctx, c)
if err != nil {
t.Fatal(err)
}
ck := c.Cid()
// Create new node b, to be parent to a and c
b, _ := randNode()
err = b.AddNodeLink("child", a)
if err != nil {
t.Fatal(err)
}
err = b.AddNodeLink("otherchild", c)
if err != nil {
t.Fatal(err)
}
err = dserv.Add(ctx, b)
if err != nil {
t.Fatal(err)
}
bk := b.Cid()
// recursively pin B{A,C}
err = p.Pin(ctx, b, true)
if err != nil {
t.Fatal(err)
}
assertPinned(t, p, ck, "child of recursively pinned node not found")
assertPinned(t, p, bk, "Recursively pinned node not found..")
d, _ := randNode()
_ = d.AddNodeLink("a", a)
_ = d.AddNodeLink("c", c)
e, _ := randNode()
_ = d.AddNodeLink("e", e)
// Must be in dagserv for unpin to work
err = dserv.Add(ctx, e)
if err != nil {
t.Fatal(err)
}
err = dserv.Add(ctx, d)
if err != nil {
t.Fatal(err)
}
// Add D{A,C,E}
err = p.Pin(ctx, d, true)
if err != nil {
t.Fatal(err)
}
dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")
// Test recursive unpin
err = p.Unpin(ctx, dk, true)
if err != nil {
t.Fatal(err)
}
err = p.Flush(ctx)
if err != nil {
t.Fatal(err)
}
np, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
// Test directly pinned
assertPinned(t, np, ak, "Could not find pinned node!")
// Test recursively pinned
assertPinned(t, np, bk, "could not find recursively pinned node")
// Test that LoadKeys returns the expected CIDs.
keyChan := make(chan cid.Cid)
go func() {
err = LoadKeys(ctx, dstore, dserv, dserv, true, keyChan)
close(keyChan)
}()
keys := map[cid.Cid]struct{}{}
for c := range keyChan {
keys[c] = struct{}{}
}
if err != nil {
t.Fatal(err)
}
recKeys, _ := np.RecursiveKeys(ctx)
if len(keys) != len(recKeys) {
t.Fatal("wrong number of recursive keys from LoadKeys")
}
for _, k := range recKeys {
if _, ok := keys[k]; !ok {
t.Fatal("LoadKeys did not return correct recursive keys")
}
}
keyChan = make(chan cid.Cid)
go func() {
err = LoadKeys(ctx, dstore, dserv, dserv, false, keyChan)
close(keyChan)
}()
keys = map[cid.Cid]struct{}{}
for c := range keyChan {
keys[c] = struct{}{}
}
if err != nil {
t.Fatal(err)
}
dirKeys, _ := np.DirectKeys(ctx)
if len(keys) != len(dirKeys) {
t.Fatal("wrong number of direct keys from LoadKeys")
}
for _, k := range dirKeys {
if _, ok := keys[k]; !ok {
t.Fatal("LoadKeys did not return correct direct keys")
}
}
cancel()
emptyDS := dssync.MutexWrap(ds.NewMapDatastore())
// Check key not in datastore
err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil)
if err != nil {
t.Fatal(err)
}
// Check error on bad key
if err = emptyDS.Put(pinDatastoreKey, []byte("bad-cid")); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil {
t.Fatal("expected error")
}
// Lookup dag that does not exist
noKey, err := cid.Decode("QmYff9iHR1Hz6wufVeJodzXqQm4pkK4QNS9ms8tyPKVWm1")
if err != nil {
panic(err)
}
if err = emptyDS.Put(pinDatastoreKey, noKey.Bytes()); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil)
if err == nil || err.Error() != "cannot find pinning root object: merkledag: not found" {
t.Fatal("did not get expected error")
}
// Check error when node has no links
if err = emptyDS.Put(pinDatastoreKey, emptyKey.Bytes()); err != nil {
panic(err)
}
if err = emptyDS.Sync(pinDatastoreKey); err != nil {
panic(err)
}
if err = LoadKeys(ctx, emptyDS, dserv, dserv, true, nil); err == nil {
t.Fatal("expected error")
}
}
func TestIsPinnedLookup(t *testing.T) {
// We are going to test that lookups work in pins which share
// the same branches. For that we will construct this tree:
//
// A5->A4->A3->A2->A1->A0
// / /
// B------- /
// \ /
// C---------------
//
// We will ensure that IsPinned works for all objects both when they
// are pinned and once they have been unpinned.
aBranchLen := 6
if aBranchLen < 3 {
t.Fatal("set aBranchLen to at least 3")
}
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
// TODO does pinner need to share datastore with blockservice?
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
aNodes := make([]*mdag.ProtoNode, aBranchLen)
aKeys := make([]cid.Cid, aBranchLen)
for i := 0; i < aBranchLen; i++ {
a, _ := randNode()
if i >= 1 {
err := a.AddNodeLink("child", aNodes[i-1])
if err != nil {
t.Fatal(err)
}
}
err := dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
//t.Logf("a[%d] is %s", i, ak)
aNodes[i] = a
aKeys[i] = a.Cid()
}
// Pin A5 recursively
if err := p.Pin(ctx, aNodes[aBranchLen-1], true); err != nil {
t.Fatal(err)
}
// Create node B and add A3 as child
b, _ := randNode()
if err := b.AddNodeLink("mychild", aNodes[3]); err != nil {
t.Fatal(err)
}
// Create C node
c, _ := randNode()
// Add A0 as child of C
if err := c.AddNodeLink("child", aNodes[0]); err != nil {
t.Fatal(err)
}
// Add C
err = dserv.Add(ctx, c)
if err != nil {
t.Fatal(err)
}
ck := c.Cid()
//t.Logf("C is %s", ck)
// Add C to B and Add B
if err := b.AddNodeLink("myotherchild", c); err != nil {
t.Fatal(err)
}
err = dserv.Add(ctx, b)
if err != nil {
t.Fatal(err)
}
bk := b.Cid()
//t.Logf("B is %s", bk)
// Pin C recursively
if err := p.Pin(ctx, c, true); err != nil {
t.Fatal(err)
}
// Pin B recursively
if err := p.Pin(ctx, b, true); err != nil {
t.Fatal(err)
}
assertPinned(t, p, aKeys[0], "A0 should be pinned")
assertPinned(t, p, aKeys[1], "A1 should be pinned")
assertPinned(t, p, ck, "C should be pinned")
assertPinned(t, p, bk, "B should be pinned")
// Unpin A5 recursively
if err := p.Unpin(ctx, aKeys[5], true); err != nil {
t.Fatal(err)
}
assertPinned(t, p, aKeys[0], "A0 should still be pinned through B")
assertUnpinned(t, p, aKeys[4], "A4 should be unpinned")
// Unpin B recursively
if err := p.Unpin(ctx, bk, true); err != nil {
t.Fatal(err)
}
assertUnpinned(t, p, bk, "B should be unpinned")
assertUnpinned(t, p, aKeys[1], "A1 should be unpinned")
assertPinned(t, p, aKeys[0], "A0 should still be pinned through C")
}
func TestDuplicateSemantics(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
a, _ := randNode()
err = dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
// pin it recursively
err = p.Pin(ctx, a, true)
if err != nil {
t.Fatal(err)
}
// pinning directly should fail
err = p.Pin(ctx, a, false)
if err == nil {
t.Fatal("expected direct pin to fail")
}
// pinning recursively again should succeed
err = p.Pin(ctx, a, true)
if err != nil {
t.Fatal(err)
}
}
func TestFlush(t *testing.T) {
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
_, k := randNode()
p.PinWithMode(k, pin.Recursive)
if err := p.Flush(context.Background()); err != nil {
t.Fatal(err)
}
assertPinned(t, p, k, "expected key to still be pinned")
}
func TestPinRecursiveFail(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
a, _ := randNode()
b, _ := randNode()
err = a.AddNodeLink("child", b)
if err != nil {
t.Fatal(err)
}
// NOTE: This isn't a time-based test; we expect the pin to fail
mctx, cancel := context.WithTimeout(ctx, time.Millisecond)
defer cancel()
err = p.Pin(mctx, a, true)
if err == nil {
t.Fatal("should have failed to pin here")
}
err = dserv.Add(ctx, b)
if err != nil {
t.Fatal(err)
}
err = dserv.Add(ctx, a)
if err != nil {
t.Fatal(err)
}
// this one is time-based... but shouldn't cause any issues
mctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()
err = p.Pin(mctx, a, true)
if err != nil {
t.Fatal(err)
}
}
func TestPinUpdate(t *testing.T) {
ctx := context.Background()
dstore := dssync.MutexWrap(ds.NewMapDatastore())
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
p, err := New(dstore, dserv, dserv)
if err != nil {
t.Fatal(err)
}
n1, c1 := randNode()
n2, c2 := randNode()
if err := dserv.Add(ctx, n1); err != nil {
t.Fatal(err)
}
if err := dserv.Add(ctx, n2); err != nil {
t.Fatal(err)
}
if err := p.Pin(ctx, n1, true); err != nil {
t.Fatal(err)
}
if err := p.Update(ctx, c1, c2, true); err != nil {
t.Fatal(err)
}
assertPinned(t, p, c2, "c2 should be pinned now")
assertUnpinned(t, p, c1, "c1 should no longer be pinned")
if err := p.Update(ctx, c2, c1, false); err != nil {
t.Fatal(err)
}
assertPinned(t, p, c2, "c2 should be pinned still")
assertPinned(t, p, c1, "c1 should be pinned now")
}
package ipldpinner
import (
"context"
"encoding/binary"
"testing"
bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
dag "github.com/ipfs/go-merkledag"
)
func ignoreCids(_ cid.Cid) {}
func objCount(d ds.Datastore) int {
q := dsq.Query{KeysOnly: true}
res, err := d.Query(q)
if err != nil {
panic(err)
}
var count int
for {
_, ok := res.NextSync()
if !ok {
break
}
count++
}
return count
}
func TestSet(t *testing.T) {
dst := ds.NewMapDatastore()
bstore := blockstore.NewBlockstore(dst)
ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore)))
// this value triggers the creation of a recursive shard.
// If the recursive sharding is done improperly, this will result in
// an infinite recursion and crash (OOM)
limit := uint32((defaultFanout * maxItems) + 1)
var inputs []cid.Cid
buf := make([]byte, 4)
for i := uint32(0); i < limit; i++ {
binary.BigEndian.PutUint32(buf, i)
c := dag.NewRawNode(buf).Cid()
inputs = append(inputs, c)
}
_, err := storeSet(context.Background(), ds, inputs[:len(inputs)-1], ignoreCids)
if err != nil {
t.Fatal(err)
}
objs1 := objCount(dst)
out, err := storeSet(context.Background(), ds, inputs, ignoreCids)
if err != nil {
t.Fatal(err)
}
objs2 := objCount(dst)
if objs2-objs1 > 2 {
t.Fatal("set sharding does not appear to be deterministic")
}
// weird wrapper node because loadSet expects us to pass an
// object pointing to multiple named sets
setroot := &dag.ProtoNode{}
err = setroot.AddNodeLink("foo", out)
if err != nil {
t.Fatal(err)
}
outset, err := loadSet(context.Background(), ds, setroot, "foo", ignoreCids)
if err != nil {
t.Fatal(err)
}
if uint32(len(outset)) != limit {
t.Fatal("got wrong number", len(outset), limit)
}
seen := cid.NewSet()
for _, c := range outset {
seen.Add(c)
}
for _, c := range inputs {
if !seen.Has(c) {
t.Fatalf("expected to have '%s', didnt find it", c)
}
}
}
// Package pinconv converts pins between the dag-based ipldpinner and the
// datastore-based dspinner. Once conversion is complete, the pins from the
// source pinner are removed.
package pinconv
import (
"context"
"fmt"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ipfspinner "github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-pinner/dspinner"
"github.com/ipfs/go-ipfs-pinner/ipldpinner"
ipld "github.com/ipfs/go-ipld-format"
)
// ConvertPinsFromIPLDToDS converts pins stored in mdag-based storage to pins
// stored in the datastore. Returns a dspinner loaded with the converted pins,
// and a count of the recursive and direct pins converted.
//
// After pins are stored in datastore, the root pin key is deleted to unlink
// the pin data in the DAGService.
func ConvertPinsFromIPLDToDS(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
const ipldPinPath = "/local/pins"
dsPinner, err := dspinner.New(ctx, dstore, dserv)
if err != nil {
return nil, 0, err
}
var convCount int
keyChan := make(chan cid.Cid)
go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, true, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Recursive)
convCount++
}
if err != nil {
return nil, 0, fmt.Errorf("cannot load recursive keys: %s", err)
}
keyChan = make(chan cid.Cid)
go func() {
err = ipldpinner.LoadKeys(ctx, dstore, dserv, internal, false, keyChan)
close(keyChan)
}()
for key := range keyChan {
dsPinner.PinWithMode(key, ipfspinner.Direct)
convCount++
}
if err != nil {
return nil, 0, fmt.Errorf("cannot load direct keys: %s", err)
}
err = dsPinner.Flush(ctx)
if err != nil {
return nil, 0, err
}
// Delete root mdag key from datastore to remove old pin storage.
ipldPinDatastoreKey := ds.NewKey(ipldPinPath)
if err = dstore.Delete(ipldPinDatastoreKey); err != nil {
return nil, 0, fmt.Errorf("cannot delete old pin state: %v", err)
}
if err = dstore.Sync(ipldPinDatastoreKey); err != nil {
return nil, 0, fmt.Errorf("cannot sync old pin state: %v", err)
}
return dsPinner, convCount, nil
}
// ConvertPinsFromDSToIPLD converts the pins stored in the datastore by
// dspinner, into pins stored in the given internal DAGService by ipldpinner.
// Returns an ipldpinner loaded with the converted pins, and a count of the
// recursive and direct pins converted.
//
// After the pins are stored in the DAGService, the pins and their indexes are
// removed from the dspinner.
func ConvertPinsFromDSToIPLD(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, internal ipld.DAGService) (ipfspinner.Pinner, int, error) {
dsPinner, err := dspinner.New(ctx, dstore, dserv)
if err != nil {
return nil, 0, err
}
ipldPinner, err := ipldpinner.New(dstore, dserv, internal)
if err != nil {
return nil, 0, err
}
cids, err := dsPinner.RecursiveKeys(ctx)
if err != nil {
return nil, 0, err
}
for i := range cids {
ipldPinner.PinWithMode(cids[i], ipfspinner.Recursive)
dsPinner.RemovePinWithMode(cids[i], ipfspinner.Recursive)
}
convCount := len(cids)
cids, err = dsPinner.DirectKeys(ctx)
if err != nil {
return nil, 0, err
}
for i := range cids {
ipldPinner.PinWithMode(cids[i], ipfspinner.Direct)
dsPinner.RemovePinWithMode(cids[i], ipfspinner.Direct)
}
convCount += len(cids)
// Save the ipldpinner pins
err = ipldPinner.Flush(ctx)
if err != nil {
return nil, 0, err
}
err = dsPinner.Flush(ctx)
if err != nil {
return nil, 0, err
}
return ipldPinner, convCount, nil
}
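// A minimal conversion sketch (assumed wiring; dstore and dserv are built the
// same way as in the ipldpinner tests earlier in this commit):
//
//	dsPinner, count, err := ConvertPinsFromIPLDToDS(ctx, dstore, dserv, dserv)
//	if err != nil {
//		// handle error
//	}
//	_ = dsPinner // datastore-backed pinner now holding the converted pins
//	_ = count    // number of recursive and direct pins converted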