flatfs.go 21.9 KB
Newer Older
Tommi Virtanen's avatar
Tommi Virtanen committed
1 2 3 4 5 6
// Package flatfs is a Datastore implementation that stores all
// objects in a two-level directory structure in the local file
// system, regardless of the hierarchy of the keys.
package flatfs

import (
7
	"encoding/json"
Tommi Virtanen's avatar
Tommi Virtanen committed
8
	"errors"
9
	"fmt"
Tommi Virtanen's avatar
Tommi Virtanen committed
10
	"io/ioutil"
11 12
	"math"
	"math/rand"
Tommi Virtanen's avatar
Tommi Virtanen committed
13
	"os"
Jeromy's avatar
Jeromy committed
14
	"path/filepath"
Tommi Virtanen's avatar
Tommi Virtanen committed
15
	"strings"
16 17
	"sync"
	"sync/atomic"
18
	"time"
Tommi Virtanen's avatar
Tommi Virtanen committed
19

Jeromy's avatar
Jeromy committed
20 21
	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/query"
22

Jakub Sztandera's avatar
Jakub Sztandera committed
23
	logging "github.com/ipfs/go-log"
Tommi Virtanen's avatar
Tommi Virtanen committed
24 25
)

// log is the package-wide logger.
var log = logging.Logger("flatfs")

const (
	// extension is the suffix appended to every data file.
	extension = ".data"
	// diskUsageCheckpointPercent is the drift, as a percentage of the
	// last persisted value, that triggers writing a new disk-usage
	// checkpoint (see checkpointLoop).
	diskUsageCheckpointPercent = 1.0
)

var (
	// DiskUsageFile is the name of the file to cache the size of the
	// datastore in disk
	DiskUsageFile = "diskUsage.cache"
	// DiskUsageFilesAverage is the maximum number of files per folder
	// to stat in order to calculate the size of the datastore.
	// The size of the rest of the files in a folder will be assumed
	// to be the average of the values obtained. This includes
	// regular files and directories.
	DiskUsageFilesAverage = 2000
	// DiskUsageCalcTimeout is the maximum time to spend
	// calculating the DiskUsage upon a start when no
	// DiskUsageFile is present.
	// If this period did not suffice to read the size of the datastore,
	// the remaining sizes will be estimated.
	DiskUsageCalcTimeout = 5 * time.Minute
)

// Write-operation types carried by op and dispatched in doOp.
const (
	opPut = iota
	opDelete
	opRename
)

// initAccuracy tags how the initial disk-usage figure was obtained.
type initAccuracy string

const (
	exactA    initAccuracy = "initial-exact"
	approxA   initAccuracy = "initial-approximate"
	timedoutA initAccuracy = "initial-timed-out"
)

// combineAccuracy merges two accuracy tags, keeping the weakest of the
// pair: timed-out dominates approximate, which dominates exact. Any
// other combination (e.g. an unset value) yields the empty string.
func combineAccuracy(a, b initAccuracy) initAccuracy {
	switch {
	case a == timedoutA, b == timedoutA:
		return timedoutA
	case a == approxA, b == approxA:
		return approxA
	case a == exactA && b == exactA:
		return exactA
	default:
		return ""
	}
}

// Compile-time check that *Datastore satisfies the go-datastore interface.
var _ datastore.Datastore = (*Datastore)(nil)

var (
	// ErrDatastoreExists is returned by Create when the target directory
	// is already an initialized flatfs datastore.
	ErrDatastoreExists       = errors.New("datastore already exists")
	// ErrDatastoreDoesNotExist is returned by Open when path is missing.
	ErrDatastoreDoesNotExist = errors.New("datastore directory does not exist")
	// ErrShardingFileMissing is returned when a datastore directory has
	// no SHARDING file describing its shard function.
	ErrShardingFileMissing   = fmt.Errorf("%s file not found in datastore", SHARDING_FN)
)

// Seed math/rand; folderSize relies on it to sample directory entries
// in random order.
func init() {
	rand.Seed(time.Now().UTC().UnixNano())
}

// Datastore implements the go-datastore Interface.
// Note this datastore cannot guarantee order of concurrent
// write operations to the same key. See the explanation in
// Put().
type Datastore struct {
	// path is the root directory of the datastore on disk.
	path string

	// shardStr is the textual form of the shard function; getDir maps a
	// key name to its prefix directory name.
	shardStr string
	getDir   ShardFunc

	// synchronize all writes and directory changes for added safety
	sync bool

	// atomic operations should always be used with diskUsage
	diskUsage int64

	// these values should only be used during initialization or
	// inside the checkpoint loop
	dirty       bool
	storedValue diskUsageValue

	// checkpointCh carries checkpoint requests to checkpointLoop; done
	// signals that the loop has exited (see deactivate).
	checkpointCh chan bool
	done         chan bool

	// opMap handles concurrent write operations (put/delete)
	// to the same key
	opMap *opMap
}

// diskUsageValue is the state persisted to the DiskUsageFile cache.
// NOTE(review): the fields are unexported, so plain encoding/json would
// skip them — presumably custom (Un)MarshalJSON methods exist elsewhere
// in the package; confirm.
type diskUsageValue struct {
	diskUsage int64
	accuracy  initAccuracy
}

// ShardFunc maps a key name to the name of the prefix directory
// that stores it.
type ShardFunc func(string) string

// opT identifies a write-operation type (opPut, opDelete, opRename).
type opT int

// op wraps useful arguments of write operations
type op struct {
	typ  opT           // operation type
	key  datastore.Key // datastore key. Mandatory.
	tmp  string        // temp file path
	path string        // file path
	v    []byte        // value
}

// opMap tracks in-flight write operations keyed by datastore key, so
// concurrent writers to the same key can coalesce (see Begin/Finish).
type opMap struct {
	ops sync.Map
}

// opResult represents one in-flight operation. The owner holds the
// write lock for the duration of the operation; waiters block on the
// read lock and inspect success once the owner calls Finish.
type opResult struct {
	mu      sync.RWMutex
	success bool

	opMap *opMap
	name  string
}

// Begin registers an operation on the key name, blocking while another
// operation on the same key is in flight.
// Returns nil if there's nothing to do: a concurrent operation on this
// key succeeded while we waited, so this caller's op is subsumed by it.
func (m *opMap) Begin(name string) *opResult {
	for {
		myOp := &opResult{opMap: m, name: name}
		myOp.mu.Lock()
		opIface, loaded := m.ops.LoadOrStore(name, myOp)
		if !loaded { // no one else doing ops with this key
			return myOp
		}

		op := opIface.(*opResult)
		// someone else doing ops with this key, wait for
		// the result
		// NOTE(review): this RLock is never RUnlock'ed; harmless since
		// op is discarded after Finish removes it from the map, but
		// worth confirming it was intentional.
		op.mu.RLock()
		if op.success {
			return nil
		}

		// if we are here, we will retry the operation
	}
}

// Finish records the outcome and releases waiters: ok == true lets
// queued operations on the same key return success without running;
// ok == false makes them retry.
func (o *opResult) Finish(ok bool) {
	o.success = ok
	o.opMap.ops.Delete(o.name)
	o.mu.Unlock()
}
176

177
func Create(path string, fun *ShardIdV1) error {
178

kpcyrd's avatar
kpcyrd committed
179
	err := os.Mkdir(path, 0755)
180 181
	if err != nil && !os.IsExist(err) {
		return err
Tommi Virtanen's avatar
Tommi Virtanen committed
182 183
	}

184 185
	dsFun, err := ReadShardFunc(path)
	switch err {
Kevin Atkinson's avatar
Kevin Atkinson committed
186
	case ErrShardingFileMissing:
187 188 189 190 191
		isEmpty, err := DirIsEmpty(path)
		if err != nil {
			return err
		}
		if !isEmpty {
192
			return fmt.Errorf("directory missing %s file: %s", SHARDING_FN, path)
193 194 195
		}

		err = WriteShardFunc(path, fun)
196 197 198
		if err != nil {
			return err
		}
199 200
		err = WriteReadme(path, fun)
		return err
201
	case nil:
202
		if fun.String() != dsFun.String() {
203
			return fmt.Errorf("specified shard func '%s' does not match repo shard func '%s'",
204
				fun.String(), dsFun.String())
205
		}
Kevin Atkinson's avatar
Kevin Atkinson committed
206
		return ErrDatastoreExists
207
	default:
208
		return err
209
	}
210 211
}

