Commit ccb8151b authored by Jeromy's avatar Jeromy

Make pinset sharding deterministic

Making this deterministic keeps us from creating an exponential amount
of objects as the number of pins in the set increases.

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 235ff52e
...@@ -3,7 +3,6 @@ package pin ...@@ -3,7 +3,6 @@ package pin
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/rand"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
...@@ -26,14 +25,6 @@ const ( ...@@ -26,14 +25,6 @@ const (
maxItems = 8192 maxItems = 8192
) )
func randomSeed() (uint32, error) {
var buf [4]byte
if _, err := rand.Read(buf[:]); err != nil {
return 0, err
}
return binary.LittleEndian.Uint32(buf[:]), nil
}
func hash(seed uint32, c *cid.Cid) uint32 { func hash(seed uint32, c *cid.Cid) uint32 {
var buf [4]byte var buf [4]byte
binary.LittleEndian.PutUint32(buf[:], seed) binary.LittleEndian.PutUint32(buf[:], seed)
...@@ -63,11 +54,7 @@ func (s sortByHash) Swap(a, b int) { ...@@ -63,11 +54,7 @@ func (s sortByHash) Swap(a, b int) {
s.links[a], s.links[b] = s.links[b], s.links[a] s.links[a], s.links[b] = s.links[b], s.links[a]
} }
func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) { func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
seed, err := randomSeed()
if err != nil {
return nil, err
}
links := make([]*node.Link, 0, defaultFanout+maxItems) links := make([]*node.Link, 0, defaultFanout+maxItems)
for i := 0; i < defaultFanout; i++ { for i := 0; i < defaultFanout; i++ {
links = append(links, &node.Link{Cid: emptyKey}) links = append(links, &node.Link{Cid: emptyKey})
...@@ -82,7 +69,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint ...@@ -82,7 +69,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
hdr := &pb.Set{ hdr := &pb.Set{
Version: proto.Uint32(1), Version: proto.Uint32(1),
Fanout: proto.Uint32(defaultFanout), Fanout: proto.Uint32(defaultFanout),
Seed: proto.Uint32(seed), Seed: proto.Uint32(depth),
} }
if err := writeHdr(n, hdr); err != nil { if err := writeHdr(n, hdr); err != nil {
return nil, err return nil, err
...@@ -129,7 +116,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint ...@@ -129,7 +116,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
if !ok { if !ok {
break break
} }
h := hash(seed, k) % defaultFanout h := hash(depth, k) % defaultFanout
hashed[h] = append(hashed[h], k) hashed[h] = append(hashed[h], k)
} }
...@@ -142,7 +129,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint ...@@ -142,7 +129,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
childIter := getCidListIterator(items) childIter := getCidListIterator(items)
// recursively create a pinset from the items for this bucket index // recursively create a pinset from the items for this bucket index
child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys) child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -296,7 +283,7 @@ func getCidListIterator(cids []*cid.Cid) itemIterator { ...@@ -296,7 +283,7 @@ func getCidListIterator(cids []*cid.Cid) itemIterator {
func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) { func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
iter := getCidListIterator(cids) iter := getCidListIterator(cids)
n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys) n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -2,40 +2,75 @@ package pin ...@@ -2,40 +2,75 @@ package pin
import ( import (
"context" "context"
"fmt" "encoding/binary"
"os"
"testing" "testing"
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag" dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
) )
func ignoreCids(_ *cid.Cid) {} func ignoreCids(_ *cid.Cid) {}
func TestSet(t *testing.T) { func objCount(d ds.Datastore) int {
ds := mdtest.Mock() q := dsq.Query{KeysOnly: true}
limit := 10000 // 10000 reproduces the pinloss issue fairly reliably res, err := d.Query(q)
if err != nil {
if os.Getenv("STRESS_IT_OUT_YO") != "" { panic(err)
limit = 10000000
} }
var inputs []*cid.Cid
for i := 0; i < limit; i++ { var count int
c, err := ds.Add(dag.NodeWithData([]byte(fmt.Sprint(i)))) for {
if err != nil { _, ok := res.NextSync()
t.Fatal(err) if !ok {
break
} }
count++
}
return count
}
func TestSet(t *testing.T) {
dst := ds.NewMapDatastore()
bstore := blockstore.NewBlockstore(dst)
ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore)))
// this value triggers the creation of a recursive shard.
// If the recursive sharding is done improperly, this will result in
// an infinite recursion and crash (OOM)
limit := uint32((defaultFanout * maxItems) + 1)
var inputs []*cid.Cid
buf := make([]byte, 4)
for i := uint32(0); i < limit; i++ {
binary.BigEndian.PutUint32(buf, i)
c := dag.NewRawNode(buf).Cid()
inputs = append(inputs, c) inputs = append(inputs, c)
} }
_, err := storeSet(context.Background(), ds, inputs[:len(inputs)-1], ignoreCids)
if err != nil {
t.Fatal(err)
}
objs1 := objCount(dst)
out, err := storeSet(context.Background(), ds, inputs, ignoreCids) out, err := storeSet(context.Background(), ds, inputs, ignoreCids)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
objs2 := objCount(dst)
if objs2-objs1 > 2 {
t.Fatal("set sharding does not appear to be deterministic")
}
// weird wrapper node because loadSet expects us to pass an // weird wrapper node because loadSet expects us to pass an
// object pointing to multiple named sets // object pointing to multiple named sets
setroot := &dag.ProtoNode{} setroot := &dag.ProtoNode{}
...@@ -49,7 +84,7 @@ func TestSet(t *testing.T) { ...@@ -49,7 +84,7 @@ func TestSet(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if len(outset) != limit { if uint32(len(outset)) != limit {
t.Fatal("got wrong number", len(outset), limit) t.Fatal("got wrong number", len(outset), limit)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment