Unverified commit dffa2f8b authored by David Dias, committed by GitHub

refactor: ProviderManager (#492)

* test: augment TestProviderManager test, add notes on future tests
* refactor(provider-manager): order funcs, update names for consistency, add code docs
parent ad27ebcf
package providers

import (
	"time"

	"github.com/libp2p/go-libp2p-core/peer"
)

// A providerSet has the list of providers and the time that they were added.
// It is used as an intermediary data struct between what is stored in the
// datastore and the list of providers that get passed to the consumer of a
// .GetProviders call.
type providerSet struct {
	providers []peer.ID
	set       map[peer.ID]time.Time
}

func newProviderSet() *providerSet {
	return &providerSet{
		set: make(map[peer.ID]time.Time),
	}
}

func (ps *providerSet) Add(p peer.ID) {
	ps.setVal(p, time.Now())
}

func (ps *providerSet) setVal(p peer.ID, t time.Time) {
	_, found := ps.set[p]
	if !found {
		ps.providers = append(ps.providers, p)
	}
	ps.set[p] = t
}
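providerSet deduplicates: providers keeps one entry per peer in insertion order, while set records the last time each peer was added. A short sketch of that behaviour, written as a hypothetical test alongside the file above (not part of this commit):

package providers

import (
	"testing"

	"github.com/libp2p/go-libp2p-core/peer"
)

// TestProviderSetDedup illustrates the dedup-and-refresh behaviour of providerSet.
func TestProviderSetDedup(t *testing.T) {
	ps := newProviderSet()
	a := peer.ID("peerA")

	ps.Add(a)
	ps.Add(a) // re-adding an existing peer only refreshes its timestamp in ps.set
	ps.Add(peer.ID("peerB"))

	if len(ps.providers) != 2 {
		t.Fatalf("expected 2 unique providers, got %d", len(ps.providers))
	}
	if ps.providers[0] != a {
		t.Fatal("providers should preserve insertion order")
	}
}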
@@ -19,19 +19,24 @@ import (
 	base32 "github.com/multiformats/go-base32"
 )
 
-var batchBufferSize = 256
-
-var log = logging.Logger("providers")
-
-var lruCacheSize = 256
+// ProvidersKeyPrefix is the prefix/namespace for ALL provider record
+// keys stored in the data store.
+const ProvidersKeyPrefix = "/providers/"
+
+// ProvideValidity is the default time that a provider record should last
 var ProvideValidity = time.Hour * 24
 var defaultCleanupInterval = time.Hour
+var lruCacheSize = 256
+var batchBufferSize = 256
+
+var log = logging.Logger("providers")
 
+// ProviderManager adds and pulls providers out of the datastore,
+// caching them in between
 type ProviderManager struct {
 	// all non channel fields are meant to be accessed only within
 	// the run method
-	providers *lru.LRU
+	cache *lru.LRU
 	dstore *autobatch.Datastore
 	newprovs chan *addProv
 	getprovs chan *getProv
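The struct comment above is the central design constraint: every mutable field is owned by the single goroutine running run, and callers interact only through the newprovs and getprovs channels. A standalone, simplified sketch of that single-owner pattern (hypothetical names, not from this repo):

package main

import "fmt"

type getReq struct {
	key  string
	resp chan []string // buffered by the caller so the reply never blocks the owner
}

func main() {
	adds := make(chan [2]string)
	gets := make(chan *getReq)

	// the "run" goroutine: sole owner of the state map, so no mutex is needed
	go func() {
		state := map[string][]string{}
		for {
			select {
			case kv := <-adds:
				state[kv[0]] = append(state[kv[0]], kv[1])
			case g := <-gets:
				g.resp <- state[g.key]
			}
		}
	}()

	adds <- [2]string{"k", "provider1"}

	g := &getReq{key: "k", resp: make(chan []string, 1)}
	gets <- g
	fmt.Println(<-g.resp) // [provider1]
}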
@@ -40,21 +45,17 @@ type ProviderManager struct {
 	cleanupInterval time.Duration
 }
 
-type providerSet struct {
-	providers []peer.ID
-	set       map[peer.ID]time.Time
-}
-
 type addProv struct {
-	k   []byte
+	key []byte
 	val peer.ID
 }
 
 type getProv struct {
-	k    []byte
+	key  []byte
 	resp chan []peer.ID
 }
 
+// NewProviderManager constructor
 func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager {
 	pm := new(ProviderManager)
 	pm.getprovs = make(chan *getProv)
@@ -64,7 +65,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
 	if err != nil {
 		panic(err) //only happens if negative value is passed to lru constructor
 	}
-	pm.providers = cache
+	pm.cache = cache
 	pm.proc = goprocessctx.WithContext(ctx)
 	pm.cleanupInterval = defaultCleanupInterval
@@ -73,130 +74,11 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
 	return pm
 }
 
-// ProvidersKeyPrefix is the prefix/namespace for ALL provider record
-// keys stored in the data store.
-const ProvidersKeyPrefix = "/providers/"
-
-func mkProvKey(k []byte) string {
-	return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
-}
-
+// Process returns the ProviderManager process
 func (pm *ProviderManager) Process() goprocess.Process {
 	return pm.proc
 }
 
-func (pm *ProviderManager) providersForKey(k []byte) ([]peer.ID, error) {
-	pset, err := pm.getProvSet(k)
-	if err != nil {
-		return nil, err
-	}
-	return pset.providers, nil
-}
-
-func (pm *ProviderManager) getProvSet(k []byte) (*providerSet, error) {
-	cached, ok := pm.providers.Get(string(k))
-	if ok {
-		return cached.(*providerSet), nil
-	}
-	pset, err := loadProvSet(pm.dstore, k)
-	if err != nil {
-		return nil, err
-	}
-	if len(pset.providers) > 0 {
-		pm.providers.Add(string(k), pset)
-	}
-	return pset, nil
-}
-
-func loadProvSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
-	res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
-	if err != nil {
-		return nil, err
-	}
-	defer res.Close()
-	now := time.Now()
-	out := newProviderSet()
-	for {
-		e, ok := res.NextSync()
-		if !ok {
-			break
-		}
-		if e.Error != nil {
-			log.Error("got an error: ", e.Error)
-			continue
-		}
-		// check expiration time
-		t, err := readTimeValue(e.Value)
-		switch {
-		case err != nil:
-			// couldn't parse the time
-			log.Error("parsing providers record from disk: ", err)
-			fallthrough
-		case now.Sub(t) > ProvideValidity:
-			// or just expired
-			err = dstore.Delete(ds.RawKey(e.Key))
-			if err != nil && err != ds.ErrNotFound {
-				log.Error("failed to remove provider record from disk: ", err)
-			}
-			continue
-		}
-		lix := strings.LastIndex(e.Key, "/")
-		decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:])
-		if err != nil {
-			log.Error("base32 decoding error: ", err)
-			err = dstore.Delete(ds.RawKey(e.Key))
-			if err != nil && err != ds.ErrNotFound {
-				log.Error("failed to remove provider record from disk: ", err)
-			}
-			continue
-		}
-		pid := peer.ID(decstr)
-		out.setVal(pid, t)
-	}
-	return out, nil
-}
-
-func readTimeValue(data []byte) (time.Time, error) {
-	nsec, n := binary.Varint(data)
-	if n <= 0 {
-		return time.Time{}, fmt.Errorf("failed to parse time")
-	}
-	return time.Unix(0, nsec), nil
-}
-
-func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
-	now := time.Now()
-	if provs, ok := pm.providers.Get(string(k)); ok {
-		provs.(*providerSet).setVal(p, now)
-	} // else not cached, just write through
-	return writeProviderEntry(pm.dstore, k, p, now)
-}
-
-func mkProvKeyFor(k []byte, p peer.ID) string {
-	return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
-}
-
-func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
-	dsk := mkProvKeyFor(k, p)
-	buf := make([]byte, 16)
-	n := binary.PutVarint(buf, t.UnixNano())
-	return dstore.Put(ds.NewKey(dsk), buf[:n])
-}
-
 func (pm *ProviderManager) run(proc goprocess.Process) {
 	var (
 		gcQuery dsq.Results
@@ -220,7 +102,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 	for {
 		select {
 		case np := <-pm.newprovs:
-			err := pm.addProv(np.k, np.val)
+			err := pm.addProv(np.key, np.val)
 			if err != nil {
 				log.Error("error adding new providers: ", err)
 				continue
@@ -228,10 +110,10 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 			if gcSkip != nil {
 				// we have an gc, tell it to skip this provider
 				// as we've updated it since the GC started.
-				gcSkip[mkProvKeyFor(np.k, np.val)] = struct{}{}
+				gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{}
 			}
 		case gp := <-pm.getprovs:
-			provs, err := pm.providersForKey(gp.k)
+			provs, err := pm.getProvidersForKey(gp.key)
 			if err != nil && err != ds.ErrNotFound {
 				log.Error("error reading providers: ", err)
 			}
@@ -281,7 +163,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 			// drop them.
 			//
 			// Much faster than GCing.
-			pm.providers.Purge()
+			pm.cache.Purge()
 
 			// Now, kick off a GC of the datastore.
 			q, err := pm.dstore.Query(dsq.Query{
@@ -300,10 +182,10 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 	}
 }
 
-// AddProvider adds a provider.
+// AddProvider adds a provider
 func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
 	prov := &addProv{
-		k:   k,
+		key: k,
 		val: val,
 	}
 	select {
@@ -312,11 +194,39 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID)
 	}
 }
 
+// addProv updates the cache if needed
+func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
+	now := time.Now()
+	if provs, ok := pm.cache.Get(string(k)); ok {
+		provs.(*providerSet).setVal(p, now)
+	} // else not cached, just write through
+	return writeProviderEntry(pm.dstore, k, p, now)
+}
+
+// writeProviderEntry writes the provider into the datastore
+func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
+	dsk := mkProvKeyFor(k, p)
+	buf := make([]byte, 16)
+	n := binary.PutVarint(buf, t.UnixNano())
+	return dstore.Put(ds.NewKey(dsk), buf[:n])
+}
+
+func mkProvKeyFor(k []byte, p peer.ID) string {
+	return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
+}
+
+func mkProvKey(k []byte) string {
+	return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
+}
+
 // GetProviders returns the set of providers for the given key.
 // This method _does not_ copy the set. Do not modify it.
 func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
 	gp := &getProv{
-		k:    k,
+		key:  k,
 		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
 	}
 	select {
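Taken together, mkProvKey and mkProvKeyFor give every record a key of the form /providers/<base32(key)>/<base32(peerID)>, which is what lets loadProviderSet below query by prefix. A runnable sketch of the key shape (the sample content key and peer ID are made up):

package main

import (
	"fmt"

	base32 "github.com/multiformats/go-base32"
)

const providersKeyPrefix = "/providers/" // mirrors ProvidersKeyPrefix above

func main() {
	k := []byte("some-content-key") // hypothetical content key
	p := []byte("QmExamplePeer")    // hypothetical peer ID bytes

	provKey := providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
	entryKey := provKey + "/" + base32.RawStdEncoding.EncodeToString(p)

	// prints something like /providers/<base32 of key>/<base32 of peer>
	fmt.Println(entryKey)
}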
@@ -332,21 +242,94 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID
 	}
 }
 
-func newProviderSet() *providerSet {
-	return &providerSet{
-		set: make(map[peer.ID]time.Time),
-	}
-}
+func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
+	pset, err := pm.getProviderSetForKey(k)
+	if err != nil {
+		return nil, err
+	}
+	return pset.providers, nil
+}
 
-func (ps *providerSet) Add(p peer.ID) {
-	ps.setVal(p, time.Now())
-}
+// returns the ProviderSet if it already exists on cache, otherwise loads it from datastore
+func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) {
+	cached, ok := pm.cache.Get(string(k))
+	if ok {
+		return cached.(*providerSet), nil
+	}
+	pset, err := loadProviderSet(pm.dstore, k)
+	if err != nil {
+		return nil, err
+	}
+	if len(pset.providers) > 0 {
+		pm.cache.Add(string(k), pset)
+	}
+	return pset, nil
+}
 
-func (ps *providerSet) setVal(p peer.ID, t time.Time) {
-	_, found := ps.set[p]
-	if !found {
-		ps.providers = append(ps.providers, p)
-	}
-	ps.set[p] = t
-}
+// loads the ProviderSet out of the datastore
+func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
+	res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
+	if err != nil {
+		return nil, err
+	}
+	defer res.Close()
+	now := time.Now()
+	out := newProviderSet()
+	for {
+		e, ok := res.NextSync()
+		if !ok {
+			break
+		}
+		if e.Error != nil {
+			log.Error("got an error: ", e.Error)
+			continue
+		}
+		// check expiration time
+		t, err := readTimeValue(e.Value)
+		switch {
+		case err != nil:
+			// couldn't parse the time
+			log.Error("parsing providers record from disk: ", err)
+			fallthrough
+		case now.Sub(t) > ProvideValidity:
+			// or just expired
+			err = dstore.Delete(ds.RawKey(e.Key))
+			if err != nil && err != ds.ErrNotFound {
+				log.Error("failed to remove provider record from disk: ", err)
+			}
+			continue
+		}
+		lix := strings.LastIndex(e.Key, "/")
+		decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:])
+		if err != nil {
+			log.Error("base32 decoding error: ", err)
+			err = dstore.Delete(ds.RawKey(e.Key))
+			if err != nil && err != ds.ErrNotFound {
+				log.Error("failed to remove provider record from disk: ", err)
+			}
+			continue
+		}
+		pid := peer.ID(decstr)
+		out.setVal(pid, t)
+	}
+	return out, nil
+}
+
+func readTimeValue(data []byte) (time.Time, error) {
+	nsec, n := binary.Varint(data)
+	if n <= 0 {
+		return time.Time{}, fmt.Errorf("failed to parse time")
+	}
+	return time.Unix(0, nsec), nil
+}
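The record value is just t.UnixNano() encoded as a signed varint, so readTimeValue and writeProviderEntry are exact inverses. A minimal standalone round-trip, including the expiry comparison used in loadProviderSet (the local provideValidity mirrors the package variable):

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

func main() {
	provideValidity := 24 * time.Hour // mirrors ProvideValidity above

	// encode, as writeProviderEntry does
	now := time.Now()
	buf := make([]byte, 16) // a varint-encoded int64 needs at most 10 bytes
	n := binary.PutVarint(buf, now.UnixNano())

	// decode, as readTimeValue does
	nsec, read := binary.Varint(buf[:n])
	if read <= 0 {
		panic("failed to parse time")
	}
	t := time.Unix(0, nsec)

	fmt.Println(t.UnixNano() == now.UnixNano())  // true: lossless round-trip
	fmt.Println(time.Since(t) > provideValidity) // false: record is still valid
}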
@@ -31,16 +31,27 @@ func TestProviderManager(t *testing.T) {
 	p.AddProvider(ctx, a, peer.ID("testingprovider"))
 
 	// Not cached
+	// TODO verify that cache is empty
 	resp := p.GetProviders(ctx, a)
 	if len(resp) != 1 {
 		t.Fatal("Could not retrieve provider.")
 	}
 
 	// Cached
+	// TODO verify that cache is populated
 	resp = p.GetProviders(ctx, a)
 	if len(resp) != 1 {
 		t.Fatal("Could not retrieve provider.")
 	}
 
+	p.AddProvider(ctx, a, peer.ID("testingprovider2"))
+	p.AddProvider(ctx, a, peer.ID("testingprovider3"))
+	// TODO verify that cache is already up to date
+	resp = p.GetProviders(ctx, a)
+	if len(resp) != 3 {
+		t.Fatalf("Should have got 3 providers, got %d", len(resp))
+	}
+
 	p.proc.Close()
 }
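For context, a minimal end-to-end use of the manager mirroring this test, assuming the in-memory datastore from github.com/ipfs/go-datastore and the go-libp2p-kad-dht/providers import path (both are assumptions; the test's actual setup may differ):

package main

import (
	"context"
	"fmt"

	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p-core/peer"

	providers "github.com/libp2p/go-libp2p-kad-dht/providers" // assumed import path
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// thread-safe in-memory datastore, as commonly used in these tests
	dstore := dssync.MutexWrap(ds.NewMapDatastore())
	pm := providers.NewProviderManager(ctx, peer.ID("self"), dstore)
	defer pm.Process().Close()

	key := []byte("some-content-key")
	pm.AddProvider(ctx, key, peer.ID("testingprovider"))

	// the first call loads from the datastore, later calls hit the LRU cache
	fmt.Println(len(pm.GetProviders(ctx, key))) // 1
}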
@@ -94,7 +105,7 @@ func TestProvidersSerialization(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	pset, err := loadProvSet(dstore, k)
+	pset, err := loadProviderSet(dstore, k)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -182,7 +193,7 @@ func TestProvidesExpire(t *testing.T) {
 	// Stop to prevent data races
 	p.Process().Close()
 
-	if p.providers.Len() != 0 {
+	if p.cache.Len() != 0 {
 		t.Fatal("providers map not cleaned up")
 	}