// Package pin implements structures and methods to keep track of // which objects a user wants to keep stored locally. package pin import ( "context" "fmt" "os" "sync" "time" mdag "github.com/ipfs/go-ipfs/merkledag" dutils "github.com/ipfs/go-ipfs/merkledag/utils" logging "gx/ipfs/QmRb5jh8z2E8hMGN2tkvs1yHynUanqnZ3UeKwgN1i9P1F8/go-log" ds "gx/ipfs/QmXRKBQA4wXP7xWbFiZsR1GP4HV6wMDQ1aWFxZZ4uBcPX9/go-datastore" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ) var log = logging.Logger("pin") var pinDatastoreKey = ds.NewKey("/local/pins") var emptyKey *cid.Cid func init() { e, err := cid.Decode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n") if err != nil { log.Error("failed to decode empty key constant") os.Exit(1) } emptyKey = e } const ( linkRecursive = "recursive" linkDirect = "direct" linkIndirect = "indirect" linkInternal = "internal" linkNotPinned = "not pinned" linkAny = "any" linkAll = "all" ) // Mode allows to specify different types of pin (recursive, direct etc.). // See the Pin Modes constants for a full list. type Mode int // Pin Modes const ( // Recursive pins pin the target cids along with any reachable children. Recursive Mode = iota // Direct pins pin just the target cid. Direct // Indirect pins are cids who have some ancestor pinned recursively. Indirect // Internal pins are cids used to keep the internal state of the pinner. Internal // NotPinned NotPinned // Any refers to any pinned cid Any ) // ModeToString returns a human-readable name for the Mode. func ModeToString(mode Mode) (string, bool) { m := map[Mode]string{ Recursive: linkRecursive, Direct: linkDirect, Indirect: linkIndirect, Internal: linkInternal, NotPinned: linkNotPinned, Any: linkAny, } s, ok := m[mode] return s, ok } // StringToMode parses the result of ModeToString() back to a Mode. // It returns a boolean which is set to false if the mode is unknown. func StringToMode(s string) (Mode, bool) { m := map[string]Mode{ linkRecursive: Recursive, linkDirect: Direct, linkIndirect: Indirect, linkInternal: Internal, linkNotPinned: NotPinned, linkAny: Any, linkAll: Any, // "all" and "any" means the same thing } mode, ok := m[s] return mode, ok } // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. In practice, a Pinner is in // in charge of keeping the list of items from the local storage that should // not be garbaged-collected. type Pinner interface { // IsPinned returns whether or not the given cid is pinned // and an explanation of why its pinned IsPinned(*cid.Cid) (string, bool, error) // IsPinnedWithType returns whether or not the given cid is pinned with the // given pin type, as well as returning the type of pin its pinned with. IsPinnedWithType(*cid.Cid, Mode) (string, bool, error) // Pin the given node, optionally recursively. Pin(ctx context.Context, node ipld.Node, recursive bool) error // Unpin the given cid. If recursive is true, removes either a recursive or // a direct pin. If recursive is false, only removes a direct pin. Unpin(ctx context.Context, cid *cid.Cid, recursive bool) error // Update updates a recursive pin from one cid to another // this is more efficient than simply pinning the new one and unpinning the // old one Update(ctx context.Context, from, to *cid.Cid, unpin bool) error // Check if a set of keys are pinned, more efficient than // calling IsPinned for each key CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) // PinWithMode is for manually editing the pin structure. Use with // care! If used improperly, garbage collection may not be // successful. PinWithMode(*cid.Cid, Mode) // RemovePinWithMode is for manually editing the pin structure. // Use with care! If used improperly, garbage collection may not // be successful. RemovePinWithMode(*cid.Cid, Mode) // Flush writes the pin state to the backing datastore Flush() error // DirectKeys returns all directly pinned cids DirectKeys() []*cid.Cid // DirectKeys returns all recursively pinned cids RecursiveKeys() []*cid.Cid // InternalPins returns all cids kept pinned for the internal state of the // pinner InternalPins() []*cid.Cid } // Pinned represents CID which has been pinned with a pinning strategy. // The Via field allows to identify the pinning parent of this CID, in the // case that the item is not pinned directly (but rather pinned recursively // by some ascendant). type Pinned struct { Key *cid.Cid Mode Mode Via *cid.Cid } // Pinned returns whether or not the given cid is pinned func (p Pinned) Pinned() bool { return p.Mode != NotPinned } // String Returns pin status as string func (p Pinned) String() string { switch p.Mode { case NotPinned: return "not pinned" case Indirect: return fmt.Sprintf("pinned via %s", p.Via) default: modeStr, _ := ModeToString(p.Mode) return fmt.Sprintf("pinned: %s", modeStr) } } // pinner implements the Pinner interface type pinner struct { lock sync.RWMutex recursePin *cid.Set directPin *cid.Set // Track the keys used for storing the pinning state, so gc does // not delete them. internalPin *cid.Set dserv ipld.DAGService internal ipld.DAGService // dagservice used to store internal objects dstore ds.Datastore } // NewPinner creates a new pinner using the given datastore as a backend func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner { rcset := cid.NewSet() dirset := cid.NewSet() return &pinner{ recursePin: rcset, directPin: dirset, dserv: serv, dstore: dstore, internal: internal, internalPin: cid.NewSet(), } } // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { p.lock.Lock() defer p.lock.Unlock() err := p.dserv.Add(ctx, node) if err != nil { return err } c := node.Cid() if recurse { if p.recursePin.Has(c) { return nil } if p.directPin.Has(c) { p.directPin.Remove(c) } // fetch entire graph err := mdag.FetchGraph(ctx, c, p.dserv) if err != nil { return err } p.recursePin.Add(c) } else { if _, err := p.dserv.Get(ctx, c); err != nil { return err } if p.recursePin.Has(c) { return fmt.Errorf("%s already pinned recursively", c.String()) } p.directPin.Add(c) } return nil } // ErrNotPinned is returned when trying to unpin items which are not pinned. var ErrNotPinned = fmt.Errorf("not pinned") // Unpin a given key func (p *pinner) Unpin(ctx context.Context, c *cid.Cid, recursive bool) error { p.lock.Lock() defer p.lock.Unlock() reason, pinned, err := p.isPinnedWithType(c, Any) if err != nil { return err } if !pinned { return ErrNotPinned } switch reason { case "recursive": if recursive { p.recursePin.Remove(c) return nil } return fmt.Errorf("%s is pinned recursively", c) case "direct": p.directPin.Remove(c) return nil default: return fmt.Errorf("%s is pinned indirectly under %s", c, reason) } } func (p *pinner) isInternalPin(c *cid.Cid) bool { return p.internalPin.Has(c) } // IsPinned returns whether or not the given key is pinned // and an explanation of why its pinned func (p *pinner) IsPinned(c *cid.Cid) (string, bool, error) { p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(c, Any) } // IsPinnedWithType returns whether or not the given cid is pinned with the // given pin type, as well as returning the type of pin its pinned with. func (p *pinner) IsPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(c, mode) } // isPinnedWithType is the implementation of IsPinnedWithType that does not lock. // intended for use by other pinned methods that already take locks func (p *pinner) isPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { switch mode { case Any, Direct, Indirect, Recursive, Internal: default: err := fmt.Errorf("Invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}", mode, Direct, Indirect, Recursive, Internal, Any) return "", false, err } if (mode == Recursive || mode == Any) && p.recursePin.Has(c) { return linkRecursive, true, nil } if mode == Recursive { return "", false, nil } if (mode == Direct || mode == Any) && p.directPin.Has(c) { return linkDirect, true, nil } if mode == Direct { return "", false, nil } if (mode == Internal || mode == Any) && p.isInternalPin(c) { return linkInternal, true, nil } if mode == Internal { return "", false, nil } // Default is Indirect visitedSet := cid.NewSet() for _, rc := range p.recursePin.Keys() { has, err := hasChild(p.dserv, rc, c, visitedSet.Visit) if err != nil { return "", false, err } if has { return rc.String(), true, nil } } return "", false, nil } // CheckIfPinned Checks if a set of keys are pinned, more efficient than // calling IsPinned for each key, returns the pinned status of cid(s) func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { p.lock.RLock() defer p.lock.RUnlock() pinned := make([]Pinned, 0, len(cids)) toCheck := cid.NewSet() // First check for non-Indirect pins directly for _, c := range cids { if p.recursePin.Has(c) { pinned = append(pinned, Pinned{Key: c, Mode: Recursive}) } else if p.directPin.Has(c) { pinned = append(pinned, Pinned{Key: c, Mode: Direct}) } else if p.isInternalPin(c) { pinned = append(pinned, Pinned{Key: c, Mode: Internal}) } else { toCheck.Add(c) } } // Now walk all recursive pins to check for indirect pins var checkChildren func(*cid.Cid, *cid.Cid) error checkChildren = func(rk, parentKey *cid.Cid) error { links, err := ipld.GetLinks(context.TODO(), p.dserv, parentKey) if err != nil { return err } for _, lnk := range links { c := lnk.Cid if toCheck.Has(c) { pinned = append(pinned, Pinned{Key: c, Mode: Indirect, Via: rk}) toCheck.Remove(c) } err := checkChildren(rk, c) if err != nil { return err } if toCheck.Len() == 0 { return nil } } return nil } for _, rk := range p.recursePin.Keys() { err := checkChildren(rk, rk) if err != nil { return nil, err } if toCheck.Len() == 0 { break } } // Anything left in toCheck is not pinned for _, k := range toCheck.Keys() { pinned = append(pinned, Pinned{Key: k, Mode: NotPinned}) } return pinned, nil } // RemovePinWithMode is for manually editing the pin structure. // Use with care! If used improperly, garbage collection may not // be successful. func (p *pinner) RemovePinWithMode(c *cid.Cid, mode Mode) { p.lock.Lock() defer p.lock.Unlock() switch mode { case Direct: p.directPin.Remove(c) case Recursive: p.recursePin.Remove(c) default: // programmer error, panic OK panic("unrecognized pin type") } } func cidSetWithValues(cids []*cid.Cid) *cid.Set { out := cid.NewSet() for _, c := range cids { out.Add(c) } return out } // LoadPinner loads a pinner and its keysets from the given datastore func LoadPinner(d ds.Datastore, dserv, internal ipld.DAGService) (Pinner, error) { p := new(pinner) rootKeyI, err := d.Get(pinDatastoreKey) if err != nil { return nil, fmt.Errorf("cannot load pin state: %v", err) } rootKeyBytes, ok := rootKeyI.([]byte) if !ok { return nil, fmt.Errorf("cannot load pin state: %s was not bytes", pinDatastoreKey) } rootCid, err := cid.Cast(rootKeyBytes) if err != nil { return nil, err } ctx, cancel := context.WithTimeout(context.TODO(), time.Second*5) defer cancel() root, err := internal.Get(ctx, rootCid) if err != nil { return nil, fmt.Errorf("cannot find pinning root object: %v", err) } rootpb, ok := root.(*mdag.ProtoNode) if !ok { return nil, mdag.ErrNotProtobuf } internalset := cid.NewSet() internalset.Add(rootCid) recordInternal := internalset.Add { // load recursive set recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal) if err != nil { return nil, fmt.Errorf("cannot load recursive pins: %v", err) } p.recursePin = cidSetWithValues(recurseKeys) } { // load direct set directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal) if err != nil { return nil, fmt.Errorf("cannot load direct pins: %v", err) } p.directPin = cidSetWithValues(directKeys) } p.internalPin = internalset // assign services p.dserv = dserv p.dstore = d p.internal = internal return p, nil } // DirectKeys returns a slice containing the directly pinned keys func (p *pinner) DirectKeys() []*cid.Cid { return p.directPin.Keys() } // RecursiveKeys returns a slice containing the recursively pinned keys func (p *pinner) RecursiveKeys() []*cid.Cid { return p.recursePin.Keys() } // Update updates a recursive pin from one cid to another // this is more efficient than simply pinning the new one and unpinning the // old one func (p *pinner) Update(ctx context.Context, from, to *cid.Cid, unpin bool) error { p.lock.Lock() defer p.lock.Unlock() if !p.recursePin.Has(from) { return fmt.Errorf("'from' cid was not recursively pinned already") } err := dutils.DiffEnumerate(ctx, p.dserv, from, to) if err != nil { return err } p.recursePin.Add(to) if unpin { p.recursePin.Remove(from) } return nil } // Flush encodes and writes pinner keysets to the datastore func (p *pinner) Flush() error { p.lock.Lock() defer p.lock.Unlock() ctx := context.TODO() internalset := cid.NewSet() recordInternal := internalset.Add root := &mdag.ProtoNode{} { n, err := storeSet(ctx, p.internal, p.directPin.Keys(), recordInternal) if err != nil { return err } if err := root.AddNodeLink(linkDirect, n); err != nil { return err } } { n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal) if err != nil { return err } if err := root.AddNodeLink(linkRecursive, n); err != nil { return err } } // add the empty node, its referenced by the pin sets but never created err := p.internal.Add(ctx, new(mdag.ProtoNode)) if err != nil { return err } err = p.internal.Add(ctx, root) if err != nil { return err } k := root.Cid() internalset.Add(k) if err := p.dstore.Put(pinDatastoreKey, k.Bytes()); err != nil { return fmt.Errorf("cannot store pin state: %v", err) } p.internalPin = internalset return nil } // InternalPins returns all cids kept pinned for the internal state of the // pinner func (p *pinner) InternalPins() []*cid.Cid { p.lock.Lock() defer p.lock.Unlock() var out []*cid.Cid out = append(out, p.internalPin.Keys()...) return out } // PinWithMode allows the user to have fine grained control over pin // counts func (p *pinner) PinWithMode(c *cid.Cid, mode Mode) { p.lock.Lock() defer p.lock.Unlock() switch mode { case Recursive: p.recursePin.Add(c) case Direct: p.directPin.Add(c) } } // hasChild recursively looks for a Cid among the children of a root Cid. // The visit function can be used to shortcut already-visited branches. func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) { links, err := ipld.GetLinks(context.TODO(), ng, root) if err != nil { return false, err } for _, lnk := range links { c := lnk.Cid if lnk.Cid.Equals(child) { return true, nil } if visit(c) { has, err := hasChild(ng, c, child, visit) if err != nil { return false, err } if has { return has, nil } } } return false, nil }