You need to sign in or sign up before continuing.
set.go 7.55 KB
Newer Older
1
package ldpinner
2 3 4

import (
	"bytes"
5
	"context"
6 7 8 9 10 11
	"encoding/binary"
	"errors"
	"fmt"
	"hash/fnv"
	"sort"

Jakub Sztandera's avatar
Jakub Sztandera committed
12
	"github.com/gogo/protobuf/proto"
13 14 15
	cid "gitlab.dms3.io/dms3/go-cid"
	ld "gitlab.dms3.io/dms3/go-ld-format"
	"gitlab.dms3.io/dms3/go-merkledag"
Michael Muré's avatar
Michael Muré committed
16

17
	"gitlab.dms3.io/dms3/go-dms3-pinner/internal/pb"
18 19 20
)

const (
	// defaultFanout is the number of child-bucket link slots every set node
	// is created with; it is also the value written to the header's Fanout.
	defaultFanout = 256

	// maxItems is the largest number of items stored directly in a single
	// node; anything beyond that is pushed down into child buckets.
	maxItems = 8192
)

28
func hash(seed uint32, c cid.Cid) uint32 {
29 30 31 32
	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], seed)
	h := fnv.New32a()
	_, _ = h.Write(buf[:])
Jeromy's avatar
Jeromy committed
33
	_, _ = h.Write(c.Bytes())
34 35 36
	return h.Sum32()
}

37
// itemIterator yields the next CID of a sequence on each call; ok is false
// once the sequence is exhausted.
type itemIterator func() (c cid.Cid, ok bool)
38

39
// keyObserver is a callback that is handed CIDs as the set code encounters
// them, e.g. the internal nodes created while storing a set.
type keyObserver func(cid.Cid)
40 41

type sortByHash struct {
42
	links []*ld.Link
43 44 45 46 47 48 49
}

func (s sortByHash) Len() int {
	return len(s.links)
}

func (s sortByHash) Less(a, b int) bool {
50
	return bytes.Compare(s.links[a].Cid.Bytes(), s.links[b].Cid.Bytes()) == -1
51 52 53 54 55 56
}

func (s sortByHash) Swap(a, b int) {
	s.links[a], s.links[b] = s.links[b], s.links[a]
}

57
func storeItems(ctx context.Context, dag ld.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
Andrew Gillis's avatar
Andrew Gillis committed
58 59 60 61 62
	// Each node wastes up to defaultFanout in empty links.
	var leafLinks uint64
	if estimatedLen < maxItems {
		leafLinks = estimatedLen
	}
63
	links := make([]*ld.Link, defaultFanout, defaultFanout+leafLinks)
64
	for i := 0; i < defaultFanout; i++ {
65
		links[i] = &ld.Link{Cid: emptyKey}
66
	}
67 68

	// add emptyKey to our set of internal pinset objects
69 70 71
	n := &merkledag.ProtoNode{}
	n.SetLinks(links)

72
	internalKeys(emptyKey)
73

74
	hdr := &pb.Set{
75 76 77
		Version: 1,
		Fanout:  defaultFanout,
		Seed:    depth,
78 79 80 81 82 83 84
	}
	if err := writeHdr(n, hdr); err != nil {
		return nil, err
	}

	if estimatedLen < maxItems {
		// it'll probably fit
85
		links := n.Links()
86
		for i := 0; i < maxItems; i++ {
87
			k, ok := iter()
88 89 90 91
			if !ok {
				// all done
				break
			}
92

93
			links = append(links, &ld.Link{Cid: k})
94
		}
95 96 97

		n.SetLinks(links)

98 99
		// sort by hash, also swap item Data
		s := sortByHash{
100
			links: n.Links()[defaultFanout:],
101 102 103 104
		}
		sort.Stable(s)
	}

Andrew Gillis's avatar
Andrew Gillis committed
105
	var hashed [][]cid.Cid
106
	for {
107 108 109 110 111 112 113 114 115 116 117 118 119
		// This loop essentially enumerates every single item in the set
		// and maps them all into a set of buckets. Each bucket will be recursively
		// turned into its own sub-set, and so on down the chain. Each sub-set
		// gets added to the dagservice, and put into its place in a set nodes
		// links array.
		//
		// Previously, the bucket was selected by taking an int32 from the hash of
		// the input key + seed. This was erroneous as we would later be assigning
		// the created sub-sets into an array of length 256 by the modulus of the
		// int32 hash value with 256. This resulted in overwriting existing sub-sets
		// and losing pins. The fix (a few lines down from this comment), is to
		// map the hash value down to the 8 bit keyspace here while creating the
		// buckets. This way, we avoid any overlapping later on.
120
		k, ok := iter()
121 122 123
		if !ok {
			break
		}
Andrew Gillis's avatar
Andrew Gillis committed
124 125 126
		if hashed == nil {
			hashed = make([][]cid.Cid, defaultFanout)
		}
Jeromy's avatar
Jeromy committed
127
		h := hash(depth, k) % defaultFanout
128
		hashed[h] = append(hashed[h], k)
129
	}
130

131
	for h, items := range hashed {
132 133 134 135 136
		if len(items) == 0 {
			// recursion base case
			continue
		}

137
		childIter := getCidListIterator(items)
138

139
		// recursively create a pinset from the items for this bucket index
Jeromy's avatar
Jeromy committed
140
		child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys)
141 142 143
		if err != nil {
			return nil, err
		}
144

145 146 147 148
		size, err := child.Size()
		if err != nil {
			return nil, err
		}
149

150
		err = dag.Add(ctx, child)
151 152 153
		if err != nil {
			return nil, err
		}
154
		childKey := child.Cid()
155

156
		internalKeys(childKey)
157 158

		// overwrite the 'empty key' in the existing links array
159
		n.Links()[h] = &ld.Link{
160
			Cid:  childKey,
161 162 163 164 165 166
			Size: size,
		}
	}
	return n, nil
}

167
func readHdr(n *merkledag.ProtoNode) (*pb.Set, error) {
168
	hdrLenRaw, consumed := binary.Uvarint(n.Data())
169
	if consumed <= 0 {
170
		return nil, errors.New("invalid Set header length")
171
	}
172 173 174 175

	pbdata := n.Data()[consumed:]
	if hdrLenRaw > uint64(len(pbdata)) {
		return nil, errors.New("impossibly large Set header length")
176 177 178 179
	}
	// as hdrLenRaw was <= an int, we now know it fits in an int
	hdrLen := int(hdrLenRaw)
	var hdr pb.Set
180 181
	if err := proto.Unmarshal(pbdata[:hdrLen], &hdr); err != nil {
		return nil, err
182 183 184
	}

	if v := hdr.GetVersion(); v != 1 {
185
		return nil, fmt.Errorf("unsupported Set version: %d", v)
186
	}
187
	if uint64(hdr.GetFanout()) > uint64(len(n.Links())) {
188
		return nil, errors.New("impossibly large Fanout")
189
	}
190
	return &hdr, nil
191 192
}

193
func writeHdr(n *merkledag.ProtoNode, hdr *pb.Set) error {
194 195 196 197
	hdrData, err := proto.Marshal(hdr)
	if err != nil {
		return err
	}
198

Łukasz Magiera's avatar
Łukasz Magiera committed
199
	// make enough space for the length prefix and the marshaled header data
200 201 202 203 204 205 206 207 208
	data := make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+len(hdrData))

	// write the uvarint length of the header data
	uvarlen := binary.PutUvarint(data, uint64(len(hdrData)))

	// append the actual protobuf data *after* the length value we wrote
	data = append(data[:uvarlen], hdrData...)

	n.SetData(data)
209 210 211
	return nil
}

212
// walkerFunc is called by walkItems for every item link of a set node. idx is
// the link's position among that node's item links (those after the fanout
// slots); returning a non-nil error aborts the walk.
type walkerFunc func(idx int, link *ld.Link) error
213

214
func walkItems(ctx context.Context, dag ld.DAGService, n *merkledag.ProtoNode, fn walkerFunc, children keyObserver) error {
215
	hdr, err := readHdr(n)
216 217 218 219 220
	if err != nil {
		return err
	}
	// readHdr guarantees fanout is a safe value
	fanout := hdr.GetFanout()
221
	for i, l := range n.Links()[fanout:] {
222
		if err = fn(i, l); err != nil {
223 224 225
			return err
		}
	}
226 227
	for _, l := range n.Links()[:fanout] {
		c := l.Cid
228 229 230
		if children != nil {
			children(c)
		}
Jeromy's avatar
Jeromy committed
231
		if c.Equals(emptyKey) {
232 233 234 235 236 237
			continue
		}
		subtree, err := l.GetNode(ctx, dag)
		if err != nil {
			return err
		}
238 239 240 241 242 243

		stpb, ok := subtree.(*merkledag.ProtoNode)
		if !ok {
			return merkledag.ErrNotProtobuf
		}

244
		if err = walkItems(ctx, dag, stpb, fn, children); err != nil {
245 246 247 248 249 250
			return err
		}
	}
	return nil
}

251
func loadSet(ctx context.Context, dag ld.DAGService, root *merkledag.ProtoNode, name string, internalKeys keyObserver) ([]cid.Cid, error) {
252 253 254 255
	l, err := root.GetNodeLink(name)
	if err != nil {
		return nil, err
	}
Jeromy's avatar
Jeromy committed
256

257
	lnkc := l.Cid
Jeromy's avatar
Jeromy committed
258 259
	internalKeys(lnkc)

260 261 262 263 264
	n, err := l.GetNode(ctx, dag)
	if err != nil {
		return nil, err
	}

265 266 267 268 269
	pbn, ok := n.(*merkledag.ProtoNode)
	if !ok {
		return nil, merkledag.ErrNotProtobuf
	}

270
	var res []cid.Cid
271
	walk := func(idx int, link *ld.Link) error {
272
		res = append(res, link.Cid)
273 274
		return nil
	}
275 276

	if err := walkItems(ctx, dag, pbn, walk, internalKeys); err != nil {
277 278 279 280 281
		return nil, err
	}
	return res, nil
}

282
func loadSetChan(ctx context.Context, dag ld.DAGService, root *merkledag.ProtoNode, name string, keyChan chan<- cid.Cid) error {
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
	l, err := root.GetNodeLink(name)
	if err != nil {
		return err
	}

	n, err := l.GetNode(ctx, dag)
	if err != nil {
		return err
	}

	pbn, ok := n.(*merkledag.ProtoNode)
	if !ok {
		return merkledag.ErrNotProtobuf
	}

298
	walk := func(idx int, link *ld.Link) error {
299 300 301 302 303 304 305 306 307 308
		keyChan <- link.Cid
		return nil
	}

	if err = walkItems(ctx, dag, pbn, walk, nil); err != nil {
		return err
	}
	return nil
}

309 310
func getCidListIterator(cids []cid.Cid) itemIterator {
	return func() (c cid.Cid, ok bool) {
Jeromy's avatar
Jeromy committed
311
		if len(cids) == 0 {
312
			return cid.Cid{}, false
313
		}
314

Jeromy's avatar
Jeromy committed
315 316
		first := cids[0]
		cids = cids[1:]
317
		return first, true
318
	}
319 320
}

321
func storeSet(ctx context.Context, dag ld.DAGService, cids []cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
322 323
	iter := getCidListIterator(cids)

Jeromy's avatar
Jeromy committed
324
	n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys)
325 326 327
	if err != nil {
		return nil, err
	}
328
	err = dag.Add(ctx, n)
329 330 331
	if err != nil {
		return nil, err
	}
332
	internalKeys(n.Cid())
333 334
	return n, nil
}