hamt.go 14.4 KB
Newer Older
Jeromy's avatar
Jeromy committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// Package hamt implements a Hash Array Mapped Trie over ipfs merkledag nodes.
// It is implemented mostly as described in the wikipedia article on HAMTs,
// however the table size is variable (usually 256 in our usages) as opposed to
// 32 as suggested in the article.  The hash function used is currently
// Murmur3, but this value is configurable (the datastructure reports which
// hash function it is using).
//
// The one algorithmic change we implement that is not mentioned in the
// wikipedia article is the collapsing of empty shards.
// Given the following tree: ( '[' = shards, '{' = values )
// [ 'A' ] -> [ 'B' ] -> { "ABC" }
//    |       L-> { "ABD" }
//    L-> { "ASDF" }
// If we simply removed "ABC", we would end up with a tree where shard 'B' only
// has a single child.  This causes two issues, the first, is that now we have
// an extra lookup required to get to "ABD".  The second issue is that now we
// have a tree that contains only "ABD", but is not the same tree that we would
// get by simply inserting "ABD" into a new tree.  To address this, we always
// check for empty shard nodes upon deletion and prune them to maintain a
// consistent tree, independent of insertion order.
21 22 23 24 25 26 27
package hamt

import (
	"context"
	"fmt"
	"os"

Jeromy's avatar
Jeromy committed
28 29 30
	bitfield "github.com/Stebalien/go-bitfield"
	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
Kevin Atkinson's avatar
Kevin Atkinson committed
31
	dag "github.com/ipfs/go-merkledag"
Jeromy's avatar
Jeromy committed
32
	"github.com/spaolacci/murmur3"
33 34

	format "github.com/ipfs/go-unixfs"
35 36 37
)

// HashMurmur3 is the multiformats identifier for Murmur3.
const HashMurmur3 uint64 = 0x22

42
func (ds *Shard) isValueNode() bool {
Overbool's avatar
Overbool committed
43
	return ds.key != "" && ds.val != nil
44 45
}

Hector Sanjuan's avatar
Hector Sanjuan committed
46 47
// A Shard represents the HAMT. It should be initialized with NewShard().
type Shard struct {
48 49
	nd *dag.ProtoNode

Steven Allen's avatar
Steven Allen committed
50
	bitfield bitfield.Bitfield
51

52
	children []*Shard
53 54 55 56

	tableSize    int
	tableSizeLg2 int

57
	builder  cid.Builder
58 59 60 61 62
	hashFunc uint64

	prefixPadStr string
	maxpadlen    int

63
	dserv ipld.DAGService
64

65 66 67
	// leaf node
	key string
	val *ipld.Link
68 69
}

Hector Sanjuan's avatar
Hector Sanjuan committed
70 71 72
// NewShard creates a new, empty HAMT shard with the given size.
func NewShard(dserv ipld.DAGService, size int) (*Shard, error) {
	ds, err := makeShard(dserv, size)
73 74 75 76
	if err != nil {
		return nil, err
	}

77 78
	ds.nd = new(dag.ProtoNode)
	ds.hashFunc = HashMurmur3
79
	return ds, nil
80 81
}

Hector Sanjuan's avatar
Hector Sanjuan committed
82
func makeShard(ds ipld.DAGService, size int) (*Shard, error) {
Steven Allen's avatar
Steven Allen committed
83 84 85
	lg2s, err := logtwo(size)
	if err != nil {
		return nil, err
86
	}
87
	maxpadding := fmt.Sprintf("%X", size-1)
Hector Sanjuan's avatar
Hector Sanjuan committed
88
	return &Shard{
89
		tableSizeLg2: lg2s,
90 91
		prefixPadStr: fmt.Sprintf("%%0%dX", len(maxpadding)),
		maxpadlen:    len(maxpadding),
Steven Allen's avatar
Steven Allen committed
92
		bitfield:     bitfield.NewBitfield(size),
93 94
		tableSize:    size,
		dserv:        ds,
95
	}, nil
96 97
}

Steven Allen's avatar
Steven Allen committed
98
// NewHamtFromDag creates new a HAMT shard from the given DAG.
Hector Sanjuan's avatar
Hector Sanjuan committed
99
func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*Shard, error) {
100 101
	pbnd, ok := nd.(*dag.ProtoNode)
	if !ok {
102
		return nil, dag.ErrNotProtobuf
103 104
	}

Overbool's avatar
Overbool committed
105
	fsn, err := format.FSNodeFromBytes(pbnd.Data())
106 107 108 109
	if err != nil {
		return nil, err
	}

Overbool's avatar
Overbool committed
110
	if fsn.Type() != format.THAMTShard {
111 112 113
		return nil, fmt.Errorf("node was not a dir shard")
	}

Overbool's avatar
Overbool committed
114
	if fsn.HashType() != HashMurmur3 {
115 116 117
		return nil, fmt.Errorf("only murmur3 supported as hash function")
	}

Overbool's avatar
Overbool committed
118
	ds, err := makeShard(dserv, int(fsn.Fanout()))
119 120 121 122
	if err != nil {
		return nil, err
	}

123
	ds.nd = pbnd.Copy().(*dag.ProtoNode)
124
	ds.children = make([]*Shard, len(pbnd.Links()))
Overbool's avatar
Overbool committed
125 126
	ds.bitfield.SetBytes(fsn.Data())
	ds.hashFunc = fsn.HashType()
127
	ds.builder = ds.nd.CidBuilder()
128 129 130 131

	return ds, nil
}

132 133 134
// SetCidBuilder sets the CID Builder
func (ds *Shard) SetCidBuilder(builder cid.Builder) {
	ds.builder = builder
135 136
}

137 138 139
// CidBuilder gets the CID Builder, may be nil if unset
func (ds *Shard) CidBuilder() cid.Builder {
	return ds.builder
140 141
}

142
// Node serializes the HAMT structure into a merkledag node with unixfs formatting
Hector Sanjuan's avatar
Hector Sanjuan committed
143
func (ds *Shard) Node() (ipld.Node, error) {
144
	out := new(dag.ProtoNode)
145
	out.SetCidBuilder(ds.builder)
146

Steven Allen's avatar
Steven Allen committed
147
	cindex := 0
148 149
	// TODO: optimized 'for each set bit'
	for i := 0; i < ds.tableSize; i++ {
Steven Allen's avatar
Steven Allen committed
150
		if !ds.bitfield.Bit(i) {
151 152 153 154 155
			continue
		}

		ch := ds.children[cindex]
		if ch != nil {
156
			clnk, err := ch.Link()
157 158 159 160
			if err != nil {
				return nil, err
			}

161
			err = out.AddRawLink(ds.linkNamePrefix(i)+ch.key, clnk)
162 163 164 165 166 167 168 169 170 171 172 173 174
			if err != nil {
				return nil, err
			}
		} else {
			// child unloaded, just copy in link with updated name
			lnk := ds.nd.Links()[cindex]
			label := lnk.Name[ds.maxpadlen:]

			err := out.AddRawLink(ds.linkNamePrefix(i)+label, lnk)
			if err != nil {
				return nil, err
			}
		}
Steven Allen's avatar
Steven Allen committed
175
		cindex++
176 177
	}

Overbool's avatar
Overbool committed
178
	data, err := format.HAMTShardData(ds.bitfield.Bytes(), uint64(ds.tableSize), HashMurmur3)
179 180 181 182 183 184
	if err != nil {
		return nil, err
	}

	out.SetData(data)

185
	err = ds.dserv.Add(context.TODO(), out)
186 187 188 189 190 191 192
	if err != nil {
		return nil, err
	}

	return out, nil
}

193
func (ds *Shard) makeShardValue(lnk *ipld.Link) (*Shard, error) {
194
	lnk2 := *lnk
195 196 197 198
	s, err := makeShard(ds.dserv, ds.tableSize)
	if err != nil {
		return nil, err
	}
199

200 201
	s.key = lnk.Name[ds.maxpadlen:]
	s.val = &lnk2
202

203
	return s, nil
204 205
}

206 207 208 209 210 211 212
// hash returns the 64-bit Murmur3 digest of val as a byte slice.
func hash(val []byte) []byte {
	hasher := murmur3.New64()
	// The write result is ignored, as in every use of this hasher here.
	_, _ = hasher.Write(val)
	return hasher.Sum(nil)
}

// Set sets 'name' = nd in the HAMT
Hector Sanjuan's avatar
Hector Sanjuan committed
213
func (ds *Shard) Set(ctx context.Context, name string, nd ipld.Node) error {
214
	hv := &hashBits{b: hash([]byte(name))}
215
	err := ds.dserv.Add(ctx, nd)
216 217 218 219
	if err != nil {
		return err
	}

220
	lnk, err := ipld.MakeLink(nd)
221 222 223 224 225 226
	if err != nil {
		return err
	}
	lnk.Name = ds.linkNamePrefix(0) + name

	return ds.modifyValue(ctx, hv, name, lnk)
227 228 229
}

// Remove deletes the named entry if it exists, this operation is idempotent.
Hector Sanjuan's avatar
Hector Sanjuan committed
230
func (ds *Shard) Remove(ctx context.Context, name string) error {
231 232 233 234
	hv := &hashBits{b: hash([]byte(name))}
	return ds.modifyValue(ctx, hv, name, nil)
}

Jeromy's avatar
Jeromy committed
235
// Find searches for a child node by 'name' within this hamt
Hector Sanjuan's avatar
Hector Sanjuan committed
236
func (ds *Shard) Find(ctx context.Context, name string) (*ipld.Link, error) {
237 238
	hv := &hashBits{b: hash([]byte(name))}

239
	var out *ipld.Link
240
	err := ds.getValue(ctx, hv, name, func(sv *Shard) error {
241 242 243
		out = sv.val
		return nil
	})
244 245 246
	if err != nil {
		return nil, err
	}
247

Jeromy's avatar
Jeromy committed
248
	return out, nil
249 250
}

251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
// linkType classifies a link found on a shard node by what it points to.
type linkType int

const (
	// invalidLink marks a link whose name is too short to carry a prefix.
	invalidLink linkType = iota
	// shardLink marks a link to a child shard (name is only the prefix).
	shardLink
	// shardValueLink marks a link to a value (prefix followed by a key).
	shardValueLink
)

// childLinkType classifies lnk by the length of its name relative to the hex
// prefix length: exactly the prefix means a sub-shard, longer means a value,
// and shorter is an error.
func (ds *Shard) childLinkType(lnk *ipld.Link) (linkType, error) {
	switch nameLen := len(lnk.Name); {
	case nameLen < ds.maxpadlen:
		return invalidLink, fmt.Errorf("invalid link name '%s'", lnk.Name)
	case nameLen == ds.maxpadlen:
		return shardLink, nil
	default:
		return shardValueLink, nil
	}
}

269 270 271
// getChild returns the i'th child of this shard. If it is cached in the
// children array, it will return it from there. Otherwise, it loads the child
// node from disk.
272
func (ds *Shard) getChild(ctx context.Context, i int) (*Shard, error) {
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
	if i >= len(ds.children) || i < 0 {
		return nil, fmt.Errorf("invalid index passed to getChild (likely corrupt bitfield)")
	}

	if len(ds.children) != len(ds.nd.Links()) {
		return nil, fmt.Errorf("inconsistent lengths between children array and Links array")
	}

	c := ds.children[i]
	if c != nil {
		return c, nil
	}

	return ds.loadChild(ctx, i)
}

// loadChild reads the i'th child node of this shard from disk and returns it
// as a 'child' interface
291
func (ds *Shard) loadChild(ctx context.Context, i int) (*Shard, error) {
292
	lnk := ds.nd.Links()[i]
293 294 295
	lnkLinkType, err := ds.childLinkType(lnk)
	if err != nil {
		return nil, err
296 297
	}

298
	var c *Shard
299
	if lnkLinkType == shardLink {
300 301 302 303
		nd, err := lnk.GetNode(ctx, ds.dserv)
		if err != nil {
			return nil, err
		}
304 305 306 307 308 309 310
		cds, err := NewHamtFromDag(ds.dserv, nd)
		if err != nil {
			return nil, err
		}

		c = cds
	} else {
311 312 313 314 315
		s, err := ds.makeShardValue(lnk)
		if err != nil {
			return nil, err
		}
		c = s
316 317 318 319 320 321
	}

	ds.children[i] = c
	return c, nil
}

322
func (ds *Shard) setChild(i int, c *Shard) {
323 324 325
	ds.children[i] = c
}

Jeromy's avatar
Jeromy committed
326
// Link returns a merklelink to this shard node
Hector Sanjuan's avatar
Hector Sanjuan committed
327
func (ds *Shard) Link() (*ipld.Link, error) {
328 329 330 331
	if ds.isValueNode() {
		return ds.val, nil
	}

332 333 334 335 336
	nd, err := ds.Node()
	if err != nil {
		return nil, err
	}

337
	err = ds.dserv.Add(context.TODO(), nd)
338 339 340 341
	if err != nil {
		return nil, err
	}

342
	return ipld.MakeLink(nd)
343 344
}

