// Package ldpinner implements structures and methods to keep track of
// which objects a user wants to keep stored locally.  This implementation
// stores pin information in a mdag structure.
package ldpinner

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync"
	"time"

	cid "gitlab.dms3.io/dms3/go-cid"
	ds "gitlab.dms3.io/dms3/go-datastore"
	ld "gitlab.dms3.io/dms3/go-ld-format"
	logging "gitlab.dms3.io/dms3/go-log"
	"gitlab.dms3.io/dms3/go-merkledag"
	mdag "gitlab.dms3.io/dms3/go-merkledag"
	"gitlab.dms3.io/dms3/go-merkledag/dagutils"

	dms3pinner "gitlab.dms3.io/dms3/go-dms3-pinner"
)

// loadTimeout bounds how long New may spend loading stored pin state.
const loadTimeout = 5 * time.Second

var (
	// log is the package logger, registered under the name "pin".
	log = logging.Logger("pin")

	// pinDatastoreKey is the datastore key under which the CID of the
	// pinning root object is stored.
	pinDatastoreKey = ds.NewKey("/local/pins")

	// emptyKey is populated by init from a hard-coded CID string.
	emptyKey cid.Cid

	// linkDirect, linkRecursive and linkInternal hold the string names of
	// the corresponding pin modes; they are populated by init.
	linkDirect, linkRecursive, linkInternal string
)

func init() {
	e, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")
	if err != nil {
		log.Error("failed to decode empty key constant")
		os.Exit(1)
	}
	emptyKey = e

42
	directStr, ok := dms3pinner.ModeToString(dms3pinner.Direct)
Andrew Gillis's avatar
Andrew Gillis committed
43 44 45 46 47
	if !ok {
		panic("could not find Direct pin enum")
	}
	linkDirect = directStr

48
	recursiveStr, ok := dms3pinner.ModeToString(dms3pinner.Recursive)
Andrew Gillis's avatar
Andrew Gillis committed
49 50 51 52 53
	if !ok {
		panic("could not find Recursive pin enum")
	}
	linkRecursive = recursiveStr

54
	internalStr, ok := dms3pinner.ModeToString(dms3pinner.Internal)
Andrew Gillis's avatar
Andrew Gillis committed
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	if !ok {
		panic("could not find Internal pin enum")
	}
	linkInternal = internalStr
}

// pinner implements the Pinner interface
type pinner struct {
	lock       sync.RWMutex
	recursePin *cid.Set
	directPin  *cid.Set

	// Track the keys used for storing the pinning state, so gc does
	// not delete them.
	internalPin *cid.Set
70 71
	dserv       ld.DAGService
	internal    ld.DAGService // dagservice used to store internal objects
Andrew Gillis's avatar
Andrew Gillis committed
72 73 74
	dstore      ds.Datastore
}

75
var _ dms3pinner.Pinner = (*pinner)(nil)
Andrew Gillis's avatar
Andrew Gillis committed
76 77

type syncDAGService interface {
78
	ld.DAGService
Andrew Gillis's avatar
Andrew Gillis committed
79 80 81 82 83
	Sync() error
}

