directory.go 11.9 KB
Newer Older
1 2 3 4 5 6 7
package io

import (
	"context"
	"fmt"
	"os"

Jeromy's avatar
Jeromy committed
8
	mdag "github.com/ipfs/go-merkledag"
9

Jeromy's avatar
Jeromy committed
10 11
	format "github.com/ipfs/go-unixfs"
	hamt "github.com/ipfs/go-unixfs/hamt"
12

Jeromy's avatar
Jeromy committed
13 14
	cid "github.com/ipfs/go-cid"
	ipld "github.com/ipfs/go-ipld-format"
15
	logging "github.com/ipfs/go-log"
16 17
)

18 19 20 21 22 23 24 25 26
// log is the package-level logger, created with the name "unixfs".
var log = logging.Logger("unixfs")

// HAMTShardingSize is a global option that allows switching to a HAMTDirectory
// when the BasicDirectory's estimated size reaches or exceeds the size (in
// bytes) signalled by this flag. The default size of 0 disables the option.
//
// The size is not the *exact* block size of the encoded BasicDirectory but
// just an estimate: the sum, over all links, of the byte length of the link
// name plus the byte length of the link CID (BasicDirectory's ProtoNode
// doesn't use the Data field so this estimate is pretty accurate).
var HAMTShardingSize = 0
27 28 29 30 31 32 33 34 35 36 37 38 39

// DefaultShardWidth is the default value used for hamt sharding width. It is
// the width passed to hamt.NewShard when a BasicDirectory is converted to a
// HAMTDirectory.
var DefaultShardWidth = 256

// Directory defines a UnixFS directory. It is used for creating, reading and
// editing directories. It allows to work with different directory schemes,
// like the basic or the HAMT implementation.
//
// It just allows to perform explicit edits on a single directory, working with
// directory trees is out of its scope, they are managed by the MFS layer
// (which is the main consumer of this interface).
type Directory interface {

40 41
	// SetCidBuilder sets the CID Builder of the root node.
	SetCidBuilder(cid.Builder)
42 43 44 45 46 47 48

	// AddChild adds a (name, key) pair to the root node.
	AddChild(context.Context, string, ipld.Node) error

	// ForEachLink applies the given function to Links in the directory.
	ForEachLink(context.Context, func(*ipld.Link) error) error

49 50
	// EnumLinksAsync returns a channel which will receive Links in the directory
	// as they are enumerated, where order is not gauranteed
51
	EnumLinksAsync(context.Context) <-chan format.LinkResult
52

53 54 55 56 57
	// Links returns the all the links in the directory node.
	Links(context.Context) ([]*ipld.Link, error)

	// Find returns the root node of the file named 'name' within this directory.
	// In the case of HAMT-directories, it will traverse the tree.
58 59
	//
	// Returns os.ErrNotExist if the child does not exist.
60 61 62
	Find(context.Context, string) (ipld.Node, error)

	// RemoveChild removes the child with the given name.
63 64
	//
	// Returns os.ErrNotExist if the child doesn't exist.
65 66 67 68 69
	RemoveChild(context.Context, string) error

	// GetNode returns the root of this directory.
	GetNode() (ipld.Node, error)

70 71
	// GetCidBuilder returns the CID Builder used.
	GetCidBuilder() cid.Builder
72 73 74 75 76 77 78 79 80 81
}

// TODO: Evaluate removing `dserv` from this layer and providing it in MFS.
// (The functions should in that case add a `DAGService` argument.)

// BasicDirectory is the basic implementation of `Directory`. All the entries
// are stored in a single node.
type BasicDirectory struct {
	node  *mdag.ProtoNode
	dserv ipld.DAGService
82 83 84 85 86 87 88

	// Internal variable used to cache the estimated size of the basic directory:
	// for each link, aggregate link name + link CID. DO NOT CHANGE THIS
	// as it will affect the HAMT transition behavior in HAMTShardingSize.
	// (We maintain this value up to date even if the HAMTShardingSize is off
	// since potentially the option could be activated on the fly.)
	estimatedSize int
89 90 91 92 93 94 95 96 97
}

// HAMTDirectory is the HAMT implementation of `Directory`.
// (See package `hamt` for more information.)
type HAMTDirectory struct {
	// shard is the HAMT shard that the directory operations are delegated to.
	shard *hamt.Shard
	// dserv is the DAGService used to resolve links found in the shard into
	// nodes.
	dserv ipld.DAGService
}

