providers_manager.go 8.71 KB
Newer Older
1 2 3
package providers

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5 6 7 8 9
	"encoding/binary"
	"fmt"
	"strings"
	"time"

10 11
	"github.com/libp2p/go-libp2p-core/peer"

12
	lru "github.com/hashicorp/golang-lru/simplelru"
13
	ds "github.com/ipfs/go-datastore"
Hector Sanjuan's avatar
Hector Sanjuan committed
14
	autobatch "github.com/ipfs/go-datastore/autobatch"
15 16 17 18
	dsq "github.com/ipfs/go-datastore/query"
	logging "github.com/ipfs/go-log"
	goprocess "github.com/jbenet/goprocess"
	goprocessctx "github.com/jbenet/goprocess/context"
Steven Allen's avatar
Steven Allen committed
19
	base32 "github.com/multiformats/go-base32"
20 21
)

David Dias's avatar
David Dias committed
22 23 24
// ProvidersKeyPrefix is the prefix/namespace for ALL provider record
// keys stored in the data store.
const ProvidersKeyPrefix = "/providers/"
25

David Dias's avatar
David Dias committed
26
// ProvideValidity is the default time that a provider record should last
27 28
var ProvideValidity = time.Hour * 24
var defaultCleanupInterval = time.Hour
David Dias's avatar
David Dias committed
29 30 31
var lruCacheSize = 256
var batchBufferSize = 256
var log = logging.Logger("providers")
32

David Dias's avatar
David Dias committed
33 34
// ProviderManager adds and pulls providers out of the datastore,
// caching them in between
35 36 37
type ProviderManager struct {
	// all non channel fields are meant to be accessed only within
	// the run method
Alan Shaw's avatar
Alan Shaw committed
38
	cache  lru.LRUCache
David Dias's avatar
David Dias committed
39
	dstore *autobatch.Datastore
40 41 42 43 44 45 46 47

	newprovs chan *addProv
	getprovs chan *getProv
	proc     goprocess.Process

	cleanupInterval time.Duration
}

Alan Shaw's avatar
Alan Shaw committed
48
// Option is a function that sets a provider manager option.
49
type Option func(*ProviderManager) error
Alan Shaw's avatar
Alan Shaw committed
50

51
func (pm *ProviderManager) applyOptions(opts ...Option) error {
Alan Shaw's avatar
Alan Shaw committed
52
	for i, opt := range opts {
53
		if err := opt(pm); err != nil {
Alan Shaw's avatar
Alan Shaw committed
54 55 56 57 58 59 60 61 62
			return fmt.Errorf("provider manager option %d failed: %s", i, err)
		}
	}
	return nil
}

// CleanupInterval sets the time between GC runs.
// Defaults to 1h.
func CleanupInterval(d time.Duration) Option {
63 64
	return func(pm *ProviderManager) error {
		pm.cleanupInterval = d
Alan Shaw's avatar
Alan Shaw committed
65 66 67 68 69 70 71
		return nil
	}
}

// Cache sets the LRU cache implementation.
// Defaults to a simple LRU cache.
func Cache(c lru.LRUCache) Option {
72 73
	return func(pm *ProviderManager) error {
		pm.cache = c
Alan Shaw's avatar
Alan Shaw committed
74 75 76 77
		return nil
	}
}

78
type addProv struct {
David Dias's avatar
David Dias committed
79
	key []byte
80 81 82 83
	val peer.ID
}

type getProv struct {
David Dias's avatar
David Dias committed
84
	key  []byte
85 86 87
	resp chan []peer.ID
}

David Dias's avatar
David Dias committed
88
// NewProviderManager constructor
Alan Shaw's avatar
Alan Shaw committed
89
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
90 91 92
	pm := new(ProviderManager)
	pm.getprovs = make(chan *getProv)
	pm.newprovs = make(chan *addProv)
93
	pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
94 95 96 97 98 99 100 101 102
	cache, err := lru.NewLRU(lruCacheSize, nil)
	if err != nil {
		return nil, err
	}
	pm.cache = cache
	pm.cleanupInterval = defaultCleanupInterval
	if err := pm.applyOptions(opts...); err != nil {
		return nil, err
	}
103
	pm.proc = goprocessctx.WithContext(ctx)