// New creates a new pinner using the given datastore as a backend, and loads
// the pinner's keysets from the datastore
84
func New(dstore ds.Datastore, dserv, internal ld.DAGService) (*pinner, error) {
Andrew Gillis's avatar
Andrew Gillis committed
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
	rootKey, err := dstore.Get(pinDatastoreKey)
	if err != nil {
		if err == ds.ErrNotFound {
			return &pinner{
				recursePin:  cid.NewSet(),
				directPin:   cid.NewSet(),
				internalPin: cid.NewSet(),
				dserv:       dserv,
				internal:    internal,
				dstore:      dstore,
			}, nil
		}
		return nil, err
	}
	rootCid, err := cid.Cast(rootKey)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.TODO(), loadTimeout)
	defer cancel()

	root, err := internal.Get(ctx, rootCid)
	if err != nil {
		return nil, fmt.Errorf("cannot find pinning root object: %v", err)
	}

	rootpb, ok := root.(*mdag.ProtoNode)
	if !ok {
		return nil, mdag.ErrNotProtobuf
	}

	internalset := cid.NewSet()
	internalset.Add(rootCid)
	recordInternal := internalset.Add

	// load recursive set
	recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal)
	if err != nil {
		return nil, fmt.Errorf("cannot load recursive pins: %v", err)
	}

	// load direct set
	directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal)
	if err != nil {
		return nil, fmt.Errorf("cannot load direct pins: %v", err)
	}

	return &pinner{
		// assign pinsets
		recursePin:  cidSetWithValues(recurseKeys),
		directPin:   cidSetWithValues(directKeys),
		internalPin: internalset,
		// assign services
		dserv:    dserv,
		dstore:   dstore,
		internal: internal,
	}, nil
}

145 146
// LoadKeys reads the pinned CIDs and sends them on the given channel.  This is
// used to read pins without loading them all into memory.
147
func LoadKeys(ctx context.Context, dstore ds.Datastore, dserv, internal ld.DAGService, recursive bool, keyChan chan<- cid.Cid) error {
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
	rootKey, err := dstore.Get(pinDatastoreKey)
	if err != nil {
		if err == ds.ErrNotFound {
			return nil
		}
		return err
	}
	rootCid, err := cid.Cast(rootKey)
	if err != nil {
		return err
	}

	root, err := internal.Get(ctx, rootCid)
	if err != nil {
		return fmt.Errorf("cannot find pinning root object: %v", err)
	}

	rootpb, ok := root.(*mdag.ProtoNode)
	if !ok {
		return mdag.ErrNotProtobuf
	}

	var linkName string
	if recursive {
		linkName = linkRecursive
	} else {
		linkName = linkDirect
	}

	return loadSetChan(ctx, internal, rootpb, linkName, keyChan)
}

Andrew Gillis's avatar
Andrew Gillis committed
180
// Pin the given node, optionally recursive
181
func (p *pinner) Pin(ctx context.Context, node ld.Node, recurse bool) error {
Andrew Gillis's avatar
Andrew Gillis committed
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
	err := p.dserv.Add(ctx, node)
	if err != nil {
		return err
	}

	c := node.Cid()

	p.lock.Lock()
	defer p.lock.Unlock()

	if recurse {
		if p.recursePin.Has(c) {
			return nil
		}

		p.lock.Unlock()
		// temporary unlock to fetch the entire graph
		err := mdag.FetchGraph(ctx, c, p.dserv)
		p.lock.Lock()
		if err != nil {
			return err
		}

		if p.recursePin.Has(c) {
			return nil
		}

		if p.directPin.Has(c) {
			p.directPin.Remove(c)
		}

		p.recursePin.Add(c)
	} else {
		if p.recursePin.Has(c) {
			return fmt.Errorf("%s already pinned recursively", c.String())
		}

		p.directPin.Add(c)
	}
	return nil
}

// ErrNotPinned is returned when trying to unpin items which are not pinned.
// It is a fixed sentinel value, so callers can compare against it directly.
var ErrNotPinned = errors.New("not pinned or pinned indirectly")

// Unpin removes the pin on c.
//
// A recursively pinned key is only removed when recursive is true; otherwise
// an error is returned. A directly pinned key is removed regardless of the
// recursive flag. ErrNotPinned is returned when c is in neither set.
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	switch {
	case p.recursePin.Has(c):
		if !recursive {
			return fmt.Errorf("%s is pinned recursively", c)
		}
		p.recursePin.Remove(c)
	case p.directPin.Has(c):
		p.directPin.Remove(c)
	default:
		return ErrNotPinned
	}
	return nil
}

// isInternalPin reports whether c is in the pinner's internal pin set.
// It does no locking of its own.
func (p *pinner) isInternalPin(c cid.Cid) bool {
	return p.internalPin.Has(c)
}