98 99 100
// newEmptyBasicDirectory builds a BasicDirectory backed by a fresh empty
// directory node (as produced by format.EmptyDirNode).
func newEmptyBasicDirectory(dserv ipld.DAGService) *BasicDirectory {
	emptyNode := format.EmptyDirNode()
	return newBasicDirectoryFromNode(dserv, emptyNode)
}
101

102
func newBasicDirectoryFromNode(dserv ipld.DAGService, node *mdag.ProtoNode) *BasicDirectory {
Lucas Molas's avatar
Lucas Molas committed
103
	basicDir := new(BasicDirectory)
104
	basicDir.node = node
Lucas Molas's avatar
Lucas Molas committed
105
	basicDir.dserv = dserv
106 107 108 109 110 111 112 113 114 115 116

	// Scan node links (if any) to restore estimated size.
	basicDir.computeEstimatedSize()

	return basicDir
}

// NewDirectory returns a Directory implemented by UpgradeableDirectory
// containing a BasicDirectory that can be converted to a HAMTDirectory.
func NewDirectory(dserv ipld.DAGService) Directory {
	return &UpgradeableDirectory{newEmptyBasicDirectory(dserv)}
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
}

// ErrNotADir implies that the given node was not a unixfs directory.
// NewDirectoryFromNode returns it when the node is not a ProtoNode or when
// its UnixFS type is neither a directory nor a HAMT shard.
var ErrNotADir = fmt.Errorf("merkledag node was not a directory or shard")

// NewDirectoryFromNode loads a unixfs directory from the given IPLD node and
// DAGService.
func NewDirectoryFromNode(dserv ipld.DAGService, node ipld.Node) (Directory, error) {
	protoBufNode, ok := node.(*mdag.ProtoNode)
	if !ok {
		return nil, ErrNotADir
	}

	fsNode, err := format.FSNodeFromBytes(protoBufNode.Data())
	if err != nil {
		return nil, err
	}

135
	switch fsNode.Type() {
136
	case format.TDirectory:
137
		return newBasicDirectoryFromNode(dserv, protoBufNode.Copy().(*mdag.ProtoNode)), nil
138 139 140 141 142 143 144 145 146 147 148 149 150 151
	case format.THAMTShard:
		shard, err := hamt.NewHamtFromDag(dserv, node)
		if err != nil {
			return nil, err
		}
		return &HAMTDirectory{
			dserv: dserv,
			shard: shard,
		}, nil
	}

	return nil, ErrNotADir
}

152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
// computeEstimatedSize recomputes the cached estimatedSize from scratch by
// summing the estimated size of every link currently in the node.
func (d *BasicDirectory) computeEstimatedSize() {
	// Reset first: this is also called to recover from a corrupted (negative)
	// estimate, and accumulating on top of the stale value would keep the
	// result wrong.
	d.estimatedSize = 0

	// Iterate the node's links directly. This needs no context and cannot
	// fail, so there is no nil context to pass and no error to discard.
	for _, l := range d.node.Links() {
		d.addToEstimatedSize(l.Name, l.Cid)
	}
}

// estimatedLinkSize returns the size estimate for one directory entry: the
// byte length of the link name plus the byte length of its CID.
func estimatedLinkSize(linkName string, linkCid cid.Cid) int {
	nameLen := len(linkName)
	cidLen := linkCid.ByteLen()
	return nameLen + cidLen
}

// addToEstimatedSize grows the cached estimate by the estimated size of a
// link with the given name and CID.
func (d *BasicDirectory) addToEstimatedSize(name string, linkCid cid.Cid) {
	delta := estimatedLinkSize(name, linkCid)
	d.estimatedSize = d.estimatedSize + delta
}

// removeFromEstimatedSize shrinks the cached estimate by the estimated size
// of a link with the given name and CID. If the estimate ends up negative, an
// error is logged and the estimate is recomputed.
func (d *BasicDirectory) removeFromEstimatedSize(name string, linkCid cid.Cid) {
	delta := estimatedLinkSize(name, linkCid)
	d.estimatedSize = d.estimatedSize - delta

	if d.estimatedSize >= 0 {
		return
	}

	// A negative estimate means the bookkeeping went very wrong somewhere:
	// report it and rebuild the value from the links.
	log.Error("BasicDirectory's estimatedSize went below 0")
	d.computeEstimatedSize()
}