// Open opens an existing flatfs datastore at path, reading its shard
// function from the SHARDING file. syncFiles makes all writes and
// directory changes fsynced for added safety.
//
// Opening loads (or, when no cache file exists, computes) the
// datastore's disk usage before returning, which may be slow on a
// large datastore, and starts the background checkpoint loop.
func Open(path string, syncFiles bool) (*Datastore, error) {
	_, err := os.Stat(path)
	if os.IsNotExist(err) {
		return nil, ErrDatastoreDoesNotExist
	} else if err != nil {
		return nil, err
	}

	shardId, err := ReadShardFunc(path)
	if err != nil {
		return nil, err
	}

	fs := &Datastore{
		path:      path,
		shardStr:  shardId.String(),
		getDir:    shardId.Func(),
		sync:      syncFiles,
		diskUsage: 0,
		opMap:     new(opMap),
	}

	// This sets diskUsage to the correct value
	// It might be slow, but allowing it to happen
	// while the datastore is usable might
	// cause diskUsage to not be accurate.
	err = fs.calculateDiskUsage()
	if err != nil {
		// Cannot stat() all
		// elements in the datastore.
		return nil, err
	}

	// Start the checkpoint loop; it runs until deactivate() closes
	// checkpointCh and then signals fs.done.
	fs.checkpointCh = make(chan bool, 1)
	fs.done = make(chan bool)
	go func() {
		fs.checkpointLoop()
	}()
	return fs, nil
}

253
// convenience method
254
func CreateOrOpen(path string, fun *ShardIdV1, sync bool) (*Datastore, error) {
255
	err := Create(path, fun)
Kevin Atkinson's avatar
Kevin Atkinson committed
256
	if err != nil && err != ErrDatastoreExists {
257 258 259 260 261
		return nil, err
	}
	return Open(path, sync)
}

262 263
func (fs *Datastore) ShardStr() string {
	return fs.shardStr
264 265
}

Tommi Virtanen's avatar
Tommi Virtanen committed
266
func (fs *Datastore) encode(key datastore.Key) (dir, file string) {
267
	noslash := key.String()[1:]
Jeromy's avatar
Jeromy committed
268 269
	dir = filepath.Join(fs.path, fs.getDir(noslash))
	file = filepath.Join(dir, noslash+extension)
Tommi Virtanen's avatar
Tommi Virtanen committed
270 271 272
	return dir, file
}

273
func (fs *Datastore) decode(file string) (key datastore.Key, ok bool) {
Jeromy's avatar
Jeromy committed
274
	if filepath.Ext(file) != extension {
275 276 277
		return datastore.Key{}, false
	}
	name := file[:len(file)-len(extension)]
Jeromy's avatar
Jeromy committed
278
	return datastore.NewKey(name), true
279 280
}

281 282
func (fs *Datastore) makeDir(dir string) error {
	if err := fs.makeDirNoSync(dir); err != nil {
Jeromy's avatar
Jeromy committed
283
		return err
284 285 286 287 288 289
	}

	// In theory, if we create a new prefix dir and add a file to
	// it, the creation of the prefix dir itself might not be
	// durable yet. Sync the root dir after a successful mkdir of
	// a prefix dir, just to be paranoid.
Jeromy's avatar
Jeromy committed
290 291 292 293
	if fs.sync {
		if err := syncDir(fs.path); err != nil {
			return err
		}
294 295 296 297
	}
	return nil
}

298
func (fs *Datastore) makeDirNoSync(dir string) error {
kpcyrd's avatar
kpcyrd committed
299
	if err := os.Mkdir(dir, 0755); err != nil {
Jeromy's avatar
Jeromy committed
300 301 302 303 304
		// EEXIST is safe to ignore here, that just means the prefix
		// directory already existed.
		if !os.IsExist(err) {
			return err
		}
305
		return nil
Jeromy's avatar
Jeromy committed
306
	}
307 308 309

	// Track DiskUsage of this NEW folder
	fs.updateDiskUsage(dir, true)
Jeromy's avatar
Jeromy committed
310 311 312
	return nil
}

313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
// This function always runs under an opLock. Therefore, only one thread is
// touching the affected files.
func (fs *Datastore) renameAndUpdateDiskUsage(tmpPath, path string) error {
	fi, err := os.Stat(path)

	// Destination exists, we need to discount it from diskUsage
	if fs != nil && err == nil {
		atomic.AddInt64(&fs.diskUsage, -fi.Size())
	} else if !os.IsNotExist(err) {
		return err
	}

	// Rename and add new file's diskUsage. If the rename fails,
	// it will either a) Re-add the size of an existing file, which
	// was sustracted before b) Add 0 if there is no existing file.
328
	err = os.Rename(tmpPath, path)
329 330 331 332
	fs.updateDiskUsage(path, true)
	return err
}