// IsPinned returns whether or not the given key is pinned
// and an explanation of why its pinned
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
254
	return p.isPinnedWithType(ctx, c, dms3pinner.Any)
Andrew Gillis's avatar
Andrew Gillis committed
255 256 257 258
}

// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
259
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode dms3pinner.Mode) (string, bool, error) {
Andrew Gillis's avatar
Andrew Gillis committed
260 261 262 263 264 265 266
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.isPinnedWithType(ctx, c, mode)
}

// isPinnedWithType is the implementation of IsPinnedWithType that does not lock.
// intended for use by other pinned methods that already take locks
267
func (p *pinner) isPinnedWithType(ctx context.Context, c cid.Cid, mode dms3pinner.Mode) (string, bool, error) {
Andrew Gillis's avatar
Andrew Gillis committed
268
	switch mode {
269
	case dms3pinner.Any, dms3pinner.Direct, dms3pinner.Indirect, dms3pinner.Recursive, dms3pinner.Internal:
Andrew Gillis's avatar
Andrew Gillis committed
270 271
	default:
		err := fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}",
272
			mode, dms3pinner.Direct, dms3pinner.Indirect, dms3pinner.Recursive, dms3pinner.Internal, dms3pinner.Any)
Andrew Gillis's avatar
Andrew Gillis committed
273 274
		return "", false, err
	}
275
	if (mode == dms3pinner.Recursive || mode == dms3pinner.Any) && p.recursePin.Has(c) {
Andrew Gillis's avatar
Andrew Gillis committed
276 277
		return linkRecursive, true, nil
	}
278
	if mode == dms3pinner.Recursive {
Andrew Gillis's avatar
Andrew Gillis committed
279 280 281
		return "", false, nil
	}

282
	if (mode == dms3pinner.Direct || mode == dms3pinner.Any) && p.directPin.Has(c) {
Andrew Gillis's avatar
Andrew Gillis committed
283 284
		return linkDirect, true, nil
	}
285
	if mode == dms3pinner.Direct {
Andrew Gillis's avatar
Andrew Gillis committed
286 287 288
		return "", false, nil
	}

289
	if (mode == dms3pinner.Internal || mode == dms3pinner.Any) && p.isInternalPin(c) {
Andrew Gillis's avatar
Andrew Gillis committed
290 291
		return linkInternal, true, nil
	}
292
	if mode == dms3pinner.Internal {
Andrew Gillis's avatar
Andrew Gillis committed
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
		return "", false, nil
	}

	// Default is Indirect
	visitedSet := cid.NewSet()
	for _, rc := range p.recursePin.Keys() {
		has, err := hasChild(ctx, p.dserv, rc, c, visitedSet.Visit)
		if err != nil {
			return "", false, err
		}
		if has {
			return rc.String(), true, nil
		}
	}
	return "", false, nil
}

// CheckIfPinned Checks if a set of keys are pinned, more efficient than
// calling IsPinned for each key, returns the pinned status of cid(s)
312
func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]dms3pinner.Pinned, error) {
Andrew Gillis's avatar
Andrew Gillis committed
313 314
	p.lock.RLock()
	defer p.lock.RUnlock()
315
	pinned := make([]dms3pinner.Pinned, 0, len(cids))
Andrew Gillis's avatar
Andrew Gillis committed
316 317 318 319 320
	toCheck := cid.NewSet()

	// First check for non-Indirect pins directly
	for _, c := range cids {
		if p.recursePin.Has(c) {
321
			pinned = append(pinned, dms3pinner.Pinned{Key: c, Mode: dms3pinner.Recursive})
Andrew Gillis's avatar
Andrew Gillis committed
322
		} else if p.directPin.Has(c) {
323
			pinned = append(pinned, dms3pinner.Pinned{Key: c, Mode: dms3pinner.Direct})
Andrew Gillis's avatar
Andrew Gillis committed
324
		} else if p.isInternalPin(c) {
325
			pinned = append(pinned, dms3pinner.Pinned{Key: c, Mode: dms3pinner.Internal})
Andrew Gillis's avatar
Andrew Gillis committed
326 327 328 329 330 331
		} else {
			toCheck.Add(c)
		}
	}

	// Now walk all recursive pins to check for indirect pins
Steven Allen's avatar
Steven Allen committed
332 333 334 335 336 337
	visited := cid.NewSet()
	for _, rk := range p.recursePin.Keys() {
		err := merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
			if toCheck.Len() == 0 || !visited.Visit(c) {
				return false
			}
Andrew Gillis's avatar
Andrew Gillis committed
338 339

			if toCheck.Has(c) {
340
				pinned = append(pinned, dms3pinner.Pinned{Key: c, Mode: dms3pinner.Indirect, Via: rk})
Andrew Gillis's avatar
Andrew Gillis committed
341 342 343
				toCheck.Remove(c)
			}

Steven Allen's avatar
Steven Allen committed
344 345
			return true
		}, merkledag.Concurrent())