177 178 179
// SetCidBuilder implements the `Directory` interface.
func (d *BasicDirectory) SetCidBuilder(builder cid.Builder) {
	d.node.SetCidBuilder(builder)
180 181 182 183 184
}

// AddChild implements the `Directory` interface. It adds (or replaces)
// a link to the given `node` under `name`.
func (d *BasicDirectory) AddChild(ctx context.Context, name string, node ipld.Node) error {
185 186 187 188 189
	// Remove old link (if it existed; ignore `ErrNotExist` otherwise).
	err := d.RemoveChild(ctx, name)
	if err != nil && err != os.ErrNotExist {
		return err
	}
190

191 192 193 194 195 196
	err = d.node.AddNodeLink(name, node)
	if err != nil {
		return err
	}
	d.addToEstimatedSize(name, node.Cid())
	return nil
197 198
}

199 200
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
201
func (d *BasicDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
202 203 204 205
	linkResults := make(chan format.LinkResult)
	go func() {
		defer close(linkResults)
		for _, l := range d.node.Links() {
hannahhoward's avatar
hannahhoward committed
206 207
			select {
			case linkResults <- format.LinkResult{
208 209
				Link: l,
				Err:  nil,
hannahhoward's avatar
hannahhoward committed
210 211 212
			}:
			case <-ctx.Done():
				return
213 214 215
			}
		}
	}()
216
	return linkResults
217 218
}

219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
// ForEachLink implements the `Directory` interface. It calls f on each link
// of the node in turn, stopping at and returning the first error f reports.
// The context is not used by this implementation.
func (d *BasicDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
	links := d.node.Links()
	for i := range links {
		err := f(links[i])
		if err != nil {
			return err
		}
	}
	return nil
}

// Links implements the `Directory` interface. It returns the links of the
// underlying node; the error is always nil.
func (d *BasicDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
	links := d.node.Links()
	return links, nil
}

// Find implements the `Directory` interface. It looks up the link called
// `name` and fetches the node it points to through the DAGService.
//
// Returns os.ErrNotExist if there is no link with that name.
func (d *BasicDirectory) Find(ctx context.Context, name string) (ipld.Node, error) {
	target, err := d.node.GetNodeLink(name)
	switch {
	case err == mdag.ErrLinkNotFound:
		// Translate the merkledag error into the one the interface promises.
		return nil, os.ErrNotExist
	case err != nil:
		return nil, err
	}

	return d.dserv.Get(ctx, target.Cid)
}

// RemoveChild implements the `Directory` interface.
func (d *BasicDirectory) RemoveChild(ctx context.Context, name string) error {
249 250 251 252 253
	// We need to *retrieve* the link before removing it to update the estimated
	// size. This means we may iterate the links slice twice: if traversing this
	// becomes a problem, a factor of 2 isn't going to make much of a difference.
	// We'd likely need to cache a link resolution map in that case.
	link, err := d.node.GetNodeLink(name)
254
	if err == mdag.ErrLinkNotFound {
255 256 257 258
		return os.ErrNotExist
	}
	if err != nil {
		return err // at the moment there is no other error besides ErrLinkNotFound
259
	}
260 261 262 263 264 265 266

	// The name actually existed so we should update the estimated size.
	d.removeFromEstimatedSize(link.Name, link.Cid)

	return d.node.RemoveNodeLink(name)
	// GetNodeLink didn't return ErrLinkNotFound so this won't fail with that
	// and we don't need to convert the error again.
267 268 269 270 271 272 273
}

// GetNode implements the `Directory` interface. It returns the underlying
// ProtoNode itself (not a copy); the error is always nil.
func (d *BasicDirectory) GetNode() (ipld.Node, error) {
	root := d.node
	return root, nil
}

274 275
// GetCidBuilder implements the `Directory` interface.
func (d *BasicDirectory) GetCidBuilder() cid.Builder {
276
	return d.node.CidBuilder()
277 278 279 280 281 282 283 284 285 286 287
}

// SwitchToSharding returns a HAMT implementation of this directory.
func (d *BasicDirectory) SwitchToSharding(ctx context.Context) (Directory, error) {
	hamtDir := new(HAMTDirectory)
	hamtDir.dserv = d.dserv

	shard, err := hamt.NewShard(d.dserv, DefaultShardWidth)
	if err != nil {
		return nil, err
	}
288
	shard.SetCidBuilder(d.node.CidBuilder())
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
	hamtDir.shard = shard

	for _, lnk := range d.node.Links() {
		node, err := d.dserv.Get(ctx, lnk.Cid)
		if err != nil {
			return nil, err
		}

		err = hamtDir.shard.Set(ctx, lnk.Name, node)
		if err != nil {
			return nil, err
		}
	}

	return hamtDir, nil
}