// putMaxRetries bounds the retries performed when a Put fails with
// "too many open files", which may be a transient fd-exhaustion state.
var putMaxRetries = 6

// Put stores a key/value in the datastore.
//
// Note, that we do not guarantee order of write operations (Put or Delete)
// to the same key in this datastore.
//
// For example. i.e. in the case of two concurrent Put, we only guarantee
// that one of them will come through, but cannot assure which one even if
// one arrived slightly later than the other. In the case of a
// concurrent Put and a Delete operation, we cannot guarantee which one
// will win.
func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
	val, ok := value.([]byte)
	if !ok {
		return datastore.ErrInvalidType
	}

	var err error
	for i := 1; i <= putMaxRetries; i++ {
		err = fs.doWriteOp(&op{
			typ: opPut,
			key: key,
			v:   val,
		})
		if err == nil {
			break
		}

		// Only fd-exhaustion errors are retried (with linear backoff);
		// any other error is returned immediately.
		if !strings.Contains(err.Error(), "too many open files") {
			break
		}

		log.Errorf("too many open files, retrying in %dms", 100*i)
		time.Sleep(time.Millisecond * 100 * time.Duration(i))
	}
	return err
}

// doOp dispatches a write operation to its implementation. It must run
// while the key's opLock is held (see doWriteOp).
func (fs *Datastore) doOp(oper *op) error {
	switch oper.typ {
	case opPut:
		return fs.doPut(oper.key, oper.v)
	case opDelete:
		return fs.doDelete(oper.key)
	case opRename:
		return fs.renameAndUpdateDiskUsage(oper.tmp, oper.path)
	default:
		panic("bad operation, this is a bug")
	}
}

// doWriteOp optimizes out write operations (put/delete) to the same
// key by queueing them and succeeding all queued
// operations if one of them does. In such case,
// we assume that the first succeeding operation
// on that key was the last one to happen after
// all successful others.
func (fs *Datastore) doWriteOp(oper *op) error {
	keyStr := oper.key.String()

	opRes := fs.opMap.Begin(keyStr)
	if opRes == nil { // nothing to do, a concurrent op succeeded
		return nil
	}

	// Do the operation
	err := fs.doOp(oper)

	// Finish it. If no error, it will signal other operations
	// waiting on this result to succeed. Otherwise, they will
	// retry.
	opRes.Finish(err == nil)
	return err
}

// doPut writes val to key's data file via a temp file plus rename so
// readers never observe a partially written value. It always runs
// under the key's opLock (see doWriteOp). When fs.sync is set, the
// temp file and the containing directory are fsynced.
func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
	dir, path := fs.encode(key)
	if err := fs.makeDir(dir); err != nil {
		return err
	}

	tmp, err := ioutil.TempFile(dir, "put-")
	if err != nil {
		return err
	}
	// closed/removed let the deferred cleanup know which steps already
	// happened on the success path.
	closed := false
	removed := false
	defer func() {
		if !closed {
			// silence errcheck
			_ = tmp.Close()
		}
		if !removed {
			// silence errcheck
			_ = os.Remove(tmp.Name())
		}
	}()

	if _, err := tmp.Write(val); err != nil {
		return err
	}
	if fs.sync {
		if err := syncFile(tmp); err != nil {
			return err
		}
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	closed = true

	// Atomically move the temp file into place; diskUsage is adjusted
	// for both a replaced destination and the new file.
	err = fs.renameAndUpdateDiskUsage(tmp.Name(), path)
	if err != nil {
		return err
	}
	removed = true

	if fs.sync {
		if err := syncDir(dir); err != nil {
			return err
		}
	}
	return nil
}

Jeromy's avatar
Jeromy committed
460 461
func (fs *Datastore) putMany(data map[datastore.Key]interface{}) error {
	var dirsToSync []string
462
	files := make(map[*os.File]*op)
Jeromy's avatar
Jeromy committed
463 464 465 466 467 468 469

	for key, value := range data {
		val, ok := value.([]byte)
		if !ok {
			return datastore.ErrInvalidType
		}
		dir, path := fs.encode(key)
470
		if err := fs.makeDirNoSync(dir); err != nil {
Jeromy's avatar
Jeromy committed
471 472 473 474 475 476 477 478 479 480 481 482 483
			return err
		}
		dirsToSync = append(dirsToSync, dir)

		tmp, err := ioutil.TempFile(dir, "put-")
		if err != nil {
			return err
		}

		if _, err := tmp.Write(val); err != nil {
			return err
		}

484 485 486 487 488 489
		files[tmp] = &op{
			typ:  opRename,
			path: path,
			tmp:  tmp.Name(),
			key:  key,
		}
Jeromy's avatar
Jeromy committed
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
	}

	ops := make(map[*os.File]int)

	defer func() {
		for fi, _ := range files {
			val, _ := ops[fi]
			switch val {
			case 0:
				_ = fi.Close()
				fallthrough
			case 1:
				_ = os.Remove(fi.Name())
			}
		}
	}()

	// Now we sync everything
	// sync and close files
	for fi, _ := range files {
Jeromy's avatar
Jeromy committed
510
		if fs.sync {
511
			if err := syncFile(fi); err != nil {
Jeromy's avatar
Jeromy committed
512 513
				return err
			}
Jeromy's avatar
Jeromy committed
514 515 516 517 518 519 520 521 522 523 524
		}

		if err := fi.Close(); err != nil {
			return err
		}

		// signify closed
		ops[fi] = 1
	}

	// move files to their proper places
525 526
	for fi, op := range files {
		fs.doWriteOp(op)
Jeromy's avatar
Jeromy committed
527 528 529 530 531
		// signify removed
		ops[fi] = 2
	}

	// now sync the dirs for those files
Jeromy's avatar
Jeromy committed
532 533 534 535 536
	if fs.sync {
		for _, dir := range dirsToSync {
			if err := syncDir(dir); err != nil {
				return err
			}
Jeromy's avatar
Jeromy committed
537 538
		}

Jeromy's avatar
Jeromy committed
539 540 541 542
		// sync top flatfs dir
		if err := syncDir(fs.path); err != nil {
			return err
		}
Jeromy's avatar
Jeromy committed
543 544 545 546 547
	}

	return nil
}

Tommi Virtanen's avatar
Tommi Virtanen committed
548 549 550 551 552 553 554 555 556 557 558 559 560 561
func (fs *Datastore) Get(key datastore.Key) (value interface{}, err error) {
	_, path := fs.encode(key)
	data, err := ioutil.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, datastore.ErrNotFound
		}
		// no specific error to return, so just pass it through
		return nil, err
	}
	return data, nil
}

func (fs *Datastore) Has(key datastore.Key) (exists bool, err error) {
Tommi Virtanen's avatar
Tommi Virtanen committed
562 563 564 565 566 567 568 569 570
	_, path := fs.encode(key)
	switch _, err := os.Stat(path); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
Tommi Virtanen's avatar
Tommi Virtanen committed
571 572
}

// Delete removes a key/value from the Datastore. Please read
// the Put() explanation about the handling of concurrent write
// operations to the same key.
func (fs *Datastore) Delete(key datastore.Key) error {
	return fs.doWriteOp(&op{
		typ: opDelete,
		key: key,
		v:   nil,
	})
}

// This function always runs within an opLock for the given
// key, and not concurrently.
//
// doDelete removes key's data file, subtracting its size from the
// diskUsage counter. Returns datastore.ErrNotFound when no file exists.
func (fs *Datastore) doDelete(key datastore.Key) error {
	_, path := fs.encode(key)

	// The size must be read before removal so diskUsage can be adjusted.
	fSize := fileSize(path)

	switch err := os.Remove(path); {
	case err == nil:
		atomic.AddInt64(&fs.diskUsage, -fSize)
		fs.checkpointDiskUsage()
		return nil
	case os.IsNotExist(err):
		return datastore.ErrNotFound
	default:
		return err
	}
}

