providers_manager.go 8.9 KB
Newer Older
1 2 3
package providers

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5 6 7 8 9
	"encoding/binary"
	"fmt"
	"strings"
	"time"

10 11
	"github.com/libp2p/go-libp2p-core/peer"

12
	lru "github.com/hashicorp/golang-lru/simplelru"
13
	ds "github.com/ipfs/go-datastore"
Hector Sanjuan's avatar
Hector Sanjuan committed
14
	autobatch "github.com/ipfs/go-datastore/autobatch"
15 16 17 18
	dsq "github.com/ipfs/go-datastore/query"
	logging "github.com/ipfs/go-log"
	goprocess "github.com/jbenet/goprocess"
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
19
	base32 "github.com/multiformats/go-base32"
20 21
)

David Dias's avatar
David Dias committed
22 23 24
// ProvidersKeyPrefix is the prefix/namespace for ALL provider record
// keys stored in the data store.
const ProvidersKeyPrefix = "/providers/"
25

David Dias's avatar
David Dias committed
26
// ProvideValidity is the default time that a provider record should last
27 28
var ProvideValidity = time.Hour * 24
var defaultCleanupInterval = time.Hour
David Dias's avatar
David Dias committed
29 30 31
var lruCacheSize = 256
var batchBufferSize = 256
var log = logging.Logger("providers")
32

David Dias's avatar
David Dias committed
33 34
// ProviderManager adds and pulls providers out of the datastore,
// caching them in between
35 36 37
type ProviderManager struct {
	// all non channel fields are meant to be accessed only within
	// the run method
Alan Shaw's avatar
Alan Shaw committed
38
	cache  lru.LRUCache
David Dias's avatar
David Dias committed
39
	dstore *autobatch.Datastore
40 41 42 43 44 45 46 47

	newprovs chan *addProv
	getprovs chan *getProv
	proc     goprocess.Process

	cleanupInterval time.Duration
}

Alan Shaw's avatar
Alan Shaw committed
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
type options struct {
	cleanupInterval time.Duration
	cache           lru.LRUCache
}

// Option is a function that sets a provider manager option.
type Option func(*options) error

func (c *options) apply(opts ...Option) error {
	for i, opt := range opts {
		if err := opt(c); err != nil {
			return fmt.Errorf("provider manager option %d failed: %s", i, err)
		}
	}
	return nil
}

var defaults = func(o *options) error {
	o.cleanupInterval = defaultCleanupInterval
	cache, err := lru.NewLRU(lruCacheSize, nil)
	if err != nil {
		return err
	}
	o.cache = cache
	return nil
}

// CleanupInterval sets the time between GC runs.
// Defaults to 1h.
func CleanupInterval(d time.Duration) Option {
	return func(o *options) error {
		o.cleanupInterval = d
		return nil
	}
}

// Cache sets the LRU cache implementation.
// Defaults to a simple LRU cache.
func Cache(c lru.LRUCache) Option {
	return func(o *options) error {
		o.cache = c
		return nil
	}
}

93
type addProv struct {
David Dias's avatar
David Dias committed
94
	key []byte
95 96 97 98
	val peer.ID
}

type getProv struct {
David Dias's avatar
David Dias committed
99
	key  []byte
100 101 102
	resp chan []peer.ID
}

David Dias's avatar
David Dias committed
103
// NewProviderManager constructor
Alan Shaw's avatar
Alan Shaw committed
104
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
Alan Shaw's avatar
Alan Shaw committed
105 106
	var cfg options
	if err := cfg.apply(append([]Option{defaults}, opts...)...); err != nil {
Alan Shaw's avatar
Alan Shaw committed
107 108
		return nil, err
	}
109 110 111
	pm := new(ProviderManager)
	pm.getprovs = make(chan *getProv)
	pm.newprovs = make(chan *addProv)
112
	pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
Alan Shaw's avatar
Alan Shaw committed
113
	pm.cache = cfg.cache
114
	pm.proc = goprocessctx.WithContext(ctx)
Alan Shaw's avatar
Alan Shaw committed
115
	pm.cleanupInterval = cfg.cleanupInterval
Steven Allen's avatar
Steven Allen committed
116
	pm.proc.Go(pm.run)
Alan Shaw's avatar
Alan Shaw committed
117
	return pm, nil
118 119
}

David Dias's avatar
David Dias committed
120
// Process returns the ProviderManager process
121 122 123 124
func (pm *ProviderManager) Process() goprocess.Process {
	return pm.proc
}

125 126 127 128 129 130 131 132 133 134 135 136 137 138
func (pm *ProviderManager) run(proc goprocess.Process) {
	var (
		gcQuery    dsq.Results
		gcQueryRes <-chan dsq.Result
		gcSkip     map[string]struct{}
		gcTime     time.Time
		gcTimer    = time.NewTimer(pm.cleanupInterval)
	)

	defer func() {
		gcTimer.Stop()
		if gcQuery != nil {
			// don't really care if this fails.
			_ = gcQuery.Close()
Steven Allen's avatar
Steven Allen committed
139
		}
140 141
		if err := pm.dstore.Flush(); err != nil {
			log.Error("failed to flush datastore: ", err)
142
		}
143
	}()
144

145 146 147
	for {
		select {
		case np := <-pm.newprovs:
David Dias's avatar
David Dias committed
148
			err := pm.addProv(np.key, np.val)
149 150
			if err != nil {
				log.Error("error adding new providers: ", err)
151 152 153 154 155
				continue
			}
			if gcSkip != nil {
				// we have an gc, tell it to skip this provider
				// as we've updated it since the GC started.
David Dias's avatar
David Dias committed
156
				gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{}
157 158
			}
		case gp := <-pm.getprovs:
David Dias's avatar
David Dias committed
159
			provs, err := pm.getProvidersForKey(gp.key)
160 161 162 163
			if err != nil && err != ds.ErrNotFound {
				log.Error("error reading providers: ", err)
			}

Steven Allen's avatar
Steven Allen committed
164 165
			// set the cap so the user can't append to this.
			gp.resp <- provs[0:len(provs):len(provs)]
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
		case res, ok := <-gcQueryRes:
			if !ok {
				if err := gcQuery.Close(); err != nil {
					log.Error("failed to close provider GC query: ", err)
				}
				gcTimer.Reset(pm.cleanupInterval)

				// cleanup GC round
				gcQueryRes = nil
				gcSkip = nil
				gcQuery = nil
				continue
			}
			if res.Error != nil {
				log.Error("got error from GC query: ", res.Error)
				continue
			}
			if _, ok := gcSkip[res.Key]; ok {
				// We've updated this record since starting the
				// GC round, skip it.
				continue
			}

			// check expiration time
			t, err := readTimeValue(res.Value)
			switch {
			case err != nil:
				// couldn't parse the time
194
				log.Error("parsing providers record from disk: ", err)
195 196 197 198 199
				fallthrough
			case gcTime.Sub(t) > ProvideValidity:
				// or expired
				err = pm.dstore.Delete(ds.RawKey(res.Key))
				if err != nil && err != ds.ErrNotFound {
200
					log.Error("failed to remove provider record from disk: ", err)
201 202 203 204
				}
			}

		case gcTime = <-gcTimer.C:
Steven Allen's avatar
Steven Allen committed
205 206 207 208
			// You know the wonderful thing about caches? You can
			// drop them.
			//
			// Much faster than GCing.
David Dias's avatar
David Dias committed
209
			pm.cache.Purge()
210 211 212

			// Now, kick off a GC of the datastore.
			q, err := pm.dstore.Query(dsq.Query{
213
				Prefix: ProvidersKeyPrefix,
214 215 216 217 218 219 220 221
			})
			if err != nil {
				log.Error("provider record GC query failed: ", err)
				continue
			}
			gcQuery = q
			gcQueryRes = q.Next()
			gcSkip = make(map[string]struct{})
Steven Allen's avatar
Steven Allen committed
222
		case <-proc.Closing():
223 224 225 226 227
			return
		}
	}
}

David Dias's avatar
David Dias committed
228
// AddProvider adds a provider
229
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
230
	prov := &addProv{
David Dias's avatar
David Dias committed
231
		key: k,
232 233 234 235 236 237 238 239
		val: val,
	}
	select {
	case pm.newprovs <- prov:
	case <-ctx.Done():
	}
}

David Dias's avatar
David Dias committed
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
// addProv updates the cache if needed
func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
	now := time.Now()
	if provs, ok := pm.cache.Get(string(k)); ok {
		provs.(*providerSet).setVal(p, now)
	} // else not cached, just write through

	return writeProviderEntry(pm.dstore, k, p, now)
}

// writeProviderEntry writes the provider into the datastore
func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
	dsk := mkProvKeyFor(k, p)

	buf := make([]byte, 16)
	n := binary.PutVarint(buf, t.UnixNano())

	return dstore.Put(ds.NewKey(dsk), buf[:n])
}

func mkProvKeyFor(k []byte, p peer.ID) string {
	return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
}

func mkProvKey(k []byte) string {
	return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
}

268 269
// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
270
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
271
	gp := &getProv{
David Dias's avatar
David Dias committed
272
		key:  k,
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
	}
	select {
	case <-ctx.Done():
		return nil
	case pm.getprovs <- gp:
	}
	select {
	case <-ctx.Done():
		return nil
	case peers := <-gp.resp:
		return peers
	}
}

David Dias's avatar
David Dias committed
288 289 290 291
func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
	pset, err := pm.getProviderSetForKey(k)
	if err != nil {
		return nil, err
292
	}
David Dias's avatar
David Dias committed
293
	return pset.providers, nil
294 295
}

David Dias's avatar
David Dias committed
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
// returns the ProviderSet if it already exists on cache, otherwise loads it from datasatore
func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) {
	cached, ok := pm.cache.Get(string(k))
	if ok {
		return cached.(*providerSet), nil
	}

	pset, err := loadProviderSet(pm.dstore, k)
	if err != nil {
		return nil, err
	}

	if len(pset.providers) > 0 {
		pm.cache.Add(string(k), pset)
	}

	return pset, nil
313 314
}

David Dias's avatar
David Dias committed
315 316 317 318 319
// loads the ProviderSet out of the datastore
func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
	res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
	if err != nil {
		return nil, err
320
	}
David Dias's avatar
David Dias committed
321
	defer res.Close()
322

David Dias's avatar
David Dias committed
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
	now := time.Now()
	out := newProviderSet()
	for {
		e, ok := res.NextSync()
		if !ok {
			break
		}
		if e.Error != nil {
			log.Error("got an error: ", e.Error)
			continue
		}

		// check expiration time
		t, err := readTimeValue(e.Value)
		switch {
		case err != nil:
			// couldn't parse the time
			log.Error("parsing providers record from disk: ", err)
			fallthrough
		case now.Sub(t) > ProvideValidity:
			// or just expired
			err = dstore.Delete(ds.RawKey(e.Key))
			if err != nil && err != ds.ErrNotFound {
				log.Error("failed to remove provider record from disk: ", err)
			}
			continue
		}

		lix := strings.LastIndex(e.Key, "/")

		decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:])
		if err != nil {
			log.Error("base32 decoding error: ", err)
			err = dstore.Delete(ds.RawKey(e.Key))
			if err != nil && err != ds.ErrNotFound {
				log.Error("failed to remove provider record from disk: ", err)
			}
			continue
		}

		pid := peer.ID(decstr)

		out.setVal(pid, t)
	}

	return out, nil
}

func readTimeValue(data []byte) (time.Time, error) {
	nsec, n := binary.Varint(data)
	if n <= 0 {
		return time.Time{}, fmt.Errorf("failed to parse time")
	}

	return time.Unix(0, nsec), nil
378
}