Commit 1d2607cd authored by Jeromy's avatar Jeromy

implement mark and sweep GC

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

dont GC blocks used by pinner

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

comment GC algo

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

add lock to blockstore to prevent GC from eating wanted blocks

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

improve FetchGraph

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

separate interfaces for blockstore and GCBlockstore

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>

reintroduce indirect pinning, add enumerateChildren dag method

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent 370c62c1
package gc
import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
key "github.com/ipfs/go-ipfs/blocks/key"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
logging "github.com/ipfs/go-ipfs/vendor/QmQg1J6vikuXF9oDvm4wpdeAUvvkVEKW1EYDw9HhTMnP2b/go-log"
)
var log = logging.Logger("gc")
// GC performs a mark and sweep garbage collection of the blocks in the blockstore
// first, it creates a 'marked' set and adds to it the following:
// - all recursively pinned blocks, plus all of their descendants (recursively)
// - all directly pinned blocks
// - all blocks utilized internally by the pinner
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) {
unlock := bs.GCLock()
defer unlock()
bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
// KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory.
gcs := key.NewKeySet()
for _, k := range pn.RecursiveKeys() {
gcs.Add(k)
nd, err := ds.Get(ctx, k)
if err != nil {
return nil, err
}
// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, gcs)
if err != nil {
return nil, err
}
}
for _, k := range pn.DirectKeys() {
gcs.Add(k)
}
for _, k := range pn.InternalPins() {
gcs.Add(k)
nd, err := ds.Get(ctx, k)
if err != nil {
return nil, err
}
// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, gcs)
if err != nil {
return nil, err
}
}
keychan, err := bs.AllKeysChan(ctx)
if err != nil {
return nil, err
}
output := make(chan key.Key)
go func() {
defer close(output)
for {
select {
case k, ok := <-keychan:
if !ok {
return
}
if !gcs.Has(k) {
err := bs.DeleteBlock(k)
if err != nil {
log.Debugf("Error removing key from blockstore: %s", err)
return
}
select {
case output <- k:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
return output, nil
}
......@@ -24,7 +24,6 @@ var emptyKey = key.B58KeyDecode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"
const (
linkDirect = "direct"
linkRecursive = "recursive"
linkIndirect = "indirect"
)
type PinMode int
......@@ -32,7 +31,6 @@ type PinMode int
const (
Recursive PinMode = iota
Direct
Indirect
NotPinned
)
......@@ -52,8 +50,8 @@ type Pinner interface {
Flush() error
DirectKeys() []key.Key
IndirectKeys() map[key.Key]uint64
RecursiveKeys() []key.Key
InternalPins() []key.Key
}
// pinner implements the Pinner interface
......@@ -61,7 +59,7 @@ type pinner struct {
lock sync.RWMutex
recursePin set.BlockSet
directPin set.BlockSet
indirPin *indirectPin
// Track the keys used for storing the pinning state, so gc does
// not delete them.
internalPin map[key.Key]struct{}
......@@ -80,7 +78,6 @@ func NewPinner(dstore ds.ThreadSafeDatastore, serv mdag.DAGService) Pinner {
return &pinner{
recursePin: rcset,
directPin: dirset,
indirPin: newIndirectPin(),
dserv: serv,
dstore: dstore,
}
......@@ -104,7 +101,8 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
p.directPin.RemoveBlock(k)
}
err := p.pinLinks(ctx, node)
// fetch entire graph
err := mdag.FetchGraph(ctx, node, p.dserv)
if err != nil {
return err
}
......@@ -131,72 +129,18 @@ func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error {
if p.recursePin.HasKey(k) {
if recursive {
p.recursePin.RemoveBlock(k)
node, err := p.dserv.Get(ctx, k)
if err != nil {
return err
}
return p.unpinLinks(ctx, node)
return nil
} else {
return fmt.Errorf("%s is pinned recursively", k)
}
} else if p.directPin.HasKey(k) {
p.directPin.RemoveBlock(k)
return nil
} else if p.indirPin.HasKey(k) {
return fmt.Errorf("%s is pinned indirectly. indirect pins cannot be removed directly", k)
} else {
return fmt.Errorf("%s is not pinned", k)
}
}
func (p *pinner) unpinLinks(ctx context.Context, node *mdag.Node) error {
for _, l := range node.Links {
node, err := l.GetNode(ctx, p.dserv)
if err != nil {
return err
}
k, err := node.Key()
if err != nil {
return err
}
p.indirPin.Decrement(k)
err = p.unpinLinks(ctx, node)
if err != nil {
return err
}
}
return nil
}
func (p *pinner) pinIndirectRecurse(ctx context.Context, node *mdag.Node) error {
k, err := node.Key()
if err != nil {
return err
}
p.indirPin.Increment(k)
return p.pinLinks(ctx, node)
}
func (p *pinner) pinLinks(ctx context.Context, node *mdag.Node) error {
for _, ng := range p.dserv.GetDAG(ctx, node) {
subnode, err := ng.Get(ctx)
if err != nil {
// TODO: Maybe just log and continue?
return err
}
err = p.pinIndirectRecurse(ctx, subnode)
if err != nil {
return err
}
}
return nil
}
func (p *pinner) isInternalPin(key key.Key) bool {
_, ok := p.internalPin[key]
return ok
......@@ -208,7 +152,6 @@ func (p *pinner) IsPinned(key key.Key) bool {
defer p.lock.RUnlock()
return p.recursePin.HasKey(key) ||
p.directPin.HasKey(key) ||
p.indirPin.HasKey(key) ||
p.isInternalPin(key)
}
......@@ -218,8 +161,6 @@ func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) {
switch mode {
case Direct:
p.directPin.RemoveBlock(key)
case Indirect:
p.indirPin.Decrement(key)
case Recursive:
p.recursePin.RemoveBlock(key)
default:
......@@ -274,14 +215,6 @@ func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error)
p.directPin = set.SimpleSetFromKeys(directKeys)
}
{ // load indirect set
refcnt, err := loadMultiset(ctx, dserv, root, linkIndirect, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load indirect pins: %v", err)
}
p.indirPin = &indirectPin{refCounts: refcnt}
}
p.internalPin = internalPin
// assign services
......@@ -296,11 +229,6 @@ func (p *pinner) DirectKeys() []key.Key {
return p.directPin.GetKeys()
}
// IndirectKeys returns a slice containing the indirectly pinned keys
func (p *pinner) IndirectKeys() map[key.Key]uint64 {
return p.indirPin.GetRefs()
}
// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys() []key.Key {
return p.recursePin.GetKeys()
......@@ -339,20 +267,17 @@ func (p *pinner) Flush() error {
}
}
{
n, err := storeMultiset(ctx, p.dserv, p.indirPin.GetRefs(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkIndirect, n); err != nil {
return err
}
// add the empty node, its referenced by the pin sets but never created
_, err := p.dserv.Add(new(mdag.Node))
if err != nil {
return err
}
k, err := p.dserv.Add(root)
if err != nil {
return err
}
internalPin[k] = struct{}{}
if err := p.dstore.Put(pinDatastoreKey, []byte(k)); err != nil {
return fmt.Errorf("cannot store pin state: %v", err)
......@@ -361,6 +286,16 @@ func (p *pinner) Flush() error {
return nil
}
func (p *pinner) InternalPins() []key.Key {
p.lock.Lock()
defer p.lock.Unlock()
var out []key.Key
for k, _ := range p.internalPin {
out = append(out, k)
}
return out
}
// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(k key.Key, mode PinMode) {
......@@ -371,7 +306,5 @@ func (p *pinner) PinWithMode(k key.Key, mode PinMode) {
p.recursePin.AddBlock(k)
case Direct:
p.directPin.AddBlock(k)
case Indirect:
p.indirPin.Increment(k)
}
}
......@@ -53,7 +53,7 @@ func TestPinnerBasic(t *testing.T) {
}
// create new node c, to be indirectly pinned through b
c, ck := randNode()
c, _ := randNode()
_, err = dserv.Add(c)
if err != nil {
t.Fatal(err)
......@@ -82,10 +82,6 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}
if !p.IsPinned(ck) {
t.Fatal("Child of recursively pinned node not found")
}
bk, _ := b.Key()
if !p.IsPinned(bk) {
t.Fatal("Recursively pinned node not found..")
......@@ -95,7 +91,7 @@ func TestPinnerBasic(t *testing.T) {
d.AddNodeLink("a", a)
d.AddNodeLink("c", c)
e, ek := randNode()
e, _ := randNode()
d.AddNodeLink("e", e)
// Must be in dagserv for unpin to work
......@@ -110,10 +106,6 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}
if !p.IsPinned(ek) {
t.Fatal(err)
}
dk, _ := d.Key()
if !p.IsPinned(dk) {
t.Fatal("pinned node not found.")
......@@ -125,11 +117,6 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal(err)
}
// c should still be pinned under b
if !p.IsPinned(ck) {
t.Fatal("Recursive / indirect unpin fail.")
}
err = p.Flush()
if err != nil {
t.Fatal(err)
......@@ -145,11 +132,6 @@ func TestPinnerBasic(t *testing.T) {
t.Fatal("Could not find pinned node!")
}
// Test indirectly pinned
if !np.IsPinned(ck) {
t.Fatal("could not find indirectly pinned node")
}
// Test recursively pinned
if !np.IsPinned(bk) {
t.Fatal("could not find recursively pinned node")
......@@ -201,7 +183,7 @@ func TestFlush(t *testing.T) {
p := NewPinner(dstore, dserv)
_, k := randNode()
p.PinWithMode(k, Indirect)
p.PinWithMode(k, Recursive)
if err := p.Flush(); err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment