Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
da8a30f2
Commit
da8a30f2
authored
Apr 20, 2020
by
Alan Shaw
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
feat: set provider manager options
parent
7c7c46d3
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
93 additions
and
18 deletions
+93
-18
dht.go
dht.go
+5
-1
dht_options.go
dht_options.go
+10
-0
providers/providers_manager.go
providers/providers_manager.go
+54
-11
providers/providers_manager_test.go
providers/providers_manager_test.go
+24
-6
No files found.
dht.go
View file @
da8a30f2
...
...
@@ -275,7 +275,11 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// the DHT context should be done when the process is closed
dht
.
ctx
=
goprocessctx
.
WithProcessClosing
(
ctxTags
,
dht
.
proc
)
dht
.
ProviderManager
=
providers
.
NewProviderManager
(
dht
.
ctx
,
h
.
ID
(),
cfg
.
datastore
)
pm
,
err
:=
providers
.
NewProviderManager
(
dht
.
ctx
,
h
.
ID
(),
cfg
.
datastore
,
cfg
.
providersOptions
...
)
if
err
!=
nil
{
return
nil
,
err
}
dht
.
ProviderManager
=
pm
return
dht
,
nil
}
...
...
dht_options.go
View file @
da8a30f2
...
...
@@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record
"github.com/libp2p/go-libp2p-record"
)
...
...
@@ -45,6 +46,7 @@ type config struct {
maxRecordAge
time
.
Duration
enableProviders
bool
enableValues
bool
providersOptions
[]
providers
.
Option
queryPeerFilter
QueryFilterFunc
routingTable
struct
{
...
...
@@ -348,6 +350,14 @@ func DisableValues() Option {
}
}
// ProvidersOptions are options passed directly to the provider manager.
func
ProvidersOptions
(
opts
[]
providers
.
Option
)
Option
{
return
func
(
c
*
config
)
error
{
c
.
providersOptions
=
opts
return
nil
}
}
// QueryFilter sets a function that approves which peers may be dialed in a query
func
QueryFilter
(
filter
QueryFilterFunc
)
Option
{
return
func
(
c
*
config
)
error
{
...
...
providers/providers_manager.go
View file @
da8a30f2
...
...
@@ -35,7 +35,7 @@ var log = logging.Logger("providers")
type
ProviderManager
struct
{
// all non channel fields are meant to be accessed only within
// the run method
cache
*
lru
.
LRU
cache
lru
.
LRU
Cache
dstore
*
autobatch
.
Datastore
newprovs
chan
*
addProv
...
...
@@ -45,6 +45,51 @@ type ProviderManager struct {
cleanupInterval
time
.
Duration
}
type
options
struct
{
cleanupInterval
time
.
Duration
cache
lru
.
LRUCache
}
// Option is a function that sets a provider manager option.
type
Option
func
(
*
options
)
error
func
(
c
*
options
)
apply
(
opts
...
Option
)
error
{
for
i
,
opt
:=
range
opts
{
if
err
:=
opt
(
c
);
err
!=
nil
{
return
fmt
.
Errorf
(
"provider manager option %d failed: %s"
,
i
,
err
)
}
}
return
nil
}
var
defaults
=
func
(
o
*
options
)
error
{
o
.
cleanupInterval
=
defaultCleanupInterval
cache
,
err
:=
lru
.
NewLRU
(
lruCacheSize
,
nil
)
if
err
!=
nil
{
return
err
}
o
.
cache
=
cache
return
nil
}
// CleanupInterval sets the time between GC runs.
// Defaults to 1h.
func
CleanupInterval
(
d
time
.
Duration
)
Option
{
return
func
(
o
*
options
)
error
{
o
.
cleanupInterval
=
d
return
nil
}
}
// Cache sets the LRU cache implementation.
// Defaults to a simple LRU cache.
func
Cache
(
c
lru
.
LRUCache
)
Option
{
return
func
(
o
*
options
)
error
{
o
.
cache
=
c
return
nil
}
}
type
addProv
struct
{
key
[]
byte
val
peer
.
ID
...
...
@@ -56,22 +101,20 @@ type getProv struct {
}
// NewProviderManager constructor
func
NewProviderManager
(
ctx
context
.
Context
,
local
peer
.
ID
,
dstore
ds
.
Batching
)
*
ProviderManager
{
func
NewProviderManager
(
ctx
context
.
Context
,
local
peer
.
ID
,
dstore
ds
.
Batching
,
opts
...
Option
)
(
*
ProviderManager
,
error
)
{
var
options
options
if
err
:=
options
.
apply
(
append
([]
Option
{
defaults
},
opts
...
)
...
);
err
!=
nil
{
return
nil
,
err
}
pm
:=
new
(
ProviderManager
)
pm
.
getprovs
=
make
(
chan
*
getProv
)
pm
.
newprovs
=
make
(
chan
*
addProv
)
pm
.
dstore
=
autobatch
.
NewAutoBatching
(
dstore
,
batchBufferSize
)
cache
,
err
:=
lru
.
NewLRU
(
lruCacheSize
,
nil
)
if
err
!=
nil
{
panic
(
err
)
//only happens if negative value is passed to lru constructor
}
pm
.
cache
=
cache
pm
.
cache
=
options
.
cache
pm
.
proc
=
goprocessctx
.
WithContext
(
ctx
)
pm
.
cleanupInterval
=
defaultC
leanupInterval
pm
.
cleanupInterval
=
options
.
c
leanupInterval
pm
.
proc
.
Go
(
pm
.
run
)
return
pm
return
pm
,
nil
}
// Process returns the ProviderManager process
...
...
providers/providers_manager_test.go
View file @
da8a30f2
...
...
@@ -26,7 +26,10 @@ func TestProviderManager(t *testing.T) {
defer
cancel
()
mid
:=
peer
.
ID
(
"testing"
)
p
:=
NewProviderManager
(
ctx
,
mid
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
p
,
err
:=
NewProviderManager
(
ctx
,
mid
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
a
:=
u
.
Hash
([]
byte
(
"test"
))
p
.
AddProvider
(
ctx
,
a
,
peer
.
ID
(
"testingprovider"
))
...
...
@@ -64,7 +67,10 @@ func TestProvidersDatastore(t *testing.T) {
defer
cancel
()
mid
:=
peer
.
ID
(
"testing"
)
p
:=
NewProviderManager
(
ctx
,
mid
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
p
,
err
:=
NewProviderManager
(
ctx
,
mid
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
defer
p
.
proc
.
Close
()
friend
:=
peer
.
ID
(
"friend"
)
...
...
@@ -144,7 +150,10 @@ func TestProvidesExpire(t *testing.T) {
ds
:=
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
())
mid
:=
peer
.
ID
(
"testing"
)
p
:=
NewProviderManager
(
ctx
,
mid
,
ds
)
p
,
err
:=
NewProviderManager
(
ctx
,
mid
,
ds
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
peers
:=
[]
peer
.
ID
{
"a"
,
"b"
}
var
mhs
[]
mh
.
Multihash
...
...
@@ -249,7 +258,10 @@ func TestLargeProvidersSet(t *testing.T) {
}
mid
:=
peer
.
ID
(
"myself"
)
p
:=
NewProviderManager
(
ctx
,
mid
,
dstore
)
p
,
err
:=
NewProviderManager
(
ctx
,
mid
,
dstore
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
defer
p
.
proc
.
Close
()
var
mhs
[]
mh
.
Multihash
...
...
@@ -281,7 +293,10 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
p1
,
p2
:=
peer
.
ID
(
"a"
),
peer
.
ID
(
"b"
)
h1
:=
u
.
Hash
([]
byte
(
"1"
))
h2
:=
u
.
Hash
([]
byte
(
"2"
))
pm
:=
NewProviderManager
(
ctx
,
p1
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
pm
,
err
:=
NewProviderManager
(
ctx
,
p1
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
// add provider
pm
.
AddProvider
(
ctx
,
h1
,
p1
)
...
...
@@ -302,7 +317,10 @@ func TestWriteUpdatesCache(t *testing.T) {
p1
,
p2
:=
peer
.
ID
(
"a"
),
peer
.
ID
(
"b"
)
h1
:=
u
.
Hash
([]
byte
(
"1"
))
pm
:=
NewProviderManager
(
ctx
,
p1
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
pm
,
err
:=
NewProviderManager
(
ctx
,
p1
,
dssync
.
MutexWrap
(
ds
.
NewMapDatastore
()))
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
// add provider
pm
.
AddProvider
(
ctx
,
h1
,
p1
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment