Unverified Commit 523a9b8d authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #593 from alanshaw/feat/set-provider-mgr-options

feat: set provider manager options
parents 21b38ccf 1d696d1e
...@@ -279,7 +279,11 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) { ...@@ -279,7 +279,11 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// the DHT context should be done when the process is closed // the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc) dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)
dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore) pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
if err != nil {
return nil, err
}
dht.ProviderManager = pm
return dht, nil return dht, nil
} }
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record" record "github.com/libp2p/go-libp2p-record"
) )
...@@ -45,6 +46,7 @@ type config struct { ...@@ -45,6 +46,7 @@ type config struct {
maxRecordAge time.Duration maxRecordAge time.Duration
enableProviders bool enableProviders bool
enableValues bool enableValues bool
providersOptions []providers.Option
queryPeerFilter QueryFilterFunc queryPeerFilter QueryFilterFunc
routingTable struct { routingTable struct {
...@@ -348,6 +350,18 @@ func DisableValues() Option { ...@@ -348,6 +350,18 @@ func DisableValues() Option {
} }
} }
// ProvidersOptions are options passed directly to the provider manager.
//
// The provider manager adds and gets provider records from the datastore, cahing
// them in between. These options are passed to the provider manager allowing
// customisation of things like the GC interval and cache implementation.
func ProvidersOptions(opts []providers.Option) Option {
return func(c *config) error {
c.providersOptions = opts
return nil
}
}
// QueryFilter sets a function that approves which peers may be dialed in a query // QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option { func QueryFilter(filter QueryFilterFunc) Option {
return func(c *config) error { return func(c *config) error {
......
...@@ -35,7 +35,7 @@ var log = logging.Logger("providers") ...@@ -35,7 +35,7 @@ var log = logging.Logger("providers")
type ProviderManager struct { type ProviderManager struct {
// all non channel fields are meant to be accessed only within // all non channel fields are meant to be accessed only within
// the run method // the run method
cache *lru.LRU cache lru.LRUCache
dstore *autobatch.Datastore dstore *autobatch.Datastore
newprovs chan *addProv newprovs chan *addProv
...@@ -45,6 +45,36 @@ type ProviderManager struct { ...@@ -45,6 +45,36 @@ type ProviderManager struct {
cleanupInterval time.Duration cleanupInterval time.Duration
} }
// Option is a function that sets a provider manager option.
type Option func(*ProviderManager) error
func (pm *ProviderManager) applyOptions(opts ...Option) error {
for i, opt := range opts {
if err := opt(pm); err != nil {
return fmt.Errorf("provider manager option %d failed: %s", i, err)
}
}
return nil
}
// CleanupInterval sets the time between GC runs.
// Defaults to 1h.
func CleanupInterval(d time.Duration) Option {
return func(pm *ProviderManager) error {
pm.cleanupInterval = d
return nil
}
}
// Cache sets the LRU cache implementation.
// Defaults to a simple LRU cache.
func Cache(c lru.LRUCache) Option {
return func(pm *ProviderManager) error {
pm.cache = c
return nil
}
}
type addProv struct { type addProv struct {
key []byte key []byte
val peer.ID val peer.ID
...@@ -56,22 +86,23 @@ type getProv struct { ...@@ -56,22 +86,23 @@ type getProv struct {
} }
// NewProviderManager constructor // NewProviderManager constructor
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager { func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
pm := new(ProviderManager) pm := new(ProviderManager)
pm.getprovs = make(chan *getProv) pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv) pm.newprovs = make(chan *addProv)
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize) pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.NewLRU(lruCacheSize, nil) cache, err := lru.NewLRU(lruCacheSize, nil)
if err != nil { if err != nil {
panic(err) //only happens if negative value is passed to lru constructor return nil, err
} }
pm.cache = cache pm.cache = cache
pm.proc = goprocessctx.WithContext(ctx)
pm.cleanupInterval = defaultCleanupInterval pm.cleanupInterval = defaultCleanupInterval
if err := pm.applyOptions(opts...); err != nil {
return nil, err
}
pm.proc = goprocessctx.WithContext(ctx)
pm.proc.Go(pm.run) pm.proc.Go(pm.run)
return pm, nil
return pm
} }
// Process returns the ProviderManager process // Process returns the ProviderManager process
......
...@@ -26,7 +26,10 @@ func TestProviderManager(t *testing.T) { ...@@ -26,7 +26,10 @@ func TestProviderManager(t *testing.T) {
defer cancel() defer cancel()
mid := peer.ID("testing") mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
a := u.Hash([]byte("test")) a := u.Hash([]byte("test"))
p.AddProvider(ctx, a, peer.ID("testingprovider")) p.AddProvider(ctx, a, peer.ID("testingprovider"))
...@@ -64,7 +67,10 @@ func TestProvidersDatastore(t *testing.T) { ...@@ -64,7 +67,10 @@ func TestProvidersDatastore(t *testing.T) {
defer cancel() defer cancel()
mid := peer.ID("testing") mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) p, err := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
defer p.proc.Close() defer p.proc.Close()
friend := peer.ID("friend") friend := peer.ID("friend")
...@@ -144,7 +150,10 @@ func TestProvidesExpire(t *testing.T) { ...@@ -144,7 +150,10 @@ func TestProvidesExpire(t *testing.T) {
ds := dssync.MutexWrap(ds.NewMapDatastore()) ds := dssync.MutexWrap(ds.NewMapDatastore())
mid := peer.ID("testing") mid := peer.ID("testing")
p := NewProviderManager(ctx, mid, ds) p, err := NewProviderManager(ctx, mid, ds)
if err != nil {
t.Fatal(err)
}
peers := []peer.ID{"a", "b"} peers := []peer.ID{"a", "b"}
var mhs []mh.Multihash var mhs []mh.Multihash
...@@ -249,7 +258,10 @@ func TestLargeProvidersSet(t *testing.T) { ...@@ -249,7 +258,10 @@ func TestLargeProvidersSet(t *testing.T) {
} }
mid := peer.ID("myself") mid := peer.ID("myself")
p := NewProviderManager(ctx, mid, dstore) p, err := NewProviderManager(ctx, mid, dstore)
if err != nil {
t.Fatal(err)
}
defer p.proc.Close() defer p.proc.Close()
var mhs []mh.Multihash var mhs []mh.Multihash
...@@ -281,7 +293,10 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { ...@@ -281,7 +293,10 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
p1, p2 := peer.ID("a"), peer.ID("b") p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1")) h1 := u.Hash([]byte("1"))
h2 := u.Hash([]byte("2")) h2 := u.Hash([]byte("2"))
pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
// add provider // add provider
pm.AddProvider(ctx, h1, p1) pm.AddProvider(ctx, h1, p1)
...@@ -302,7 +317,10 @@ func TestWriteUpdatesCache(t *testing.T) { ...@@ -302,7 +317,10 @@ func TestWriteUpdatesCache(t *testing.T) {
p1, p2 := peer.ID("a"), peer.ID("b") p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1")) h1 := u.Hash([]byte("1"))
pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) pm, err := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
if err != nil {
t.Fatal(err)
}
// add provider // add provider
pm.AddProvider(ctx, h1, p1) pm.AddProvider(ctx, h1, p1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment