directory.go 12 KB
Newer Older
1 2 3 4 5 6 7
package io

import (
	"context"
	"fmt"
	"os"

8
	mdag "gitlab.dms3.io/dms3/go-merkledag"
9

10 11
	format "gitlab.dms3.io/dms3/go-unixfs"
	hamt "gitlab.dms3.io/dms3/go-unixfs/hamt"
12

13 14 15
	cid "gitlab.dms3.io/dms3/go-cid"
	ld "gitlab.dms3.io/dms3/go-ld-format"
	logging "gitlab.dms3.io/dms3/go-log"
16 17
)

18 19 20 21 22 23 24 25 26
// log is the package-level logger, registered under the "unixfs" name.
var log = logging.Logger("unixfs")

// HAMTShardingSize is a global option that allows switching to a HAMTDirectory
// once the estimated size (in bytes) of a BasicDirectory reaches or exceeds
// the value of this flag. The default size of 0 disables the option.
// The size is not the *exact* block size of the encoded BasicDirectory but just
// an estimate based on the byte length of each link's name and CID
// (BasicDirectory's ProtoNode doesn't use the Data field so this estimate is
// pretty accurate).
var HAMTShardingSize = 0
27

tavit ohanian's avatar
tavit ohanian committed
28 29 30
// UseHAMTSharding is a temporary flag kept here until upstream decides where
// to hold it.
// NOTE(review): nothing in the visible part of this file reads it; presumably
// it is consumed by other packages — confirm.
var UseHAMTSharding = true

31 32 33 34 35 36 37 38 39 40 41 42
// DefaultShardWidth is the default value used for hamt sharding width. It is
// the width handed to hamt.NewShard when a BasicDirectory is converted into a
// HAMTDirectory by SwitchToSharding.
var DefaultShardWidth = 256

// Directory defines a UnixFS directory. It is used for creating, reading and
// editing directories. It allows to work with different directory schemes,
// like the basic or the HAMT implementation.
//
// It just allows to perform explicit edits on a single directory, working with
// directory trees is out of its scope, they are managed by the MFS layer
// (which is the main consumer of this interface).
type Directory interface {

43 44
	// SetCidBuilder sets the CID Builder of the root node.
	SetCidBuilder(cid.Builder)
45 46

	// AddChild adds a (name, key) pair to the root node.
47
	AddChild(context.Context, string, ld.Node) error
48 49

	// ForEachLink applies the given function to Links in the directory.
50
	ForEachLink(context.Context, func(*ld.Link) error) error
51

52 53
	// EnumLinksAsync returns a channel which will receive Links in the directory
	// as they are enumerated, where order is not gauranteed
54
	EnumLinksAsync(context.Context) <-chan format.LinkResult
55

56
	// Links returns the all the links in the directory node.
57
	Links(context.Context) ([]*ld.Link, error)
58 59 60

	// Find returns the root node of the file named 'name' within this directory.
	// In the case of HAMT-directories, it will traverse the tree.
61 62
	//
	// Returns os.ErrNotExist if the child does not exist.
63
	Find(context.Context, string) (ld.Node, error)
64 65

	// RemoveChild removes the child with the given name.
66 67
	//
	// Returns os.ErrNotExist if the child doesn't exist.
68 69 70
	RemoveChild(context.Context, string) error

	// GetNode returns the root of this directory.
71
	GetNode() (ld.Node, error)
72

73 74
	// GetCidBuilder returns the CID Builder used.
	GetCidBuilder() cid.Builder
75 76 77 78 79 80 81 82 83
}

// TODO: Evaluate removing `dserv` from this layer and providing it in MFS.
// (The functions should in that case add a `DAGService` argument.)

// BasicDirectory is the basic implementation of `Directory`. All the entries
// are stored in a single node.
type BasicDirectory struct {
	node  *mdag.ProtoNode
84
	dserv ld.DAGService
85 86 87 88 89 90 91

	// Internal variable used to cache the estimated size of the basic directory:
	// for each link, aggregate link name + link CID. DO NOT CHANGE THIS
	// as it will affect the HAMT transition behavior in HAMTShardingSize.
	// (We maintain this value up to date even if the HAMTShardingSize is off
	// since potentially the option could be activated on the fly.)
	estimatedSize int
92 93 94 95 96 97
}

// HAMTDirectory is the HAMT implementation of `Directory`.
// (See package `hamt` for more information.)
type HAMTDirectory struct {
	shard *hamt.Shard
98
	dserv ld.DAGService
99 100
}

101
func newEmptyBasicDirectory(dserv ld.DAGService) *BasicDirectory {
102 103
	return newBasicDirectoryFromNode(dserv, format.EmptyDirNode())
}
104

105
func newBasicDirectoryFromNode(dserv ld.DAGService, node *mdag.ProtoNode) *BasicDirectory {
Lucas Molas's avatar
Lucas Molas committed
106
	basicDir := new(BasicDirectory)
107
	basicDir.node = node
Lucas Molas's avatar
Lucas Molas committed
108
	basicDir.dserv = dserv
109 110 111 112 113 114 115 116 117

	// Scan node links (if any) to restore estimated size.
	basicDir.computeEstimatedSize()

	return basicDir
}

// NewDirectory returns a Directory implemented by UpgradeableDirectory
// containing a BasicDirectory that can be converted to a HAMTDirectory.
118
func NewDirectory(dserv ld.DAGService) Directory {
119
	return &UpgradeableDirectory{newEmptyBasicDirectory(dserv)}
120 121 122 123 124
}

// ErrNotADir is returned by NewDirectoryFromNode when the given node is not
// a ProtoNode, or when its unixfs type is neither a directory nor a HAMT shard.
var ErrNotADir = fmt.Errorf("merkledag node was not a directory or shard")

125
// NewDirectoryFromNode loads a unixfs directory from the given LD node and
126
// DAGService.
127
func NewDirectoryFromNode(dserv ld.DAGService, node ld.Node) (Directory, error) {
128 129 130 131 132 133 134 135 136 137
	protoBufNode, ok := node.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNotADir
	}

	fsNode, err := format.FSNodeFromBytes(protoBufNode.Data())
	if err != nil {
		return nil, err
	}

138
	switch fsNode.Type() {
139
	case format.TDirectory:
140
		return newBasicDirectoryFromNode(dserv, protoBufNode.Copy().(*mdag.ProtoNode)), nil
141 142 143 144 145 146 147 148 149 150 151 152 153 154
	case format.THAMTShard:
		shard, err := hamt.NewHamtFromDag(dserv, node)
		if err != nil {
			return nil, err
		}
		return &HAMTDirectory{
			dserv: dserv,
			shard: shard,
		}, nil
	}

	return nil, ErrNotADir
}

155
func (d *BasicDirectory) computeEstimatedSize() {
156
	d.ForEachLink(nil, func(l *ld.Link) error {
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
		d.addToEstimatedSize(l.Name, l.Cid)
		return nil
	})
}

// estimatedLinkSize returns the contribution of a single link to the
// directory size estimate: the byte length of its name plus the byte length
// of its CID.
func estimatedLinkSize(linkName string, linkCid cid.Cid) int {
	nameBytes := len(linkName)
	cidBytes := linkCid.ByteLen()
	return nameBytes + cidBytes
}

// addToEstimatedSize grows the cached size estimate by the estimated size of
// the link described by `name` and `linkCid`.
func (d *BasicDirectory) addToEstimatedSize(name string, linkCid cid.Cid) {
	delta := estimatedLinkSize(name, linkCid)
	d.estimatedSize = d.estimatedSize + delta
}

// removeFromEstimatedSize shrinks the cached size estimate by the estimated
// size of the link described by `name` and `linkCid`. If the estimate ends up
// negative, an error is logged and the estimate is recomputed.
func (d *BasicDirectory) removeFromEstimatedSize(name string, linkCid cid.Cid) {
	delta := estimatedLinkSize(name, linkCid)
	d.estimatedSize = d.estimatedSize - delta
	if d.estimatedSize >= 0 {
		return
	}

	// A negative estimate means the bookkeeping went wrong somewhere:
	// report it and rebuild the value from the node's links.
	log.Error("BasicDirectory's estimatedSize went below 0")
	d.computeEstimatedSize()
}

180 181 182
// SetCidBuilder implements the `Directory` interface.
func (d *BasicDirectory) SetCidBuilder(builder cid.Builder) {
	d.node.SetCidBuilder(builder)
183 184 185 186
}

// AddChild implements the `Directory` interface. It adds (or replaces)
// a link to the given `node` under `name`.
187
func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ld.Node) error {
188 189 190 191 192
	// Remove old link (if it existed; ignore `ErrNotExist` otherwise).
	err := d.RemoveChild(ctx, name)
	if err != nil && err != os.ErrNotExist {
		return err
	}
193

194 195 196 197 198 199
	err = d.node.AddNodeLink(name, node)
	if err != nil {
		return err
	}
	d.addToEstimatedSize(name, node.Cid())
	return nil
200 201
}

202 203
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
204
func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
205 206 207 208
	linkResults := make(chan format.LinkResult)
	go func() {
		defer close(linkResults)
		for _, l := range d.node.Links() {
hannahhoward's avatar
hannahhoward committed
209 210
			select {
			case linkResults <- format.LinkResult{
211 212
				Link: l,
				Err:  nil,
hannahhoward's avatar
hannahhoward committed
213 214 215
			}:
			case <-ctx.Done():
				return
216 217 218
			}
		}
	}()
219
	return linkResults
220 221
}

222
// ForEachLink implements the `Directory` interface.
223
func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ld.Link) error) error {
224 225 226 227 228 229 230 231 232
	for _, l := range d.node.Links() {
		if err := f(l); err != nil {
			return err
		}
	}
	return nil
}

// Links implements the `Directory` interface.
233
func (d *BasicDirectory) Links(ctx context.Context) ([]*ld.Link, error) {
234 235 236 237
	return d.node.Links(), nil
}

// Find implements the `Directory` interface.
238
func (d *BasicDirectory) Find(ctx context.Context, name string) (ld.Node, error) {
239 240 241 242 243 244 245 246 247 248 249 250 251
	lnk, err := d.node.GetNodeLink(name)
	if err == mdag.ErrLinkNotFound {
		err = os.ErrNotExist
	}
	if err != nil {
		return nil, err
	}

	return d.dserv.Get(ctx, lnk.Cid)
}

// RemoveChild implements the `Directory` interface.
func (d *BasicDirectory) RemoveChild(ctx context.Context, name string) error {
252 253 254 255 256
	// We need to *retrieve* the link before removing it to update the estimated
	// size. This means we may iterate the links slice twice: if traversing this
	// becomes a problem, a factor of 2 isn't going to make much of a difference.
	// We'd likely need to cache a link resolution map in that case.
	link, err := d.node.GetNodeLink(name)
257
	if err == mdag.ErrLinkNotFound {
258 259 260 261
		return os.ErrNotExist
	}
	if err != nil {
		return err // at the moment there is no other error besides ErrLinkNotFound
262
	}
263 264 265 266 267 268 269

	// The name actually existed so we should update the estimated size.
	d.removeFromEstimatedSize(link.Name, link.Cid)

	return d.node.RemoveNodeLink(name)
	// GetNodeLink didn't return ErrLinkNotFound so this won't fail with that
	// and we don't need to convert the error again.
270 271 272
}

// GetNode implements the `Directory` interface.
273
func (d *BasicDirectory) GetNode() (ld.Node, error) {
274 275 276
	return d.node, nil
}

277 278
// GetCidBuilder implements the `Directory` interface.
func (d *BasicDirectory) GetCidBuilder() cid.Builder {
279
	return d.node.CidBuilder()
280 281 282 283 284 285 286 287 288 289 290
}

// SwitchToSharding returns a HAMT implementation of this directory.
func (d *BasicDirectory) SwitchToSharding(ctx context.Context) (Directory, error) {
	hamtDir := new(HAMTDirectory)
	hamtDir.dserv = d.dserv

	shard, err := hamt.NewShard(d.dserv, DefaultShardWidth)
	if err != nil {
		return nil, err
	}
291
	shard.SetCidBuilder(d.node.CidBuilder())
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
	hamtDir.shard = shard

	for _, lnk := range d.node.Links() {
		node, err := d.dserv.Get(ctx, lnk.Cid)
		if err != nil {
			return nil, err
		}

		err = hamtDir.shard.Set(ctx, lnk.Name, node)
		if err != nil {
			return nil, err
		}
	}

	return hamtDir, nil
}

309 310 311
// SetCidBuilder implements the `Directory` interface.
func (d *HAMTDirectory) SetCidBuilder(builder cid.Builder) {
	d.shard.SetCidBuilder(builder)
312 313 314
}

// AddChild implements the `Directory` interface.
315
func (d *HAMTDirectory) AddChild(ctx context.Context, name string, nd ld.Node) error {
316 317 318 319
	return d.shard.Set(ctx, name, nd)
}

// ForEachLink implements the `Directory` interface.
320
func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ld.Link) error) error {
321 322 323
	return d.shard.ForEachLink(ctx, f)
}

324 325
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
326
func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
327 328 329
	return d.shard.EnumLinksAsync(ctx)
}

330
// Links implements the `Directory` interface.
331
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ld.Link, error) {
332 333 334 335
	return d.shard.EnumLinks(ctx)
}

// Find implements the `Directory` interface. It will traverse the tree.
336
func (d *HAMTDirectory) Find(ctx context.Context, name string) (ld.Node, error) {
337 338 339 340 341 342 343 344 345 346 347 348 349 350
	lnk, err := d.shard.Find(ctx, name)
	if err != nil {
		return nil, err
	}

	return lnk.GetNode(ctx, d.dserv)
}

// RemoveChild implements the `Directory` interface by removing `name` from
// the underlying shard.
func (d *HAMTDirectory) RemoveChild(ctx context.Context, name string) error {
	err := d.shard.Remove(ctx, name)
	return err
}

// GetNode implements the `Directory` interface.
351
func (d *HAMTDirectory) GetNode() (ld.Node, error) {
352 353 354
	return d.shard.Node()
}

355 356 357
// GetCidBuilder implements the `Directory` interface. It returns the CID
// builder of the underlying shard.
func (d *HAMTDirectory) GetCidBuilder() cid.Builder {
	builder := d.shard.CidBuilder()
	return builder
}
Lucas Molas's avatar
Lucas Molas committed
359 360 361 362 363 364 365 366 367 368 369

// UpgradeableDirectory wraps a Directory interface and provides extra logic
// to upgrade from its BasicDirectory implementation to HAMTDirectory.
//
// Every method except AddChild is inherited from the embedded Directory.
// AddChild is overridden and may replace the embedded value with a
// HAMTDirectory.
type UpgradeableDirectory struct {
	Directory
}

// Compile-time check that *UpgradeableDirectory satisfies Directory.
var _ Directory = (*UpgradeableDirectory)(nil)

// AddChild implements the `Directory` interface. We check when adding new entries
// if we should switch to HAMTDirectory according to global option(s).
370
func (d *UpgradeableDirectory) AddChild(ctx context.Context, name string, nd ld.Node) error {
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
	err := d.Directory.AddChild(ctx, name, nd)
	if err != nil {
		return err
	}

	// Evaluate possible HAMT upgrade.
	if HAMTShardingSize == 0 {
		return nil
	}
	basicDir, ok := d.Directory.(*BasicDirectory)
	if !ok {
		return nil
	}
	if basicDir.estimatedSize >= HAMTShardingSize {
		// Ideally to minimize performance we should check if this last
		// `AddChild` call would bring the directory size over the threshold
		// *before* executing it since we would end up switching anyway and
		// that call would be "wasted". This is a minimal performance impact
		// and we prioritize a simple code base.
		hamtDir, err := basicDir.SwitchToSharding(ctx)
		if err != nil {
			return err
Lucas Molas's avatar
Lucas Molas committed
393
		}
394
		d.Directory = hamtDir
Lucas Molas's avatar
Lucas Molas committed
395 396
	}

397
	return nil
Lucas Molas's avatar
Lucas Molas committed
398
}