pin.go 4.26 KB
Newer Older
1 2 3
package pin

import (
4 5

	//ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
Jeromy's avatar
flush!  
Jeromy committed
6 7
	"bytes"
	"encoding/json"
Jeromy's avatar
Jeromy committed
8 9
	"sync"

10
	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
11
	nsds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go/namespace"
12 13 14 15 16
	"github.com/jbenet/go-ipfs/blocks/set"
	mdag "github.com/jbenet/go-ipfs/merkledag"
	"github.com/jbenet/go-ipfs/util"
)

17 18 19 20
// Datastore keys used by this package for the three kinds of pin state.
var (
	// recursePinDatastoreKey is the key for the recursive pin set.
	recursePinDatastoreKey = ds.NewKey("/local/pins/recursive/keys")

	// directPinDatastoreKey is the key for the direct pin set.
	directPinDatastoreKey = ds.NewKey("/local/pins/direct/keys")

	// indirectPinDatastoreKey is the key for the indirect pin state.
	indirectPinDatastoreKey = ds.NewKey("/local/pins/indirect/keys")
)

21
type Pinner interface {
Jeromy's avatar
Jeromy committed
22
	IsPinned(util.Key) bool
23 24
	Pin(*mdag.Node, bool) error
	Unpin(util.Key, bool) error
25
	Flush() error
26 27 28
}

type pinner struct {
Jeromy's avatar
Jeromy committed
29
	lock       sync.RWMutex
30 31
	recursePin set.BlockSet
	directPin  set.BlockSet
32
	indirPin   *indirectPin
33
	dserv      *mdag.DAGService
34
	dstore     ds.Datastore
35 36 37
}

func NewPinner(dstore ds.Datastore, serv *mdag.DAGService) Pinner {
38 39

	// Load set from given datastore...
40 41
	rcds := nsds.Wrap(dstore, recursePinDatastoreKey)
	rcset := set.NewDBWrapperSet(rcds, set.NewSimpleBlockSet())
42

43 44 45 46
	dirds := nsds.Wrap(dstore, directPinDatastoreKey)
	dirset := set.NewDBWrapperSet(dirds, set.NewSimpleBlockSet())

	nsdstore := nsds.Wrap(dstore, indirectPinDatastoreKey)
47
	return &pinner{
48 49
		recursePin: rcset,
		directPin:  dirset,
50
		indirPin:   NewIndirectPin(nsdstore),
51
		dserv:      serv,
52
		dstore:     dstore,
53 54 55 56
	}
}

func (p *pinner) Pin(node *mdag.Node, recurse bool) error {
Jeromy's avatar
Jeromy committed
57 58
	p.lock.Lock()
	defer p.lock.Unlock()
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
	k, err := node.Key()
	if err != nil {
		return err
	}

	if recurse {
		if p.recursePin.HasKey(k) {
			return nil
		}

		p.recursePin.AddBlock(k)

		err := p.pinLinks(node)
		if err != nil {
			return err
		}
	} else {
		p.directPin.AddBlock(k)
	}
	return nil
}

func (p *pinner) Unpin(k util.Key, recurse bool) error {
Jeromy's avatar
Jeromy committed
82 83
	p.lock.Lock()
	defer p.lock.Unlock()
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
	if recurse {
		p.recursePin.RemoveBlock(k)
		node, err := p.dserv.Get(k)
		if err != nil {
			return err
		}

		return p.unpinLinks(node)
	} else {
		p.directPin.RemoveBlock(k)
	}
	return nil
}

func (p *pinner) unpinLinks(node *mdag.Node) error {
	for _, l := range node.Links {
		node, err := l.GetNode(p.dserv)
		if err != nil {
			return err
		}

		k, err := node.Key()
		if err != nil {
			return err
		}

		p.recursePin.RemoveBlock(k)

		err = p.unpinLinks(node)
		if err != nil {
			return err
		}
	}
117 118 119 120 121 122 123 124 125
	return nil
}

func (p *pinner) pinIndirectRecurse(node *mdag.Node) error {
	k, err := node.Key()
	if err != nil {
		return err
	}

126
	p.indirPin.Increment(k)
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
	return p.pinLinks(node)
}

// pinLinks resolves each link of node through the DAG service and indirectly
// pins the resulting child (and, through pinIndirectRecurse, its own links).
// The first error encountered aborts the walk and is returned.
func (p *pinner) pinLinks(node *mdag.Node) error {
	for _, link := range node.Links {
		child, err := link.GetNode(p.dserv)
		if err != nil {
			// TODO: Maybe just log and continue?
			return err
		}
		if err := p.pinIndirectRecurse(child); err != nil {
			return err
		}
	}
	return nil
}

func (p *pinner) IsPinned(key util.Key) bool {
Jeromy's avatar
Jeromy committed
146 147
	p.lock.RLock()
	defer p.lock.RUnlock()
148 149 150 151
	return p.recursePin.HasKey(key) ||
		p.directPin.HasKey(key) ||
		p.indirPin.HasKey(key)
}
152

Jeromy's avatar
Jeromy committed
153
func LoadPinner(d ds.Datastore, dserv *mdag.DAGService) (Pinner, error) {
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
	p := new(pinner)

	var err error
	p.recursePin, err = set.SetFromDatastore(d, recursePinDatastoreKey)
	if err != nil {
		return nil, err
	}
	p.directPin, err = set.SetFromDatastore(d, directPinDatastoreKey)
	if err != nil {
		return nil, err
	}

	p.indirPin, err = loadIndirPin(d, indirectPinDatastoreKey)
	if err != nil {
		return nil, err
	}

Jeromy's avatar
Jeromy committed
171 172 173
	p.dserv = dserv
	p.dstore = d

174 175 176 177
	return p, nil
}

func (p *pinner) Flush() error {
Jeromy's avatar
Jeromy committed
178 179
	p.lock.RLock()
	defer p.lock.RUnlock()
Jeromy's avatar
flush!  
Jeromy committed
180 181 182
	buf := new(bytes.Buffer)
	enc := json.NewEncoder(buf)

183
	recurse := p.recursePin.GetKeys()
Jeromy's avatar
flush!  
Jeromy committed
184
	err := enc.Encode(recurse)
185 186 187 188
	if err != nil {
		return err
	}

Jeromy's avatar
flush!  
Jeromy committed
189 190 191 192 193 194 195
	err = p.dstore.Put(recursePinDatastoreKey, buf.Bytes())
	if err != nil {
		return err
	}

	buf = new(bytes.Buffer)
	enc = json.NewEncoder(buf)
196
	direct := p.directPin.GetKeys()
Jeromy's avatar
flush!  
Jeromy committed
197 198 199 200 201 202 203 204 205 206 207 208 209
	err = enc.Encode(direct)
	if err != nil {
		return err
	}

	err = p.dstore.Put(directPinDatastoreKey, buf.Bytes())
	if err != nil {
		return err
	}

	buf = new(bytes.Buffer)
	enc = json.NewEncoder(buf)
	err = enc.Encode(p.indirPin.refCounts)
210 211 212 213
	if err != nil {
		return err
	}

Jeromy's avatar
flush!  
Jeromy committed
214
	err = p.dstore.Put(indirectPinDatastoreKey, buf.Bytes())
215 216 217 218 219
	if err != nil {
		return err
	}
	return nil
}