Steven Allen's avatar
Steven Allen committed
104
	pm.proc.Go(pm.run)
Alan Shaw's avatar
Alan Shaw committed
105
	return pm, nil
106 107
}

David Dias's avatar
David Dias committed
108
// Process returns the ProviderManager process
109 110 111 112
func (pm *ProviderManager) Process() goprocess.Process {
	return pm.proc
}

113 114 115 116 117 118 119 120 121 122 123 124 125 126
func (pm *ProviderManager) run(proc goprocess.Process) {
	var (
		gcQuery    dsq.Results
		gcQueryRes <-chan dsq.Result
		gcSkip     map[string]struct{}
		gcTime     time.Time
		gcTimer    = time.NewTimer(pm.cleanupInterval)
	)

	defer func() {
		gcTimer.Stop()
		if gcQuery != nil {
			// don't really care if this fails.
			_ = gcQuery.Close()
Steven Allen's avatar
Steven Allen committed
127
		}
128 129
		if err := pm.dstore.Flush(); err != nil {
			log.Error("failed to flush datastore: ", err)
130
		}
131
	}()
132

133 134 135
	for {
		select {
		case np := <-pm.newprovs:
David Dias's avatar
David Dias committed
136
			err := pm.addProv(np.key, np.val)
137 138
			if err != nil {
				log.Error("error adding new providers: ", err)
139 140 141 142 143
				continue
			}
			if gcSkip != nil {
				// we have an gc, tell it to skip this provider
				// as we've updated it since the GC started.
David Dias's avatar
David Dias committed
144
				gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{}
145 146
			}
		case gp := <-pm.getprovs:
David Dias's avatar
David Dias committed
147
			provs, err := pm.getProvidersForKey(gp.key)
148 149 150 151
			if err != nil && err != ds.ErrNotFound {
				log.Error("error reading providers: ", err)
			}

Steven Allen's avatar
Steven Allen committed
152 153
			// set the cap so the user can't append to this.
			gp.resp <- provs[0:len(provs):len(provs)]
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
		case res, ok := <-gcQueryRes:
			if !ok {
				if err := gcQuery.Close(); err != nil {
					log.Error("failed to close provider GC query: ", err)
				}
				gcTimer.Reset(pm.cleanupInterval)

				// cleanup GC round
				gcQueryRes = nil
				gcSkip = nil
				gcQuery = nil
				continue
			}
			if res.Error != nil {
				log.Error("got error from GC query: ", res.Error)
				continue
			}
			if _, ok := gcSkip[res.Key]; ok {
				// We've updated this record since starting the
				// GC round, skip it.
				continue
			}

			// check expiration time
			t, err := readTimeValue(res.Value)
			switch {
			case err != nil:
				// couldn't parse the time
182
				log.Error("parsing providers record from disk: ", err)
183 184 185 186 187
				fallthrough
			case gcTime.Sub(t) > ProvideValidity:
				// or expired
				err = pm.dstore.Delete(ds.RawKey(res.Key))
				if err != nil && err != ds.ErrNotFound {
188
					log.Error("failed to remove provider record from disk: ", err)
189 190 191 192
				}
			}

		case gcTime = <-gcTimer.C:
Steven Allen's avatar
Steven Allen committed
193 194 195 196
			// You know the wonderful thing about caches? You can
			// drop them.
			//
			// Much faster than GCing.
David Dias's avatar
David Dias committed
197
			pm.cache.Purge()
198 199 200

			// Now, kick off a GC of the datastore.
			q, err := pm.dstore.Query(dsq.Query{
201
				Prefix: ProvidersKeyPrefix,
202 203 204 205 206 207 208 209
			})
			if err != nil {
				log.Error("provider record GC query failed: ", err)
				continue
			}
			gcQuery = q
			gcQueryRes = q.Next()
			gcSkip = make(map[string]struct{})
Steven Allen's avatar
Steven Allen committed
210
		case <-proc.Closing():
211 212 213 214 215
			return
		}
	}
}

David Dias's avatar
David Dias committed
216
// AddProvider adds a provider
217
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
218
	prov := &addProv{
David Dias's avatar
David Dias committed
219
		key: k,
220 221 222 223 224 225 226 227
		val: val,
	}
	select {
	case pm.newprovs <- prov:
	case <-ctx.Done():
	}
}

David Dias's avatar
David Dias committed
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
// addProv records p as a provider for k, timestamped with the current time.
// If a provider set for k is already cached it is updated in place; the
// record is written to the datastore in either case. The returned error is
// that of the datastore write.
func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
	stamp := time.Now()

	cached, hit := pm.cache.Get(string(k))
	if hit {
		cached.(*providerSet).setVal(p, stamp)
	}
	// On a cache miss there is nothing to update in memory: just write
	// through.

	return writeProviderEntry(pm.dstore, k, p, stamp)
}

// writeProviderEntry stores the provider record for (k, p) in dstore. The
// record's key is built by mkProvKeyFor and its value is t, encoded as the
// varint of its Unix time in nanoseconds.
func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
	encoded := make([]byte, 16)
	written := binary.PutVarint(encoded, t.UnixNano())

	return dstore.Put(ds.NewKey(mkProvKeyFor(k, p)), encoded[:written])
}

// mkProvKeyFor returns the datastore key string of the record saying that p
// provides k: the key from mkProvKey, a "/", and the unpadded base32
// encoding of p.
func mkProvKeyFor(k []byte, p peer.ID) string {
	peerPart := base32.RawStdEncoding.EncodeToString([]byte(p))
	return mkProvKey(k) + "/" + peerPart
}

func mkProvKey(k []byte) string {
	return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
}

256 257
// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
258
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
259
	gp := &getProv{
David Dias's avatar
David Dias committed
260
		key:  k,
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
	}
	select {
	case <-ctx.Done():
		return nil
	case pm.getprovs <- gp:
	}
	select {
	case <-ctx.Done():
		return nil
	case peers := <-gp.resp:
		return peers
	}
}

David Dias's avatar
David Dias committed
276 277 278 279
func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
	pset, err := pm.getProviderSetForKey(k)
	if err != nil {
		return nil, err
280
	}
David Dias's avatar
David Dias committed
281
	return pset.providers, nil
282 283
}

David Dias's avatar
David Dias committed
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
// returns the ProviderSet if it already exists on cache, otherwise loads it from datasatore
func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) {
	cached, ok := pm.cache.Get(string(k))
	if ok {
		return cached.(*providerSet), nil
	}

	pset, err := loadProviderSet(pm.dstore, k)
	if err != nil {
		return nil, err
	}

	if len(pset.providers) > 0 {
		pm.cache.Add(string(k), pset)
	}

	return pset, nil
301 302
}

David Dias's avatar
David Dias committed
303 304 305 306 307
// loads the ProviderSet out of the datastore
func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
	res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
	if err != nil {
		return nil, err
308
	}
David Dias's avatar
David Dias committed
309
	defer res.Close()
310

David Dias's avatar
David Dias committed
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
	now := time.Now()
	out := newProviderSet()
	for {
		e, ok := res.NextSync()
		if !ok {
			break
		}
		if e.Error != nil {
			log.Error("got an error: ", e.Error)
			continue
		}

		// check expiration time
		t, err := readTimeValue(e.Value)
		switch {
		case err != nil:
			// couldn't parse the time
			log.Error("parsing providers record from disk: ", err)
			fallthrough
		case now.Sub(t) > ProvideValidity:
			// or just expired
			err = dstore.Delete(ds.RawKey(e.Key))
			if err != nil && err != ds.ErrNotFound {
				log.Error("failed to remove provider record from disk: ", err)
			}
			continue
		}

		lix := strings.LastIndex(e.Key, "/")

		decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:])
		if err != nil {
			log.Error("base32 decoding error: ", err)
			err = dstore.Delete(ds.RawKey(e.Key))
			if err != nil && err != ds.ErrNotFound {
				log.Error("failed to remove provider record from disk: ", err)
			}
			continue
		}

		pid := peer.ID(decstr)

		out.setVal(pid, t)
	}

	return out, nil
}

// readTimeValue decodes data as a varint holding a Unix time in nanoseconds
// and returns the corresponding time. It returns the zero time and an error
// when data does not start with a complete varint that fits in 64 bits.
func readTimeValue(data []byte) (time.Time, error) {
	nanos, read := binary.Varint(data)
	if read > 0 {
		return time.Unix(0, nanos), nil
	}
	return time.Time{}, fmt.Errorf("failed to parse time")
}