Hector Sanjuan's avatar
Hector Sanjuan committed
345
func (ds *Shard) insertChild(idx int, key string, lnk *ipld.Link) error {
346
	if lnk == nil {
347 348 349 350
		return os.ErrNotExist
	}

	i := ds.indexForBitPos(idx)
Steven Allen's avatar
Steven Allen committed
351
	ds.bitfield.SetBit(idx)
352 353

	lnk.Name = ds.linkNamePrefix(idx) + key
354
	sv := &Shard{
355
		key: key,
356
		val: lnk,
357 358
	}

359
	ds.children = append(ds.children[:i], append([]*Shard{sv}, ds.children[i:]...)...)
360
	ds.nd.SetLinks(append(ds.nd.Links()[:i], append([]*ipld.Link{nil}, ds.nd.Links()[i:]...)...))
361 362 363
	return nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
364
func (ds *Shard) rmChild(i int) error {
365 366 367 368 369 370 371 372 373 374 375 376 377
	if i < 0 || i >= len(ds.children) || i >= len(ds.nd.Links()) {
		return fmt.Errorf("hamt: attempted to remove child with out of range index")
	}

	copy(ds.children[i:], ds.children[i+1:])
	ds.children = ds.children[:len(ds.children)-1]

	copy(ds.nd.Links()[i:], ds.nd.Links()[i+1:])
	ds.nd.SetLinks(ds.nd.Links()[:len(ds.nd.Links())-1])

	return nil
}

378
func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func(*Shard) error) error {
379
	idx := hv.Next(ds.tableSizeLg2)
Steven Allen's avatar
Steven Allen committed
380
	if ds.bitfield.Bit(int(idx)) {
381 382 383 384 385 386 387
		cindex := ds.indexForBitPos(idx)

		child, err := ds.getChild(ctx, cindex)
		if err != nil {
			return err
		}

388
		if child.isValueNode() {
389 390 391
			if child.key == key {
				return cb(child)
			}
392 393
		} else {
			return child.getValue(ctx, hv, key, cb)
394 395 396 397 398 399
		}
	}

	return os.ErrNotExist
}

Hector Sanjuan's avatar
Hector Sanjuan committed
400 401
// EnumLinks collects all links in the Shard.
func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) {
402
	var links []*ipld.Link
403

404 405 406 407 408 409 410 411 412 413 414
	linkResults, err := ds.EnumLinksAsync(ctx)
	if err != nil {
		return nil, err
	}
	for linkResult := range linkResults {
		if linkResult.Err != nil {
			return links, linkResult.Err
		}
		links = append(links, linkResult.Link)
	}
	return links, nil
415 416
}

Hector Sanjuan's avatar
Hector Sanjuan committed
417 418
// ForEachLink walks the Shard and calls the given function.
func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
419
	return ds.walkTrie(ctx, func(sv *Shard) error {
420
		lnk := sv.val
421 422
		lnk.Name = sv.key

423
		return f(lnk)
424 425 426
	})
}

427 428 429 430
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
func (ds *Shard) EnumLinksAsync(ctx context.Context) (<-chan format.LinkResult, error) {
	linkResults := make(chan format.LinkResult)
hannahhoward's avatar
hannahhoward committed
431
	ctx, cancel := context.WithCancel(ctx)
432 433
	go func() {
		defer close(linkResults)
hannahhoward's avatar
hannahhoward committed
434
		defer cancel()
435 436 437 438 439 440 441
		getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
		cset := cid.NewSet()
		dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit)
	}()
	return linkResults, nil
}

442 443 444
// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync
// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called
// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation
445
func makeAsyncTrieGetLinks(dagService ipld.DAGService, linkResults chan<- format.LinkResult) dag.GetLinks {
446

447 448
	return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) {
		node, err := dagService.Get(ctx, currentCid)
449
		if err != nil {
hannahhoward's avatar
hannahhoward committed
450
			emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
451 452
			return nil, err
		}
453
		directoryShard, err := NewHamtFromDag(dagService, node)
454
		if err != nil {
hannahhoward's avatar
hannahhoward committed
455
			emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
456 457 458
			return nil, err
		}

459
		childShards := make([]*ipld.Link, 0, len(directoryShard.children))
460
		links := directoryShard.nd.Links()
461
		for idx := range directoryShard.children {
462
			lnk := links[idx]
463
			lnkLinkType, err := directoryShard.childLinkType(lnk)
464

465
			if err != nil {
hannahhoward's avatar
hannahhoward committed
466
				emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
467
				return nil, err
468
			}
469
			if lnkLinkType == shardLink {
470 471
				childShards = append(childShards, lnk)
			} else {
472 473 474 475 476 477 478
				sv, err := directoryShard.makeShardValue(lnk)
				if err != nil {
					return nil, err
				}
				formattedLink := sv.val
				formattedLink.Name = sv.key
				emitResult(ctx, linkResults, format.LinkResult{Link: formattedLink, Err: nil})
479 480 481 482 483 484
			}
		}
		return childShards, nil
	}
}

hannahhoward's avatar
hannahhoward committed
485 486 487 488 489 490 491
// emitResult sends r on linkResults, giving up without sending if ctx is
// done before a receiver takes the value.
func emitResult(ctx context.Context, linkResults chan<- format.LinkResult, r format.LinkResult) {
	select {
	case <-ctx.Done():
		// Cancelled: drop the result instead of blocking.
	case linkResults <- r:
	}
}

492
func (ds *Shard) walkTrie(ctx context.Context, cb func(*Shard) error) error {
Steven Allen's avatar
Steven Allen committed
493
	for idx := range ds.children {
Jeromy's avatar
Jeromy committed
494
		c, err := ds.getChild(ctx, idx)
495 496 497 498
		if err != nil {
			return err
		}

499
		if c.isValueNode() {
Steven Allen's avatar
Steven Allen committed
500
			if err := cb(c); err != nil {
501 502
				return err
			}
503
		} else {
Steven Allen's avatar
Steven Allen committed
504
			if err := c.walkTrie(ctx, cb); err != nil {
505 506 507 508 509 510 511
				return err
			}
		}
	}
	return nil
}

Hector Sanjuan's avatar
Hector Sanjuan committed
512
func (ds *Shard) modifyValue(ctx context.Context, hv *hashBits, key string, val *ipld.Link) error {
513
	idx := hv.Next(ds.tableSizeLg2)
Steven Allen's avatar
Steven Allen committed
514
	if !ds.bitfield.Bit(idx) {
515 516 517 518 519 520 521 522 523 524
		return ds.insertChild(idx, key, val)
	}

	cindex := ds.indexForBitPos(idx)

	child, err := ds.getChild(ctx, cindex)
	if err != nil {
		return err
	}

525
	if child.isValueNode() {
Jeromy's avatar
Jeromy committed
526 527 528
		if child.key == key {
			// value modification
			if val == nil {
Steven Allen's avatar
Steven Allen committed
529
				ds.bitfield.UnsetBit(idx)
Jeromy's avatar
Jeromy committed
530 531
				return ds.rmChild(cindex)
			}
532 533 534

			child.val = val
			return nil
Jeromy's avatar
Jeromy committed
535
		}
536

Jeromy's avatar
Jeromy committed
537 538 539
		if val == nil {
			return os.ErrNotExist
		}
540

Jeromy's avatar
Jeromy committed
541
		// replace value with another shard, one level deeper
Hector Sanjuan's avatar
Hector Sanjuan committed
542
		ns, err := NewShard(ds.dserv, ds.tableSize)
Jeromy's avatar
Jeromy committed
543 544 545
		if err != nil {
			return err
		}
546
		ns.builder = ds.builder
Jeromy's avatar
Jeromy committed
547 548 549 550
		chhv := &hashBits{
			b:        hash([]byte(child.key)),
			consumed: hv.consumed,
		}
551

Jeromy's avatar
Jeromy committed
552 553 554 555
		err = ns.modifyValue(ctx, hv, key, val)
		if err != nil {
			return err
		}
556

Jeromy's avatar
Jeromy committed
557 558 559
		err = ns.modifyValue(ctx, chhv, child.key, child.val)
		if err != nil {
			return err
560
		}
Jeromy's avatar
Jeromy committed
561 562 563

		ds.setChild(cindex, ns)
		return nil
564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589
	} else {
		err := child.modifyValue(ctx, hv, key, val)
		if err != nil {
			return err
		}

		if val == nil {
			switch len(child.children) {
			case 0:
				// empty sub-shard, prune it
				// Note: this shouldnt normally ever happen
				//       in the event of another implementation creates flawed
				//       structures, this will help to normalize them.
				ds.bitfield.UnsetBit(idx)
				return ds.rmChild(cindex)
			case 1:
				nchild := child.children[0]
				if nchild.isValueNode() {
					// sub-shard with a single value element, collapse it
					ds.setChild(cindex, nchild)
				}
				return nil
			}
		}

		return nil
590 591 592
	}
}

Jeromy's avatar
Jeromy committed
593 594 595
// indexForBitPos returns the index within the collapsed array corresponding to
// the given bit in the bitset.  The collapsed array contains only one entry
// per bit set in the bitfield, and this function is used to map the indices.
Hector Sanjuan's avatar
Hector Sanjuan committed
596
func (ds *Shard) indexForBitPos(bp int) int {
Steven Allen's avatar
Steven Allen committed
597
	return ds.bitfield.OnesBefore(bp)
598 599 600
}

// linkNamePrefix takes in the bitfield index of an entry and returns its hex prefix
Hector Sanjuan's avatar
Hector Sanjuan committed
601
func (ds *Shard) linkNamePrefix(idx int) string {
602 603
	return fmt.Sprintf(ds.prefixPadStr, idx)
}