306 307 308
// SetCidBuilder implements the `Directory` interface.
func (d *HAMTDirectory) SetCidBuilder(builder cid.Builder) {
	d.shard.SetCidBuilder(builder)
309 310 311 312 313 314 315 316 317 318 319 320
}

// AddChild implements the `Directory` interface by setting `name` to `nd` in
// the underlying shard.
func (d *HAMTDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error {
	err := d.shard.Set(ctx, name, nd)
	return err
}

// ForEachLink implements the `Directory` interface by delegating the
// iteration to the underlying shard.
func (d *HAMTDirectory) ForEachLink(ctx context.Context, f func(*ipld.Link) error) error {
	err := d.shard.ForEachLink(ctx, f)
	return err
}

321 322
// EnumLinksAsync returns a channel which will receive Links in the directory
// as they are enumerated, where order is not gauranteed
323
func (d *HAMTDirectory) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
324 325 326
	return d.shard.EnumLinksAsync(ctx)
}

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
// Links implements the `Directory` interface by enumerating the links of the
// underlying shard.
func (d *HAMTDirectory) Links(ctx context.Context) ([]*ipld.Link, error) {
	links, err := d.shard.EnumLinks(ctx)
	return links, err
}

// Find implements the `Directory` interface. It will traverse the tree: the
// link named `name` is looked up in the shard and then resolved into a node
// through the DAGService. Lookup errors are returned unchanged.
func (d *HAMTDirectory) Find(ctx context.Context, name string) (ipld.Node, error) {
	found, err := d.shard.Find(ctx, name)
	if err != nil {
		return nil, err
	}

	return found.GetNode(ctx, d.dserv)
}

// RemoveChild implements the `Directory` interface by removing `name` from
// the underlying shard.
func (d *HAMTDirectory) RemoveChild(ctx context.Context, name string) error {
	err := d.shard.Remove(ctx, name)
	return err
}

// GetNode implements the `Directory` interface by returning whatever the
// underlying shard's Node method yields.
func (d *HAMTDirectory) GetNode() (ipld.Node, error) {
	return d.shard.Node()
}

352 353 354
// GetCidBuilder implements the `Directory` interface. It returns the CID
// builder of the underlying shard.
func (d *HAMTDirectory) GetCidBuilder() cid.Builder {
	builder := d.shard.CidBuilder()
	return builder
}
Lucas Molas's avatar
Lucas Molas committed
356 357 358 359 360 361 362 363 364 365 366

// UpgradeableDirectory wraps a Directory interface and provides extra logic
// to upgrade from its BasicDirectory implementation to HAMTDirectory.
type UpgradeableDirectory struct {
	// Directory is the wrapped implementation. It is embedded, so its methods
	// are promoted; AddChild may replace it with a HAMTDirectory.
	Directory
}

// Compile-time check that *UpgradeableDirectory satisfies Directory.
var _ Directory = (*UpgradeableDirectory)(nil)

// AddChild implements the `Directory` interface. We check when adding new entries
// if we should switch to HAMTDirectory according to global option(s).
Lucas Molas's avatar
Lucas Molas committed
367
func (d *UpgradeableDirectory) AddChild(ctx context.Context, name string, nd ipld.Node) error {
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
	err := d.Directory.AddChild(ctx, name, nd)
	if err != nil {
		return err
	}

	// Evaluate possible HAMT upgrade.
	if HAMTShardingSize == 0 {
		return nil
	}
	basicDir, ok := d.Directory.(*BasicDirectory)
	if !ok {
		return nil
	}
	if basicDir.estimatedSize >= HAMTShardingSize {
		// Ideally to minimize performance we should check if this last
		// `AddChild` call would bring the directory size over the threshold
		// *before* executing it since we would end up switching anyway and
		// that call would be "wasted". This is a minimal performance impact
		// and we prioritize a simple code base.
		hamtDir, err := basicDir.SwitchToSharding(ctx)
		if err != nil {
			return err
Lucas Molas's avatar
Lucas Molas committed
390
		}
391
		d.Directory = hamtDir
Lucas Molas's avatar
Lucas Molas committed
392 393
	}

394
	return nil
Lucas Molas's avatar
Lucas Molas committed
395
}