Andrew Gillis's avatar
Andrew Gillis committed
346 347 348 349 350 351 352 353 354 355
		if err != nil {
			return nil, err
		}
		if toCheck.Len() == 0 {
			break
		}
	}

	// Anything left in toCheck is not pinned
	for _, k := range toCheck.Keys() {
356
		pinned = append(pinned, dms3pinner.Pinned{Key: k, Mode: dms3pinner.NotPinned})
Andrew Gillis's avatar
Andrew Gillis committed
357 358 359 360 361 362 363 364
	}

	return pinned, nil
}

// RemovePinWithMode is for manually editing the pin structure.
// Use with care! If used improperly, garbage collection may not
// be successful.
365
func (p *pinner) RemovePinWithMode(c cid.Cid, mode dms3pinner.Mode) {
Andrew Gillis's avatar
Andrew Gillis committed
366 367 368
	p.lock.Lock()
	defer p.lock.Unlock()
	switch mode {
369
	case dms3pinner.Direct:
Andrew Gillis's avatar
Andrew Gillis committed
370
		p.directPin.Remove(c)
371
	case dms3pinner.Recursive:
Andrew Gillis's avatar
Andrew Gillis committed
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
		p.recursePin.Remove(c)
	default:
		// programmer error, panic OK
		panic("unrecognized pin type")
	}
}

// cidSetWithValues builds a new cid.Set containing every element of cids.
func cidSetWithValues(cids []cid.Cid) *cid.Set {
	set := cid.NewSet()
	for i := range cids {
		set.Add(cids[i])
	}
	return set
}

// DirectKeys returns a slice containing the directly pinned keys. The error
// result is always nil.
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	keys := p.directPin.Keys()
	return keys, nil
}

// RecursiveKeys returns a slice containing the recursively pinned keys. The
// error result is always nil.
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	keys := p.recursePin.Keys()
	return keys, nil
}

// Update updates a recursive pin from one cid to another. This is more
// efficient than simply pinning the new one and unpinning the old one.
//
// from must already be recursively pinned, otherwise an error is returned.
// On success to is added to the recursive pin set, and from is removed from
// it when unpin is true. If from and to are equal, Update does nothing.
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
	if from == to {
		// Nothing to do. Don't remove this check or we'll end up
		// _removing_ the pin.
		//
		// See #6648
		return nil
	}

	p.lock.Lock()
	defer p.lock.Unlock()

	if !p.recursePin.Has(from) {
		return fmt.Errorf("'from' cid was not recursively pinned already")
	}

	// Temporarily unlock while we fetch the differences. The lock is taken
	// again right after the call so the deferred Unlock stays balanced.
	// NOTE(review): the membership of from is not re-checked after the lock
	// is re-acquired.
	p.lock.Unlock()
	err := dagutils.DiffEnumerate(ctx, p.dserv, from, to)
	p.lock.Lock()

	if err != nil {
		return err
	}

	p.recursePin.Add(to)
	if unpin {
		p.recursePin.Remove(from)
	}
	return nil
}

// Flush encodes and writes pinner keysets to the datastore.
//
// The direct and recursive sets are stored through the internal DAG service
// and linked from a fresh root node. After the DAG services have been synced
// (when they support it), the root's CID is written to the datastore under
// pinDatastoreKey and synced. Only when all of that succeeds is the
// pinner's internal pin set replaced by the keys recorded during this flush.
func (p *pinner) Flush(ctx context.Context) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Collects every key recorded while storing the sets, plus the root.
	internalset := cid.NewSet()
	recordInternal := internalset.Add

	root := &mdag.ProtoNode{}
	{
		// Store the direct pin set and link it from the root.
		n, err := storeSet(ctx, p.internal, p.directPin.Keys(), recordInternal)
		if err != nil {
			return err
		}
		if err := root.AddNodeLink(linkDirect, n); err != nil {
			return err
		}
	}

	{
		// Store the recursive pin set and link it from the root.
		n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal)
		if err != nil {
			return err
		}
		if err := root.AddNodeLink(linkRecursive, n); err != nil {
			return err
		}
	}

	// add the empty node, its referenced by the pin sets but never created
	err := p.internal.Add(ctx, new(mdag.ProtoNode))
	if err != nil {
		return err
	}

	err = p.internal.Add(ctx, root)
	if err != nil {
		return err
	}

	k := root.Cid()

	internalset.Add(k)

	// Sync both DAG services, where supported, before the root key is
	// written to the datastore.
	if syncDServ, ok := p.dserv.(syncDAGService); ok {
		if err := syncDServ.Sync(); err != nil {
			return fmt.Errorf("cannot sync pinned data: %v", err)
		}
	}

	if syncInternal, ok := p.internal.(syncDAGService); ok {
		if err := syncInternal.Sync(); err != nil {
			return fmt.Errorf("cannot sync pinning data: %v", err)
		}
	}

	// Record the new root CID under the pin datastore key.
	if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil {
		return fmt.Errorf("cannot store pin state: %v", err)
	}
	if err := p.dstore.Sync(pinDatastoreKey); err != nil {
		return fmt.Errorf("cannot sync pin state: %v", err)
	}
	// Swap in the new internal set only after everything above succeeded.
	p.internalPin = internalset
	return nil
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner. The error result is always nil.
//
// This is a read-only accessor, so it takes the read lock, matching
// DirectKeys and RecursiveKeys, rather than excluding other readers.
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.internalPin.Keys(), nil
}

// PinWithMode allows the user to have fine grained control over pin
// counts
514
func (p *pinner) PinWithMode(c cid.Cid, mode dms3pinner.Mode) {
Andrew Gillis's avatar
Andrew Gillis committed
515 516 517
	p.lock.Lock()
	defer p.lock.Unlock()
	switch mode {
518
	case dms3pinner.Recursive:
Andrew Gillis's avatar
Andrew Gillis committed
519
		p.recursePin.Add(c)
520
	case dms3pinner.Direct:
Andrew Gillis's avatar
Andrew Gillis committed
521 522 523 524 525 526
		p.directPin.Add(c)
	}
}

// hasChild recursively looks for a Cid among the children of a root Cid.
// The visit function can be used to shortcut already-visited branches.
527 528
func hasChild(ctx context.Context, ng ld.NodeGetter, root cid.Cid, child cid.Cid, visit func(cid.Cid) bool) (bool, error) {
	links, err := ld.GetLinks(ctx, ng, root)
Andrew Gillis's avatar
Andrew Gillis committed
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549
	if err != nil {
		return false, err
	}
	for _, lnk := range links {
		c := lnk.Cid
		if lnk.Cid.Equals(child) {
			return true, nil
		}
		if visit(c) {
			has, err := hasChild(ctx, ng, c, child, visit)
			if err != nil {
				return false, err
			}

			if has {
				return has, nil
			}
		}
	}
	return false, nil
}