pin.go 3.89 KB
Newer Older
1 2 3
package pin

import (
4 5

	//ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
Jeromy's avatar
Jeromy committed
6 7
	"sync"

8
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
9
	nsds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go/namespace"
10 11 12 13 14
	"github.com/jbenet/go-ipfs/blocks/set"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	"github.com/jbenet/go-ipfs/util"
)

15 16 17 18
var recursePinDatastoreKey = ds.NewKey("/local/pins/recursive/keys")
var directPinDatastoreKey = ds.NewKey("/local/pins/direct/keys")
var indirectPinDatastoreKey = ds.NewKey("/local/pins/indirect/keys")

19
type Pinner interface {
Jeromy's avatar
Jeromy committed
20
	IsPinned(util.Key) bool
21 22
	Pin(*mdag.Node, bool) error
	Unpin(util.Key, bool) error
23
	Flush() error
24 25 26
}

type pinner struct {
Jeromy's avatar
Jeromy committed
27
	lock       sync.RWMutex
28 29
	recursePin set.BlockSet
	directPin  set.BlockSet
30
	indirPin   *indirectPin
31
	dserv      *mdag.DAGService
32
	dstore     ds.Datastore
33 34 35
}

func NewPinner(dstore ds.Datastore, serv *mdag.DAGService) Pinner {
36 37

	// Load set from given datastore...
38 39
	rcds := nsds.Wrap(dstore, recursePinDatastoreKey)
	rcset := set.NewDBWrapperSet(rcds, set.NewSimpleBlockSet())
40

41 42 43 44
	dirds := nsds.Wrap(dstore, directPinDatastoreKey)
	dirset := set.NewDBWrapperSet(dirds, set.NewSimpleBlockSet())

	nsdstore := nsds.Wrap(dstore, indirectPinDatastoreKey)
45
	return &pinner{
46 47
		recursePin: rcset,
		directPin:  dirset,
48
		indirPin:   NewIndirectPin(nsdstore),
49
		dserv:      serv,
50
		dstore:     dstore,
51 52 53 54
	}
}

func (p *pinner) Pin(node *mdag.Node, recurse bool) error {
Jeromy's avatar
Jeromy committed
55 56
	p.lock.Lock()
	defer p.lock.Unlock()
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
	k, err := node.Key()
	if err != nil {
		return err
	}

	if recurse {
		if p.recursePin.HasKey(k) {
			return nil
		}

		p.recursePin.AddBlock(k)

		err := p.pinLinks(node)
		if err != nil {
			return err
		}
	} else {
		p.directPin.AddBlock(k)
	}
	return nil
}

func (p *pinner) Unpin(k util.Key, recurse bool) error {
Jeromy's avatar
Jeromy committed
80 81
	p.lock.Lock()
	defer p.lock.Unlock()
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
	if recurse {
		p.recursePin.RemoveBlock(k)
		node, err := p.dserv.Get(k)
		if err != nil {
			return err
		}

		return p.unpinLinks(node)
	} else {
		p.directPin.RemoveBlock(k)
	}
	return nil
}

func (p *pinner) unpinLinks(node *mdag.Node) error {
	for _, l := range node.Links {
		node, err := l.GetNode(p.dserv)
		if err != nil {
			return err
		}

		k, err := node.Key()
		if err != nil {
			return err
		}

		p.recursePin.RemoveBlock(k)

		err = p.unpinLinks(node)
		if err != nil {
			return err
		}
	}
115 116 117 118 119 120 121 122 123
	return nil
}

func (p *pinner) pinIndirectRecurse(node *mdag.Node) error {
	k, err := node.Key()
	if err != nil {
		return err
	}

124
	p.indirPin.Increment(k)
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
	return p.pinLinks(node)
}

func (p *pinner) pinLinks(node *mdag.Node) error {
	for _, l := range node.Links {
		subnode, err := l.GetNode(p.dserv)
		if err != nil {
			// TODO: Maybe just log and continue?
			return err
		}
		err = p.pinIndirectRecurse(subnode)
		if err != nil {
			return err
		}
	}
	return nil
}

func (p *pinner) IsPinned(key util.Key) bool {
Jeromy's avatar
Jeromy committed
144 145
	p.lock.RLock()
	defer p.lock.RUnlock()
146 147 148 149
	return p.recursePin.HasKey(key) ||
		p.directPin.HasKey(key) ||
		p.indirPin.HasKey(key)
}
150

Jeromy's avatar
Jeromy committed
151
func LoadPinner(d ds.Datastore, dserv *mdag.DAGService) (Pinner, error) {
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
	p := new(pinner)

	var err error
	p.recursePin, err = set.SetFromDatastore(d, recursePinDatastoreKey)
	if err != nil {
		return nil, err
	}
	p.directPin, err = set.SetFromDatastore(d, directPinDatastoreKey)
	if err != nil {
		return nil, err
	}

	p.indirPin, err = loadIndirPin(d, indirectPinDatastoreKey)
	if err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
169 170 171
	p.dserv = dserv
	p.dstore = d

172 173 174 175
	return p, nil
}

func (p *pinner) Flush() error {
Jeromy's avatar
Jeromy committed
176 177
	p.lock.RLock()
	defer p.lock.RUnlock()
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
	recurse := p.recursePin.GetKeys()
	err := p.dstore.Put(recursePinDatastoreKey, recurse)
	if err != nil {
		return err
	}

	direct := p.directPin.GetKeys()
	err = p.dstore.Put(directPinDatastoreKey, direct)
	if err != nil {
		return err
	}

	err = p.dstore.Put(indirectPinDatastoreKey, p.indirPin.refCounts)
	if err != nil {
		return err
	}
	return nil
}