pin.go 4.59 KB
Newer Older
1 2 3
package pin

import (
4 5

	//ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6

Jeromy's avatar
flush!  
Jeromy committed
7
	"encoding/json"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
	"errors"
Jeromy's avatar
Jeromy committed
9 10
	"sync"

Jeromy's avatar
Jeromy committed
11 12
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
	nsds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/namespace"
13 14 15 16 17
	"github.com/jbenet/go-ipfs/blocks/set"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	"github.com/jbenet/go-ipfs/util"
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
var log = util.Logger("pin")
19 20 21 22
var recursePinDatastoreKey = ds.NewKey("/local/pins/recursive/keys")
var directPinDatastoreKey = ds.NewKey("/local/pins/direct/keys")
var indirectPinDatastoreKey = ds.NewKey("/local/pins/indirect/keys")

23
type Pinner interface {
Jeromy's avatar
Jeromy committed
24
	IsPinned(util.Key) bool
25 26
	Pin(*mdag.Node, bool) error
	Unpin(util.Key, bool) error
27
	Flush() error
28 29 30
}

type pinner struct {
Jeromy's avatar
Jeromy committed
31
	lock       sync.RWMutex
32 33
	recursePin set.BlockSet
	directPin  set.BlockSet
34
	indirPin   *indirectPin
35
	dserv      *mdag.DAGService
36
	dstore     ds.Datastore
37 38 39
}

func NewPinner(dstore ds.Datastore, serv *mdag.DAGService) Pinner {
40 41

	// Load set from given datastore...
42 43
	rcds := nsds.Wrap(dstore, recursePinDatastoreKey)
	rcset := set.NewDBWrapperSet(rcds, set.NewSimpleBlockSet())
44

45 46 47 48
	dirds := nsds.Wrap(dstore, directPinDatastoreKey)
	dirset := set.NewDBWrapperSet(dirds, set.NewSimpleBlockSet())

	nsdstore := nsds.Wrap(dstore, indirectPinDatastoreKey)
49
	return &pinner{
50 51
		recursePin: rcset,
		directPin:  dirset,
52
		indirPin:   NewIndirectPin(nsdstore),
53
		dserv:      serv,
54
		dstore:     dstore,
55 56 57 58
	}
}

func (p *pinner) Pin(node *mdag.Node, recurse bool) error {
Jeromy's avatar
Jeromy committed
59 60
	p.lock.Lock()
	defer p.lock.Unlock()
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
	k, err := node.Key()
	if err != nil {
		return err
	}

	if recurse {
		if p.recursePin.HasKey(k) {
			return nil
		}

		p.recursePin.AddBlock(k)

		err := p.pinLinks(node)
		if err != nil {
			return err
		}
	} else {
		p.directPin.AddBlock(k)
	}
	return nil
}

func (p *pinner) Unpin(k util.Key, recurse bool) error {
Jeromy's avatar
Jeromy committed
84 85
	p.lock.Lock()
	defer p.lock.Unlock()
86 87 88 89 90 91 92 93 94
	if recurse {
		p.recursePin.RemoveBlock(k)
		node, err := p.dserv.Get(k)
		if err != nil {
			return err
		}

		return p.unpinLinks(node)
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95
	p.directPin.RemoveBlock(k)
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
	return nil
}

func (p *pinner) unpinLinks(node *mdag.Node) error {
	for _, l := range node.Links {
		node, err := l.GetNode(p.dserv)
		if err != nil {
			return err
		}

		k, err := node.Key()
		if err != nil {
			return err
		}

		p.recursePin.RemoveBlock(k)

		err = p.unpinLinks(node)
		if err != nil {
			return err
		}
	}
118 119 120 121 122 123 124 125 126
	return nil
}

func (p *pinner) pinIndirectRecurse(node *mdag.Node) error {
	k, err := node.Key()
	if err != nil {
		return err
	}

127
	p.indirPin.Increment(k)
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
	return p.pinLinks(node)
}

// pinLinks resolves each link of node through the DAG service and pins the
// resulting node indirectly, recursing into its own links. The first error
// aborts the walk and is returned.
func (p *pinner) pinLinks(node *mdag.Node) error {
	for _, link := range node.Links {
		child, err := link.GetNode(p.dserv)
		if err != nil {
			// TODO: Maybe just log and continue?
			return err
		}
		if err := p.pinIndirectRecurse(child); err != nil {
			return err
		}
	}
	return nil
}

func (p *pinner) IsPinned(key util.Key) bool {
Jeromy's avatar
Jeromy committed
147 148
	p.lock.RLock()
	defer p.lock.RUnlock()
149 150 151 152
	return p.recursePin.HasKey(key) ||
		p.directPin.HasKey(key) ||
		p.indirPin.HasKey(key)
}
153

Jeromy's avatar
Jeromy committed
154
func LoadPinner(d ds.Datastore, dserv *mdag.DAGService) (Pinner, error) {
155 156
	p := new(pinner)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157 158 159 160 161 162
	{ // load recursive set
		var recurseKeys []util.Key
		if err := loadSet(d, recursePinDatastoreKey, &recurseKeys); err != nil {
			return nil, err
		}
		p.recursePin = set.SimpleSetFromKeys(recurseKeys)
163
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164 165 166 167 168 169 170

	{ // load direct set
		var directKeys []util.Key
		if err := loadSet(d, directPinDatastoreKey, &directKeys); err != nil {
			return nil, err
		}
		p.directPin = set.SimpleSetFromKeys(directKeys)
171 172
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173 174 175 176 177 178
	{ // load indirect set
		var err error
		p.indirPin, err = loadIndirPin(d, indirectPinDatastoreKey)
		if err != nil {
			return nil, err
		}
179 180
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
	// assign services
Jeromy's avatar
Jeromy committed
182 183 184
	p.dserv = dserv
	p.dstore = d

185 186 187 188
	return p, nil
}

func (p *pinner) Flush() error {
Jeromy's avatar
Jeromy committed
189 190
	p.lock.RLock()
	defer p.lock.RUnlock()
Jeromy's avatar
flush!  
Jeromy committed
191

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
	err := storeSet(p.dstore, directPinDatastoreKey, p.directPin.GetKeys())
193 194 195 196
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197
	err = storeSet(p.dstore, recursePinDatastoreKey, p.recursePin.GetKeys())
Jeromy's avatar
flush!  
Jeromy committed
198 199 200 201
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
	err = storeIndirPin(p.dstore, indirectPinDatastoreKey, p.indirPin)
Jeromy's avatar
flush!  
Jeromy committed
203 204 205
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
206 207
	return nil
}
Jeromy's avatar
flush!  
Jeromy committed
208

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209 210 211
// helpers to marshal / unmarshal a pin set
func storeSet(d ds.Datastore, k ds.Key, val interface{}) error {
	buf, err := json.Marshal(val)
Jeromy's avatar
flush!  
Jeromy committed
212 213 214 215
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216 217 218 219 220
	return d.Put(k, buf)
}

func loadSet(d ds.Datastore, k ds.Key, val interface{}) error {
	buf, err := d.Get(k)
221 222 223 224
	if err != nil {
		return err
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225 226 227
	bf, ok := buf.([]byte)
	if !ok {
		return errors.New("invalid pin set value in datastore")
228
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229
	return json.Unmarshal(bf, val)
230
}