Unverified Commit 74d66a08 authored by Raúl Kripalani, committed by GitHub

implement decaying tags. (#61)

parent 27383946
@@ -27,27 +27,30 @@ var log = logging.Logger("connmgr")
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
-highWater int
-lowWater int
-connCount int32
-gracePeriod time.Duration
-segments segments
+*decayer
+cfg *BasicConnManagerConfig
+segments segments
plk sync.RWMutex
protected map[peer.ID]map[string]struct{}
-trimTrigger chan chan<- struct{}
// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
+trimTrigger chan chan<- struct{}
+connCount int32
lastTrimMu sync.RWMutex
lastTrim time.Time
-silencePeriod time.Duration
ctx context.Context
cancel func()
}
-var _ connmgr.ConnManager = (*BasicConnMgr)(nil)
+var (
+_ connmgr.ConnManager = (*BasicConnMgr)(nil)
+_ connmgr.Decayer = (*BasicConnMgr)(nil)
+)
type segment struct {
sync.Mutex
@@ -80,6 +83,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
temp: true,
tags: make(map[string]int),
+decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[p] = pi
@@ -92,15 +96,32 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
// their connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
// subject to pruning.
-func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
+func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
ctx, cancel := context.WithCancel(context.Background())
-cm := &BasicConnMgr{
+cfg := &BasicConnManagerConfig{
highWater: hi,
lowWater: low,
gracePeriod: grace,
+silencePeriod: SilencePeriod,
+}
+for _, o := range opts {
+// TODO we're ignoring errors from options because we have no way to
+// return them, or otherwise act on them.
+_ = o(cfg)
+}
+if cfg.decayer == nil {
+// Set the default decayer config.
+cfg.decayer = (&DecayerCfg{}).WithDefaults()
+}
+cm := &BasicConnMgr{
+cfg: cfg,
trimRunningCh: make(chan struct{}, 1),
trimTrigger: make(chan chan<- struct{}),
protected: make(map[peer.ID]map[string]struct{}, 16),
-silencePeriod: SilencePeriod,
ctx: ctx,
cancel: cancel,
segments: func() (ret segments) {
@@ -113,11 +134,17 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
}(),
}
+decay, _ := NewDecayer(cfg.decayer, cm)
+cm.decayer = decay
go cm.background()
return cm
}
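For context, not part of the diff: with the new opts plumbing, callers can tune the decayer at construction time. A minimal usage sketch as client code of this package (watermark and duration values are illustrative):

// Halve the default tick resolution and build a manager with decay support.
cfg := (&connmgr.DecayerCfg{}).WithDefaults()
cfg.Resolution = 30 * time.Second
cm := connmgr.NewConnManager(100, 400, time.Minute, connmgr.DecayerConfig(cfg))
defer cm.Close() // Close also tears down the embedded decayer, as shown below.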
func (cm *BasicConnMgr) Close() error {
+if err := cm.decayer.Close(); err != nil {
+return err
+}
cm.cancel()
return nil
}
@@ -151,10 +178,12 @@ func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
// peerInfo stores metadata for a given peer.
type peerInfo struct {
-id peer.ID
-tags map[string]int // value for each tag
-value int // cached sum of all tag values
-temp bool // this is a temporary entry holding early tags, and awaiting connections
+id peer.ID
+tags map[string]int // value for each tag
+decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags
+value int // cached sum of all tag values
+temp bool // this is a temporary entry holding early tags, and awaiting connections
conns map[network.Conn]time.Time // start time of each connection
@@ -199,7 +228,7 @@ func (cm *BasicConnMgr) background() {
var waiting chan<- struct{}
select {
case <-ticker.C:
-if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) {
+if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
// Below high water, skip.
continue
}
@@ -235,7 +264,7 @@ func (cm *BasicConnMgr) trim() {
cm.lastTrimMu.RUnlock()
// skip this attempt to trim if the last one just took place.
-if time.Since(lastTrim) < cm.silencePeriod {
+if time.Since(lastTrim) < cm.cfg.silencePeriod {
return
}
@@ -256,13 +285,13 @@ func (cm *BasicConnMgr) trim() {
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
-if cm.lowWater == 0 || cm.highWater == 0 {
+if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
// disabled
return nil
}
nconns := int(atomic.LoadInt32(&cm.connCount))
-if nconns <= cm.lowWater {
+if nconns <= cm.cfg.lowWater {
log.Info("open connection count below limit")
return nil
}
@@ -270,7 +299,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
npeers := cm.segments.countPeers()
candidates := make([]*peerInfo, 0, npeers)
ncandidates := 0
-gracePeriodStart := time.Now().Add(-cm.gracePeriod)
+gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
cm.plk.RLock()
for _, s := range cm.segments {
@@ -291,7 +320,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
}
cm.plk.RUnlock()
-if ncandidates < cm.lowWater {
+if ncandidates < cm.cfg.lowWater {
log.Info("open connection count above limit but too many are in the grace period")
// We have too many connections but fewer than lowWater
// connections out of the grace period.
@@ -311,7 +340,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
return left.value < right.value
})
-target := ncandidates - cm.lowWater
+target := ncandidates - cm.cfg.lowWater
// slightly overallocate because we may have more than one conn per peer
selected := make([]network.Conn, 0, target+10)
@@ -363,6 +392,9 @@ func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
for t, v := range pi.tags {
out.Tags[t] = v
}
+for t, v := range pi.decaying {
+out.Tags[t.name] = v.Value
+}
for c, t := range pi.conns {
out.Conns[c.RemoteMultiaddr().String()] = t
}
@@ -439,10 +471,10 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
cm.lastTrimMu.RUnlock()
return CMInfo{
-HighWater: cm.highWater,
-LowWater: cm.lowWater,
+HighWater: cm.cfg.highWater,
+LowWater: cm.cfg.lowWater,
LastTrim: lastTrim,
-GracePeriod: cm.gracePeriod,
+GracePeriod: cm.cfg.gracePeriod,
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
}
}
@@ -478,6 +510,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
id: id,
firstSeen: time.Now(),
tags: make(map[string]int),
+decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
}
s.peers[id] = pinfo
package connmgr
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/benbjohnson/clock"
)
// DefaultResolution is the default resolution of the decay tracker.
var DefaultResolution = 1 * time.Minute
// bumpCmd represents a bump command.
type bumpCmd struct {
peer peer.ID
tag *decayingTag
delta int
}
// decayer tracks and manages all decaying tags and their values.
type decayer struct {
cfg *DecayerCfg
mgr *BasicConnMgr
clock clock.Clock // for testing.
tagsMu sync.Mutex
knownTags map[string]*decayingTag
// lastTick stores the last time the decayer ticked. Guarded by atomic.
lastTick atomic.Value
// bumpCh queues bump commands to be processed by the loop.
bumpCh chan bumpCmd
// closure thingies.
closeCh chan struct{}
doneCh chan struct{}
err error
}
var _ connmgr.Decayer = (*decayer)(nil)
// DecayerCfg is the configuration object for the Decayer.
type DecayerCfg struct {
Resolution time.Duration
Clock clock.Clock
}
// WithDefaults writes the default values on this DecayerCfg instance,
// and returns itself for chainability.
//
// cfg := (&DecayerCfg{}).WithDefaults()
// cfg.Resolution = 30 * time.Second
// t := NewDecayer(cfg, cm)
func (cfg *DecayerCfg) WithDefaults() *DecayerCfg {
cfg.Resolution = DefaultResolution
return cfg
}
// NewDecayer creates a new decaying tag registry.
func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
// use real time if the Clock in the config is nil.
if cfg.Clock == nil {
cfg.Clock = clock.New()
}
d := &decayer{
cfg: cfg,
mgr: mgr,
clock: cfg.Clock,
knownTags: make(map[string]*decayingTag),
bumpCh: make(chan bumpCmd, 128),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
d.lastTick.Store(d.clock.Now())
// kick things off.
go d.process()
return d, nil
}
func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
d.tagsMu.Lock()
defer d.tagsMu.Unlock()
tag, ok := d.knownTags[name]
if ok {
return nil, fmt.Errorf("decaying tag with name %s already exists", name)
}
if interval < d.cfg.Resolution {
log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
name, interval, d.cfg.Resolution)
interval = d.cfg.Resolution
}
if interval%d.cfg.Resolution != 0 {
log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
"some precision may be lost", name, interval, d.cfg.Resolution)
}
lastTick := d.lastTick.Load().(time.Time)
tag = &decayingTag{
trkr: d,
name: name,
interval: interval,
nextTick: lastTick.Add(interval),
decayFn: decayFn,
bumpFn: bumpFn,
}
d.knownTags[name] = tag
return tag, nil
}
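A sketch of registering a tag through this method, mirroring the tests further down (the tag name and parameters are illustrative; the decay and bump functions come from go-libp2p-core/connmgr, imported above as connmgr):

decay, ok := connmgr.SupportsDecay(mgr) // mgr is a *BasicConnMgr
if !ok {
	panic("decay not supported")
}
// Expire after 10 minutes without bumps; sum bumps, capping the value at 100.
tag, err := decay.RegisterDecayingTag("example", time.Minute,
	connmgr.DecayExpireWhenInactive(10*time.Minute),
	connmgr.BumpSumBounded(0, 100))
if err != nil {
	panic(err)
}
_ = tag.Bump(p, 1) // p is some peer.ID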
// Close closes the Decayer. It is idempotent.
func (d *decayer) Close() error {
select {
case <-d.doneCh:
return d.err
default:
}
close(d.closeCh)
<-d.doneCh
return d.err
}
// process is the heart of the tracker. It performs the following duties:
//
// 1. Manages decay.
// 2. Applies score bumps.
// 3. Yields when closed.
func (d *decayer) process() {
defer close(d.doneCh)
ticker := d.clock.Ticker(d.cfg.Resolution)
defer ticker.Stop()
var (
bmp bumpCmd
now time.Time
visit = make(map[*decayingTag]struct{})
)
for {
select {
case now = <-ticker.C:
d.lastTick.Store(now)
d.tagsMu.Lock()
for _, tag := range d.knownTags {
if tag.nextTick.After(now) {
// skip the tag.
continue
}
// Mark the tag to be updated in this round.
visit[tag] = struct{}{}
}
d.tagsMu.Unlock()
// Visit each peer, and decay tags that need to be decayed.
for _, s := range d.mgr.segments {
s.Lock()
// Entered a segment that contains peers. Process each peer.
for _, p := range s.peers {
for tag, v := range p.decaying {
if _, ok := visit[tag]; !ok {
// skip this tag.
continue
}
// ~ this value needs to be visited. ~
var delta int
if after, rm := tag.decayFn(*v); rm {
// delete the value and move on to the next tag.
delta -= v.Value
delete(p.decaying, tag)
} else {
// accumulate the delta, and apply the changes.
delta += after - v.Value
v.Value, v.LastVisit = after, now
}
p.value += delta
}
}
s.Unlock()
}
// Reset each tag's next visit round, and clear the visited set.
for tag := range visit {
tag.nextTick = tag.nextTick.Add(tag.interval)
delete(visit, tag)
}
case bmp = <-d.bumpCh:
var (
now = d.clock.Now()
peer, tag = bmp.peer, bmp.tag
)
s := d.mgr.segments.get(peer)
s.Lock()
p := s.tagInfoFor(peer)
v, ok := p.decaying[tag]
if !ok {
v = &connmgr.DecayingValue{
Tag: tag,
Peer: peer,
LastVisit: now,
Added: now,
Value: 0,
}
p.decaying[tag] = v
}
prev := v.Value
v.Value, v.LastVisit = v.Tag.(*decayingTag).bumpFn(*v, bmp.delta), now
p.value += v.Value - prev
s.Unlock()
case <-d.closeCh:
return
}
}
}
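As the loop above shows, a decay function receives the current DecayingValue and returns the post-decay value plus whether to remove the entry. A hypothetical custom function compatible with that contract (not part of this diff; connmgr.DecayFn is the core type invoked at tag.decayFn above):

// decayHalve halves the value on each visit and prunes the entry once it
// reaches zero (integer division makes 1 -> 0, triggering removal).
func decayHalve() connmgr.DecayFn {
	return func(value connmgr.DecayingValue) (after int, rm bool) {
		after = value.Value / 2
		return after, after == 0
	}
}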
// decayingTag represents a decaying tag, with an associated decay interval, a
// decay function, and a bump function.
type decayingTag struct {
trkr *decayer
name string
interval time.Duration
nextTick time.Time
decayFn connmgr.DecayFn
bumpFn connmgr.BumpFn
}
var _ connmgr.DecayingTag = (*decayingTag)(nil)
func (t *decayingTag) Name() string {
return t.name
}
func (t *decayingTag) Interval() time.Duration {
return t.interval
}
// Bump bumps a tag for this peer.
func (t *decayingTag) Bump(p peer.ID, delta int) error {
bmp := bumpCmd{peer: p, tag: t, delta: delta}
select {
case t.trkr.bumpCh <- bmp:
return nil
default:
return fmt.Errorf(
"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
p.Pretty(),
t.name,
delta,
len(t.trkr.bumpCh))
}
}
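Design note (editorial): Bump is deliberately non-blocking. It enqueues onto the buffered bumpCh (capacity 128) and fails fast when the queue is full, so callers on hot paths never stall on the decay loop. An illustrative call site that treats the error as advisory (logger assumed):

// Bumps are fire-and-forget; a full queue surfaces as an error, not a stall.
if err := tag.Bump(p, 1); err != nil {
	log.Warnf("failed to bump decaying tag: %s", err)
}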
package connmgr
import (
"testing"
"time"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/peer"
tu "github.com/libp2p/go-libp2p-core/test"
"github.com/stretchr/testify/require"
"github.com/benbjohnson/clock"
)
const TestResolution = 50 * time.Millisecond
func TestDecayExpire(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
)
tag, err := decay.RegisterDecayingTag("pop", 250*time.Millisecond, connmgr.DecayExpireWhenInactive(1*time.Second), connmgr.BumpSumUnbounded())
if err != nil {
t.Fatal(err)
}
err = tag.Bump(id, 10)
if err != nil {
t.Fatal(err)
}
// give time for the bump command to process.
<-time.After(100 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 10 {
t.Fatalf("wrong value; expected = %d; got = %d", 10, v)
}
mockClock.Add(250 * time.Millisecond)
mockClock.Add(250 * time.Millisecond)
mockClock.Add(250 * time.Millisecond)
mockClock.Add(250 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 0 {
t.Fatalf("wrong value; expected = %d; got = %d", 0, v)
}
}
func TestMultipleBumps(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, _ = testDecayTracker(t)
)
tag, err := decay.RegisterDecayingTag("pop", 250*time.Millisecond, connmgr.DecayExpireWhenInactive(1*time.Second), connmgr.BumpSumBounded(10, 20))
if err != nil {
t.Fatal(err)
}
err = tag.Bump(id, 5)
if err != nil {
t.Fatal(err)
}
<-time.After(100 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 10 {
t.Fatalf("wrong value; expected = %d; got = %d", 10, v)
}
err = tag.Bump(id, 100)
if err != nil {
t.Fatal(err)
}
<-time.After(100 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 20 {
t.Fatalf("wrong value; expected = %d; got = %d", 20, v)
}
}
func TestMultipleTagsNoDecay(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, _ = testDecayTracker(t)
)
tag1, err := decay.RegisterDecayingTag("beep", 250*time.Millisecond, connmgr.DecayNone(), connmgr.BumpSumBounded(0, 100))
if err != nil {
t.Fatal(err)
}
tag2, err := decay.RegisterDecayingTag("bop", 250*time.Millisecond, connmgr.DecayNone(), connmgr.BumpSumBounded(0, 100))
if err != nil {
t.Fatal(err)
}
tag3, err := decay.RegisterDecayingTag("foo", 250*time.Millisecond, connmgr.DecayNone(), connmgr.BumpSumBounded(0, 100))
if err != nil {
t.Fatal(err)
}
_ = tag1.Bump(id, 100)
_ = tag2.Bump(id, 100)
_ = tag3.Bump(id, 100)
_ = tag1.Bump(id, 100)
_ = tag2.Bump(id, 100)
_ = tag3.Bump(id, 100)
<-time.After(500 * time.Millisecond)
// all tags are upper-bounded, so the score must be 300
ti := mgr.GetTagInfo(id)
if v := ti.Value; v != 300 {
t.Fatalf("wrong value; expected = %d; got = %d", 300, v)
}
for _, s := range []string{"beep", "bop", "foo"} {
if v, ok := ti.Tags[s]; !ok || v != 100 {
t.Fatalf("expected tag %s to be 100; was = %d", s, v)
}
}
}
func TestCustomFunctions(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
)
tag1, err := decay.RegisterDecayingTag("beep", 250*time.Millisecond, connmgr.DecayFixed(10), connmgr.BumpSumUnbounded())
if err != nil {
t.Fatal(err)
}
tag2, err := decay.RegisterDecayingTag("bop", 100*time.Millisecond, connmgr.DecayFixed(5), connmgr.BumpSumUnbounded())
if err != nil {
t.Fatal(err)
}
tag3, err := decay.RegisterDecayingTag("foo", 50*time.Millisecond, connmgr.DecayFixed(1), connmgr.BumpSumUnbounded())
if err != nil {
t.Fatal(err)
}
_ = tag1.Bump(id, 1000)
_ = tag2.Bump(id, 1000)
_ = tag3.Bump(id, 1000)
<-time.After(500 * time.Millisecond)
// no decay has occurred yet, so score must be 3000.
if v := mgr.GetTagInfo(id).Value; v != 3000 {
t.Fatalf("wrong value; expected = %d; got = %d", 3000, v)
}
// only tag3 should tick.
mockClock.Add(50 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 2999 {
t.Fatalf("wrong value; expected = %d; got = %d", 2999, v)
}
// tag3 will tick thrice, tag2 will tick twice.
mockClock.Add(150 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 2986 {
t.Fatalf("wrong value; expected = %d; got = %d", 2986, v)
}
// tag3 will tick once, tag1 will tick once.
mockClock.Add(50 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 2975 {
t.Fatalf("wrong value; expected = %d; got = %d", 2975, v)
}
}
func TestMultiplePeers(t *testing.T) {
var (
ids = []peer.ID{tu.RandPeerIDFatal(t), tu.RandPeerIDFatal(t), tu.RandPeerIDFatal(t)}
mgr, decay, mockClock = testDecayTracker(t)
)
tag1, err := decay.RegisterDecayingTag("beep", 250*time.Millisecond, connmgr.DecayFixed(10), connmgr.BumpSumUnbounded())
if err != nil {
t.Fatal(err)
}
tag2, err := decay.RegisterDecayingTag("bop", 100*time.Millisecond, connmgr.DecayFixed(5), connmgr.BumpSumUnbounded())
if err != nil {
t.Fatal(err)
}
tag3, err := decay.RegisterDecayingTag("foo", 50*time.Millisecond, connmgr.DecayFixed(1), connmgr.BumpSumUnbounded())
if err != nil {
t.Fatal(err)
}
_ = tag1.Bump(ids[0], 1000)
_ = tag2.Bump(ids[0], 1000)
_ = tag3.Bump(ids[0], 1000)
_ = tag1.Bump(ids[1], 500)
_ = tag2.Bump(ids[1], 500)
_ = tag3.Bump(ids[1], 500)
_ = tag1.Bump(ids[2], 100)
_ = tag2.Bump(ids[2], 100)
_ = tag3.Bump(ids[2], 100)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)
mockClock.Add(3 * time.Second)
// allow the background goroutine to process ticks.
<-time.After(500 * time.Millisecond)
if v := mgr.GetTagInfo(ids[0]).Value; v != 2670 {
t.Fatalf("wrong value; expected = %d; got = %d", 2670, v)
}
if v := mgr.GetTagInfo(ids[1]).Value; v != 1170 {
t.Fatalf("wrong value; expected = %d; got = %d", 1170, v)
}
if v := mgr.GetTagInfo(ids[2]).Value; v != 40 {
t.Fatalf("wrong value; expected = %d; got = %d", 40, v)
}
}
func TestLinearDecayOverwrite(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
)
tag1, err := decay.RegisterDecayingTag("beep", 250*time.Millisecond, connmgr.DecayLinear(0.5), connmgr.BumpOverwrite())
if err != nil {
t.Fatal(err)
}
_ = tag1.Bump(id, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)
mockClock.Add(250 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 500 {
t.Fatalf("value should be half; got = %d", v)
}
mockClock.Add(250 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 250 {
t.Fatalf("value should be half; got = %d", v)
}
_ = tag1.Bump(id, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)
if v := mgr.GetTagInfo(id).Value; v != 1000 {
t.Fatalf("value should 1000; got = %d", v)
}
}
func TestResolutionMisaligned(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)
tag1, err := decay.RegisterDecayingTag("beep", time.Duration(float64(TestResolution)*1.4), connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)
tag2, err := decay.RegisterDecayingTag("bop", time.Duration(float64(TestResolution)*2.4), connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)
_ = tag1.Bump(id, 1000)
_ = tag2.Bump(id, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)
// nothing has happened.
mockClock.Add(TestResolution)
require.Equal(1000, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"])
// next tick; tag1 would've ticked.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"])
// next tick; tag1 would've ticked twice, tag2 once.
mockClock.Add(TestResolution)
require.Equal(998, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id).Tags["bop"])
require.Equal(1997, mgr.GetTagInfo(id).Value)
}
func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Mock) {
mockClock := clock.NewMock()
cfg := &DecayerCfg{
Resolution: TestResolution,
Clock: mockClock,
}
mgr := NewConnManager(10, 10, 1*time.Second, DecayerConfig(cfg))
decay, ok := connmgr.SupportsDecay(mgr)
if !ok {
tb.Fatalf("connmgr does not support decay")
}
return mgr, decay, mockClock
}
package connmgr
import "time"
// BasicConnManagerConfig is the configuration struct for the basic connection
// manager.
type BasicConnManagerConfig struct {
highWater int
lowWater int
gracePeriod time.Duration
silencePeriod time.Duration
decayer *DecayerCfg
}
// Option represents an option for the basic connection manager.
type Option func(*BasicConnManagerConfig) error
// DecayerConfig applies a configuration for the decayer.
func DecayerConfig(opts *DecayerCfg) Option {
return func(cfg *BasicConnManagerConfig) error {
cfg.decayer = opts
return nil
}
}
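The Option type keeps NewConnManager's signature stable while allowing new knobs. A hypothetical extra option following the same pattern (SilencePeriodOption is illustrative and not part of this diff; the silencePeriod field already exists in the config above):

// SilencePeriodOption overrides how soon a trim may follow the previous one.
func SilencePeriodOption(p time.Duration) Option {
	return func(cfg *BasicConnManagerConfig) error {
		if p <= 0 {
			return fmt.Errorf("silence period must be positive") // fmt import assumed
		}
		cfg.silencePeriod = p
		return nil
	}
}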