Commit e2be4914 authored by Jeromy Johnson, committed by GitHub

Merge pull request #3640 from ipfs/fix/pinset-obj-explosion

Make pinset sharding deterministic
parents 1ae5c4a6 ccb8151b
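Summary of the change: previously every call to storeItems drew a fresh 32-bit seed from crypto/rand and mixed it into the bucket hash, so writing the same pin set twice produced two different shard layouts and left extra intermediate objects behind (hence the branch name fix/pinset-obj-explosion). The patch drops randomSeed entirely and passes the recursion depth down instead, using it as the hash seed, which makes the layout a pure function of the set's contents. A minimal standalone sketch of the idea follows; it is not the patch itself, and the fanout value and the use of FNV-1a below are assumptions for illustration only.

package main

import (
    "encoding/binary"
    "fmt"
    "hash/fnv"
)

// illustrativeFanout stands in for the pinset's defaultFanout constant,
// whose value is not shown in this diff.
const illustrativeFanout = 256

// bucket mirrors the shape of the patched hash(depth, k) % defaultFanout
// call: the seed is hashed ahead of the key, and the seed is now the
// recursion depth rather than a random number.
func bucket(depth uint32, key string) uint32 {
    var buf [4]byte
    binary.LittleEndian.PutUint32(buf[:], depth)
    h := fnv.New32a() // assumed stand-in for the real hash function
    h.Write(buf[:])
    h.Write([]byte(key))
    return h.Sum32() % illustrativeFanout
}

func main() {
    // Deterministic: the same key at the same depth prints the same bucket
    // on every run, so re-storing an identical pin set rebuilds identical
    // shard nodes instead of a brand new tree.
    fmt.Println(bucket(0, "example-cid"), bucket(0, "example-cid"))
    // A bucket that overflows re-shards its contents with depth+1 as the
    // seed, so the distribution changes at each level instead of repeating.
    fmt.Println(bucket(1, "example-cid"))
}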
@@ -3,7 +3,6 @@ package pin
 import (
     "bytes"
     "context"
-    "crypto/rand"
     "encoding/binary"
     "errors"
     "fmt"
@@ -26,14 +25,6 @@ const (
     maxItems = 8192
 )
 
-func randomSeed() (uint32, error) {
-    var buf [4]byte
-    if _, err := rand.Read(buf[:]); err != nil {
-        return 0, err
-    }
-    return binary.LittleEndian.Uint32(buf[:]), nil
-}
-
 func hash(seed uint32, c *cid.Cid) uint32 {
     var buf [4]byte
     binary.LittleEndian.PutUint32(buf[:], seed)
@@ -63,11 +54,7 @@ func (s sortByHash) Swap(a, b int) {
     s.links[a], s.links[b] = s.links[b], s.links[a]
 }
 
-func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
-    seed, err := randomSeed()
-    if err != nil {
-        return nil, err
-    }
+func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
     links := make([]*node.Link, 0, defaultFanout+maxItems)
     for i := 0; i < defaultFanout; i++ {
         links = append(links, &node.Link{Cid: emptyKey})
@@ -82,7 +69,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
     hdr := &pb.Set{
         Version: proto.Uint32(1),
         Fanout:  proto.Uint32(defaultFanout),
-        Seed:    proto.Uint32(seed),
+        Seed:    proto.Uint32(depth),
     }
     if err := writeHdr(n, hdr); err != nil {
         return nil, err
@@ -129,7 +116,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
         if !ok {
             break
         }
-        h := hash(seed, k) % defaultFanout
+        h := hash(depth, k) % defaultFanout
         hashed[h] = append(hashed[h], k)
     }
 
@@ -142,7 +129,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
         childIter := getCidListIterator(items)
 
         // recursively create a pinset from the items for this bucket index
-        child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys)
+        child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys)
         if err != nil {
             return nil, err
         }
@@ -296,7 +283,7 @@ func getCidListIterator(cids []*cid.Cid) itemIterator {
 
 func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
     iter := getCidListIterator(cids)
-    n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys)
+    n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys)
     if err != nil {
         return nil, err
     }
...
@@ -2,40 +2,75 @@ package pin
 import (
     "context"
     "fmt"
-    "os"
+    "encoding/binary"
     "testing"
 
+    blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
+    bserv "github.com/ipfs/go-ipfs/blockservice"
+    offline "github.com/ipfs/go-ipfs/exchange/offline"
     dag "github.com/ipfs/go-ipfs/merkledag"
-    mdtest "github.com/ipfs/go-ipfs/merkledag/test"
 
+    ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
+    dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
     cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
 )
 
 func ignoreCids(_ *cid.Cid) {}
 
-func TestSet(t *testing.T) {
-    ds := mdtest.Mock()
-    limit := 10000 // 10000 reproduces the pinloss issue fairly reliably
-    if os.Getenv("STRESS_IT_OUT_YO") != "" {
-        limit = 10000000
+func objCount(d ds.Datastore) int {
+    q := dsq.Query{KeysOnly: true}
+    res, err := d.Query(q)
+    if err != nil {
+        panic(err)
     }
 
-    var inputs []*cid.Cid
-    for i := 0; i < limit; i++ {
-        c, err := ds.Add(dag.NodeWithData([]byte(fmt.Sprint(i))))
-        if err != nil {
-            t.Fatal(err)
+    var count int
+    for {
+        _, ok := res.NextSync()
+        if !ok {
+            break
         }
 
+        count++
+    }
+    return count
+}
+
+func TestSet(t *testing.T) {
+    dst := ds.NewMapDatastore()
+    bstore := blockstore.NewBlockstore(dst)
+    ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore)))
+
+    // this value triggers the creation of a recursive shard.
+    // If the recursive sharding is done improperly, this will result in
+    // an infinite recursion and crash (OOM)
+    limit := uint32((defaultFanout * maxItems) + 1)
+
+    var inputs []*cid.Cid
+    buf := make([]byte, 4)
+    for i := uint32(0); i < limit; i++ {
+        binary.BigEndian.PutUint32(buf, i)
+        c := dag.NewRawNode(buf).Cid()
         inputs = append(inputs, c)
     }
 
+    _, err := storeSet(context.Background(), ds, inputs[:len(inputs)-1], ignoreCids)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    objs1 := objCount(dst)
+
     out, err := storeSet(context.Background(), ds, inputs, ignoreCids)
     if err != nil {
         t.Fatal(err)
     }
 
+    objs2 := objCount(dst)
+    if objs2-objs1 > 2 {
+        t.Fatal("set sharding does not appear to be deterministic")
+    }
+
     // weird wrapper node because loadSet expects us to pass an
     // object pointing to multiple named sets
     setroot := &dag.ProtoNode{}
@@ -49,7 +84,7 @@ func TestSet(t *testing.T) {
         t.Fatal(err)
     }
 
-    if len(outset) != limit {
+    if uint32(len(outset)) != limit {
         t.Fatal("got wrong number", len(outset), limit)
     }
...
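A note on the new test: the comment in the diff explains that limit := uint32((defaultFanout * maxItems) + 1) is chosen to force a recursive shard. With maxItems = 8192 (see the constants hunk above), defaultFanout*maxItems + 1 CIDs is one more than a single layer of buckets can hold, so by pigeonhole at least one of the defaultFanout buckets receives more than maxItems entries and must shard again; if the fanout were 256, that would come to 256 * 8192 + 1 = 2,097,153 CIDs (the fanout's value is an assumption here, as it is not shown in this diff). The determinism check then stores inputs[:len(inputs)-1] followed by the full set: if the layout depends only on the set's contents, the second store reuses almost every shard node already in the datastore and only the few nodes on the changed path are new, whereas a per-call random seed would lay out an almost entirely new tree of shard objects. That is what the objs2-objs1 > 2 assertion catches.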