// Query only supports listing all keys (KeysOnly with no prefix,
// filters, orders, limit or offset); any other query returns an error.
// Results are streamed on a buffered channel in random (directory)
// order.
func (fs *Datastore) Query(q query.Query) (query.Results, error) {
	if (q.Prefix != "" && q.Prefix != "/") ||
		len(q.Filters) > 0 ||
		len(q.Orders) > 0 ||
		q.Limit > 0 ||
		q.Offset > 0 ||
		!q.KeysOnly {
		// TODO this is overly simplistic, but the only caller is
		// `ipfs refs local` for now, and this gets us moving.
		return nil, errors.New("flatfs only supports listing all keys in random order")
	}

	reschan := make(chan query.Result, query.KeysOnlyBufSize)
	go func() {
		defer close(reschan)
		err := fs.walkTopLevel(fs.path, reschan)
		if err != nil {
			reschan <- query.Result{Error: errors.New("walk failed: " + err.Error())}
		}
	}()
	return query.ResultsWithChan(q, reschan), nil
}

// walkTopLevel iterates the shard prefix directories directly under
// path, walking each one and streaming decoded keys to reschan.
// Entries whose names start with '.' are skipped.
func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error {
	dir, err := os.Open(path)
	if err != nil {
		return err
	}
	defer dir.Close()
	names, err := dir.Readdirnames(-1)
	if err != nil {
		return err
	}
	for _, dir := range names {

		if len(dir) == 0 || dir[0] == '.' {
			continue
		}

		err = fs.walk(filepath.Join(path, dir), reschan)
		if err != nil {
			return err
		}

	}
	return nil
}

651 652 653
// folderSize estimates the diskUsage of a folder by reading
// up to DiskUsageFilesAverage entries in it and assumming any
// other files will have an avereage size.
654
func folderSize(path string, deadline time.Time) (int64, initAccuracy, error) {
655 656 657 658
	var du int64

	folder, err := os.Open(path)
	if err != nil {
659
		return 0, "", err
660 661 662 663 664
	}
	defer folder.Close()

	stat, err := folder.Stat()
	if err != nil {
665
		return 0, "", err
666 667 668 669
	}

	files, err := folder.Readdirnames(-1)
	if err != nil {
670
		return 0, "", err
671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687
	}

	totalFiles := len(files)
	i := 0
	filesProcessed := 0
	maxFiles := DiskUsageFilesAverage
	if maxFiles <= 0 {
		maxFiles = totalFiles
	}

	// randomize file order
	// https://stackoverflow.com/a/42776696
	for i := len(files) - 1; i > 0; i-- {
		j := rand.Intn(i + 1)
		files[i], files[j] = files[j], files[i]
	}

688
	accuracy := exactA
689
	for {
690 691 692
		// Do not process any files after deadline is over
		if time.Now().After(deadline) {
			accuracy = timedoutA
693 694 695
			break
		}

696 697 698 699
		if i >= totalFiles || filesProcessed >= maxFiles {
			if filesProcessed >= maxFiles {
				accuracy = approxA
			}
700 701 702 703 704 705 706 707
			break
		}

		// Stat the file
		fname := files[i]
		subpath := filepath.Join(path, fname)
		st, err := os.Stat(subpath)
		if err != nil {
708
			return 0, "", err
709 710 711 712
		}

		// Find folder size recursively
		if st.IsDir() {
713
			du2, acc, err := folderSize(filepath.Join(subpath), deadline)
714
			if err != nil {
715
				return 0, "", err
716
			}
717
			accuracy = combineAccuracy(acc, accuracy)
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739
			du += du2
			filesProcessed++
		} else { // in any other case, add the file size
			du += st.Size()
			filesProcessed++
		}

		i++
	}

	nonProcessed := totalFiles - filesProcessed

	// Avg is total size in this folder up to now / total files processed
	// it includes folders ant not folders
	avg := 0.0
	if filesProcessed > 0 {
		avg = float64(du) / float64(filesProcessed)
	}
	duEstimation := int64(avg * float64(nonProcessed))
	du += duEstimation
	du += stat.Size()
	//fmt.Println(path, "total:", totalFiles, "totalStat:", i, "totalFile:", filesProcessed, "left:", nonProcessed, "avg:", int(avg), "est:", int(duEstimation), "du:", du)
740
	return du, accuracy, nil
741 742 743 744
}

// calculateDiskUsage tries to read the DiskUsageFile for a cached
// diskUsage value, otherwise walks the datastore files.
// It is only safe to call in Open().
func (fs *Datastore) calculateDiskUsage() error {
	// Try to obtain a previously stored value from disk
	if persDu := fs.readDiskUsageFile(); persDu > 0 {
		fs.diskUsage = persDu
		return nil
	}

	// No cache: walk the tree (sampling per folder, bounded in time by
	// DiskUsageCalcTimeout) and persist the result for next time.
	fmt.Printf("Calculating datastore size. This might take %s at most and will happen only once\n", DiskUsageCalcTimeout.String())
	deadline := time.Now().Add(DiskUsageCalcTimeout)
	du, accuracy, err := folderSize(fs.path, deadline)
	if err != nil {
		return err
	}
	if accuracy == timedoutA {
		fmt.Println("WARN: It took to long to calculate the datastore size")
		fmt.Printf("WARN: The total size (%d) is an estimation. You can fix errors by\n", du)
		fmt.Printf("WARN: replacing the %s file with the right disk usage in bytes and\n",
			filepath.Join(fs.path, DiskUsageFile))
		fmt.Println("WARN: re-opening the datastore")
	}

	fs.storedValue.accuracy = accuracy
	fs.diskUsage = du
	fs.writeDiskUsageFile(du)

	return nil
}

func fileSize(path string) int64 {
	fi, err := os.Stat(path)
	if err != nil {
		return 0
	}
	return fi.Size()
}

// updateDiskUsage reads the size of path and atomically
// increases or decreases the diskUsage variable.
// setting add to false will subtract from disk usage.
func (fs *Datastore) updateDiskUsage(path string, add bool) {
	fsize := fileSize(path)
	if !add {
		fsize = -fsize
	}

	if fsize != 0 {
792 793
		atomic.AddInt64(&fs.diskUsage, fsize)
		fs.checkpointDiskUsage()
794 795 796
	}
}

// checkpointDiskUsage asks the checkpoint loop to persist the current
// diskUsage. The send is non-blocking: when a request is already
// pending, this one coalesces with it.
func (fs *Datastore) checkpointDiskUsage() {
	select {
	case fs.checkpointCh <- true:
		// msg send
	default:
		// checkpoint request already pending
	}
}

// checkpointLoop runs in its own goroutine (started by Open). On each
// request it persists diskUsage to the cache file when the value has
// drifted more than diskUsageCheckpointPercent from the last stored
// one. Closing checkpointCh triggers a final unconditional write and a
// signal on fs.done (see deactivate).
func (fs *Datastore) checkpointLoop() {
	for {
		_, more := <-fs.checkpointCh
		fs.dirty = true
		du := atomic.LoadInt64(&fs.diskUsage)
		if more {
			newDu := float64(du)
			lastCheckpointDu := float64(fs.storedValue.diskUsage)
			diff := math.Abs(newDu - lastCheckpointDu)

			// If the difference between the checkpointed disk usage and
			// current one is larger than 1% of the checkpointed: store it.
			if (lastCheckpointDu * diskUsageCheckpointPercent / 100.0) < diff {
				fs.writeDiskUsageFile(du)
			}

			// FIXME: If dirty set a timer to write the diskusage
			// anyway after X seconds of inactivity.
		} else {
			// shutting down, write the final value
			fs.writeDiskUsageFile(du)
			fs.done <- true
			return
		}
	}
}

833
func (fs *Datastore) writeDiskUsageFile(du int64) {
834 835 836 837
	tmp, err := ioutil.TempFile(fs.path, "du-")
	if err != nil {
		return
	}
838 839 840

	encoder := json.NewEncoder(tmp)
	if err := encoder.Encode(&fs.storedValue); err != nil {
841 842
		return
	}
843

844 845 846 847
	if err := tmp.Close(); err != nil {
		return
	}

848
	if err := os.Rename(tmp.Name(), filepath.Join(fs.path, DiskUsageFile)); err != nil {
849
		return
850
	}
851 852 853

	fs.storedValue.diskUsage = du
	fs.dirty = false
854 855
}

856
// readDiskUsageFile is only safe to call in Open()
857 858 859 860 861 862
func (fs *Datastore) readDiskUsageFile() int64 {
	fpath := filepath.Join(fs.path, DiskUsageFile)
	duB, err := ioutil.ReadFile(fpath)
	if err != nil {
		return 0
	}
863
	err = json.Unmarshal(duB, &fs.storedValue)
864 865 866
	if err != nil {
		return 0
	}
867
	return fs.storedValue.diskUsage
868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887
}

// DiskUsage implements the PersistentDatastore interface
// and returns the current disk usage in bytes used by
// this datastore.
//
// The size is approximative and may slightly differ from
// the real disk values.
func (fs *Datastore) DiskUsage() (uint64, error) {
	// it may differ from real disk values if
	// the filesystem has allocated for blocks
	// for a directory because it has many files in it
	// we don't account for "resized" directories.
	// In a large datastore, the differences should be
	// are negligible though.

	du := atomic.LoadInt64(&fs.diskUsage)
	return uint64(du), nil
}

888 889 890 891 892 893 894
// Accuracy returns a string representing the accuracy of the
// DiskUsage() result, the value returned is implementation defined
// and for informational purposes only
func (fs *Datastore) Accuracy() string {
	return string(fs.storedValue.accuracy)
}

// walk emits one query.Result per data file found in the prefix
// directory at path. Non-directories, dot-prefixed names, and files
// whose names do not decode to a key are skipped; a directory that
// disappeared between listing and opening is not an error.
func (fs *Datastore) walk(path string, reschan chan query.Result) error {
	dir, err := os.Open(path)
	if err != nil {
		if os.IsNotExist(err) {
			// not an error if the file disappeared
			return nil
		}
		return err
	}
	defer dir.Close()

	// ignore non-directories
	fileInfo, err := dir.Stat()
	if err != nil {
		return err
	}
	if !fileInfo.IsDir() {
		return nil
	}

	names, err := dir.Readdirnames(-1)
	if err != nil {
		return err
	}
	for _, fn := range names {

		if len(fn) == 0 || fn[0] == '.' {
			continue
		}

		key, ok := fs.decode(fn)
		if !ok {
			log.Warningf("failed to decode flatfs entry: %s", fn)
			continue
		}

		reschan <- query.Result{
			Entry: query.Entry{
				Key: key.String(),
			},
		}
	}
	return nil
}

// Deactivate closes background maintenance threads, most write
// operations will fail but readonly operations will continue to
// function
//
// NOTE(review): this is not guarded by a lock; concurrent calls could
// race on checkpointCh — confirm callers serialize Close().
func (fs *Datastore) deactivate() error {
	if fs.checkpointCh != nil {
		// A final send forces one last checkpoint request; closing the
		// channel then makes checkpointLoop write the final value,
		// signal done, and exit.
		fs.checkpointCh <- true
		close(fs.checkpointCh)
		<-fs.done
		fs.checkpointCh = nil
	}
	return nil
}

// Close shuts down the datastore's background maintenance.
func (fs *Datastore) Close() error {
	return fs.deactivate()
}

Jeromy's avatar
Jeromy committed
957
type flatfsBatch struct {
Jeromy's avatar
Jeromy committed
958 959 960 961 962 963
	puts    map[datastore.Key]interface{}
	deletes map[datastore.Key]struct{}

	ds *Datastore
}

Jeromy's avatar
Jeromy committed
964
func (fs *Datastore) Batch() (datastore.Batch, error) {
Jeromy's avatar
Jeromy committed
965
	return &flatfsBatch{
Jeromy's avatar
Jeromy committed
966 967 968
		puts:    make(map[datastore.Key]interface{}),
		deletes: make(map[datastore.Key]struct{}),
		ds:      fs,
Jeromy's avatar
Jeromy committed
969
	}, nil
Jeromy's avatar
Jeromy committed
970 971
}

Jeromy's avatar
Jeromy committed
972
func (bt *flatfsBatch) Put(key datastore.Key, val interface{}) error {
Jeromy's avatar
Jeromy committed
973 974 975 976
	bt.puts[key] = val
	return nil
}

Jeromy's avatar
Jeromy committed
977
func (bt *flatfsBatch) Delete(key datastore.Key) error {
Jeromy's avatar
Jeromy committed
978 979 980 981
	bt.deletes[key] = struct{}{}
	return nil
}

Jeromy's avatar
Jeromy committed
982
func (bt *flatfsBatch) Commit() error {
Jeromy's avatar
Jeromy committed
983 984 985 986 987 988 989 990 991 992 993 994 995
	if err := bt.ds.putMany(bt.puts); err != nil {
		return err
	}

	for k, _ := range bt.deletes {
		if err := bt.ds.Delete(k); err != nil {
			return err
		}
	}

	return nil
}

// Compile-time assertion that *Datastore implements the (legacy)
// ThreadSafeDatastore interface.
var _ datastore.ThreadSafeDatastore = (*Datastore)(nil)

// IsThreadSafe marks this datastore as safe for concurrent use.
func (*Datastore) IsThreadSafe() {}