Commit 41f47f7f authored by Adin Schmahmann's avatar Adin Schmahmann Committed by Steven Allen

cleanup unused code

parent 4c2cf464
package dht
import (
"context"
"math"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)
const (
// DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
// be alive at any time.
DefaultDialQueueMinParallelism = 6
// DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
// be alive at any time.
DefaultDialQueueMaxParallelism = 20
// DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
// a worker pool downscaling.
DefaultDialQueueMaxIdle = 5 * time.Second
// DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
// scaling events, after one is processed. Its role is to reduce jitter.
DefaultDialQueueScalingMutePeriod = 1 * time.Second
// DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
// or divided when upscaling and downscaling events occur, respectively.
DefaultDialQueueScalingFactor = 1.5
)
type dialQueue struct {
*dqParams
nWorkers uint
out *queue.ChanQueue
startOnce sync.Once
waitingCh chan waitingCh
dieCh chan struct{}
growCh chan struct{}
shrinkCh chan struct{}
}
type dqParams struct {
ctx context.Context
target string
dialFn func(context.Context, peer.ID) error
in *queue.ChanQueue
config dqConfig
}
type dqConfig struct {
// minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
minParallelism uint
// maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
maxParallelism uint
// scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
// and downscaling events occur, respectively.
scalingFactor float64
// mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
// Its role is to reduce jitter.
mutePeriod time.Duration
// maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
maxIdle time.Duration
}
// dqDefaultConfig returns the default configuration for dial queues. See const documentation to learn the default values.
func dqDefaultConfig() dqConfig {
return dqConfig{
minParallelism: DefaultDialQueueMinParallelism,
maxParallelism: DefaultDialQueueMaxParallelism,
scalingFactor: DefaultDialQueueScalingFactor,
maxIdle: DefaultDialQueueMaxIdle,
mutePeriod: DefaultDialQueueScalingMutePeriod,
}
}
type waitingCh struct {
ch chan<- peer.ID
ts time.Time
}
// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with config.minParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
dq := &dialQueue{
dqParams: params,
out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
growCh: make(chan struct{}, 1),
shrinkCh: make(chan struct{}, 1),
waitingCh: make(chan waitingCh),
dieCh: make(chan struct{}, params.config.maxParallelism),
}
return dq, nil
}
// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func (dq *dialQueue) Start() {
dq.startOnce.Do(func() {
go dq.control()
})
}
func (dq *dialQueue) control() {
var (
dialled <-chan peer.ID
waiting []waitingCh
lastScalingEvt = time.Now()
)
defer func() {
for _, w := range waiting {
close(w.ch)
}
waiting = nil
}()
// start workers
tgt := int(dq.dqParams.config.minParallelism)
for i := 0; i < tgt; i++ {
go dq.worker()
}
dq.nWorkers = uint(tgt)
// control workers
for {
// First process any backlog of dial jobs and waiters -- making progress is the priority.
// This block is copied below; couldn't find a more concise way of doing this.
select {
case <-dq.ctx.Done():
return
case w := <-dq.waitingCh:
waiting = append(waiting, w)
dialled = dq.out.DeqChan
continue // onto the top.
case p, ok := <-dialled:
if !ok {
return // we're done if the ChanQueue is closed, which happens when the context is closed.
}
w := waiting[0]
logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
w.ch <- p
close(w.ch)
waiting = waiting[1:]
if len(waiting) == 0 {
// no more waiters, so stop consuming dialled jobs.
dialled = nil
}
continue // onto the top.
default:
// there's nothing to process, so proceed onto the main select block.
}
select {
case <-dq.ctx.Done():
return
case w := <-dq.waitingCh:
waiting = append(waiting, w)
dialled = dq.out.DeqChan
case p, ok := <-dialled:
if !ok {
return // we're done if the ChanQueue is closed, which happens when the context is closed.
}
w := waiting[0]
logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
w.ch <- p
close(w.ch)
waiting = waiting[1:]
if len(waiting) == 0 {
// no more waiters, so stop consuming dialled jobs.
dialled = nil
}
case <-dq.growCh:
if time.Since(lastScalingEvt) < dq.config.mutePeriod {
continue
}
dq.grow()
lastScalingEvt = time.Now()
case <-dq.shrinkCh:
if time.Since(lastScalingEvt) < dq.config.mutePeriod {
continue
}
dq.shrink()
lastScalingEvt = time.Now()
}
}
}
func (dq *dialQueue) Consume() <-chan peer.ID {
ch := make(chan peer.ID, 1)
select {
case p, ok := <-dq.out.DeqChan:
// short circuit and return a dialled peer if it's immediately available, or abort if DeqChan is closed.
if ok {
ch <- p
}
close(ch)
return ch
case <-dq.ctx.Done():
// return a closed channel with no value if we're done.
close(ch)
return ch
default:
}
// we have no finished dials to return, trigger a scale up.
select {
case dq.growCh <- struct{}{}:
default:
}
// park the channel until a dialled peer becomes available.
select {
case dq.waitingCh <- waitingCh{ch, time.Now()}:
// all good
case <-dq.ctx.Done():
// return a closed channel with no value if we're done.
close(ch)
}
return ch
}
func (dq *dialQueue) grow() {
// no mutex needed as this is only called from the (single-threaded) control loop.
defer func(prev uint) {
if prev == dq.nWorkers {
return
}
logger.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
}(dq.nWorkers)
if dq.nWorkers == dq.config.maxParallelism {
return
}
// choosing not to worry about uint wrapping beyond max value.
target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
if target > dq.config.maxParallelism {
target = dq.config.maxParallelism
}
for ; dq.nWorkers < target; dq.nWorkers++ {
go dq.worker()
}
}
func (dq *dialQueue) shrink() {
// no mutex needed as this is only called from the (single-threaded) control loop.
defer func(prev uint) {
if prev == dq.nWorkers {
return
}
logger.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
}(dq.nWorkers)
if dq.nWorkers == dq.config.minParallelism {
return
}
target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
if target < dq.config.minParallelism {
target = dq.config.minParallelism
}
// send as many die signals as workers we have to prune.
for ; dq.nWorkers > target; dq.nWorkers-- {
select {
case dq.dieCh <- struct{}{}:
default:
logger.Debugf("too many die signals queued up.")
}
}
}
func (dq *dialQueue) worker() {
// This idle timer tracks if the environment is slow. If we're waiting to long to acquire a peer to dial,
// it means that the DHT query is progressing slow and we should shrink the worker pool.
idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
defer idleTimer.Stop()
for {
// trap exit signals first.
select {
case <-dq.ctx.Done():
return
case <-dq.dieCh:
return
default:
}
idleTimer.Stop()
select {
case <-idleTimer.C:
default:
// NOTE: There is a slight race here. We could be in the
// middle of firing the timer and not read anything from the channel.
//
// However, that's not really a huge issue. We'll think
// we're idle but that's fine.
}
idleTimer.Reset(dq.config.maxIdle)
select {
case <-dq.dieCh:
return
case <-dq.ctx.Done():
return
case <-idleTimer.C:
// no new dial requests during our idle period; time to scale down.
case p, ok := <-dq.in.DeqChan:
if !ok {
return
}
t := time.Now()
if err := dq.dialFn(dq.ctx, p); err != nil {
logger.Debugf("discarding dialled peer because of error: %v", err)
continue
}
logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
waiting := len(dq.waitingCh)
// by the time we're done dialling, it's possible that the context is closed, in which case there will
// be nobody listening on dq.out.EnqChan and we could block forever.
select {
case dq.out.EnqChan <- p:
case <-dq.ctx.Done():
return
}
if waiting > 0 {
// we have somebody to deliver this value to, so no need to shrink.
continue
}
}
// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
// waiting for the value we just produced.
select {
case dq.shrinkCh <- struct{}{}:
default:
}
}
}
package dht
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
queue "github.com/libp2p/go-libp2p-peerstore/queue"
)
func TestDialQueueGrowsOnSlowDials(t *testing.T) {
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
var cnt int32
dialFn := func(ctx context.Context, p peer.ID) error {
atomic.AddInt32(&cnt, 1)
<-hang
return nil
}
// Enqueue 20 jobs.
for i := 0; i < 20; i++ {
in.EnqChan <- peer.ID(i)
}
// remove the mute period to grow faster.
config := dqDefaultConfig()
config.maxIdle = 10 * time.Minute
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
dq.Start()
for i := 0; i < 4; i++ {
_ = dq.Consume()
time.Sleep(100 * time.Millisecond)
}
for i := 0; i < 20; i++ {
if atomic.LoadInt32(&cnt) > int32(DefaultDialQueueMinParallelism) {
return
}
time.Sleep(100 * time.Millisecond)
}
t.Errorf("expected 19 concurrent dials, got %d", atomic.LoadInt32(&cnt))
}
func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
// reduce interference from the other shrink path.
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
wg := new(sync.WaitGroup)
wg.Add(13)
dialFn := func(ctx context.Context, p peer.ID) error {
wg.Done()
<-hang
return nil
}
config := dqDefaultConfig()
config.maxIdle = 10 * time.Minute
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
dq.Start()
// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
// and immediately returnable.
for i := 0; i < 3; i++ {
_ = dq.Consume()
}
// Enqueue 13 jobs, one per worker we'll grow to.
for i := 0; i < 13; i++ {
in.EnqChan <- peer.ID(i)
}
waitForWg(t, wg, 2*time.Second)
// Release a few dialFn, but not all of them because downscaling happens when workers detect there are no
// consumers to consume their values. So the other three will be these witnesses.
for i := 0; i < 3; i++ {
hang <- struct{}{}
}
// allow enough time for signalling and dispatching values to outstanding consumers.
time.Sleep(1 * time.Second)
// unblock the rest.
for i := 0; i < 10; i++ {
hang <- struct{}{}
}
wg = new(sync.WaitGroup)
// we should now only have 6 workers, because all the shrink events will have been honoured.
wg.Add(6)
// enqueue more jobs.
for i := 0; i < 6; i++ {
in.EnqChan <- peer.ID(i)
}
// let's check we have 6 workers hanging.
waitForWg(t, wg, 2*time.Second)
}
// Inactivity = workers are idle because the DHT query is progressing slow and is producing too few peers to dial.
func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
var wg sync.WaitGroup
wg.Add(13)
dialFn := func(ctx context.Context, p peer.ID) error {
wg.Done()
<-hang
return nil
}
// Enqueue 13 jobs.
for i := 0; i < 13; i++ {
in.EnqChan <- peer.ID(i)
}
config := dqDefaultConfig()
config.maxIdle = 1 * time.Second
config.mutePeriod = 0
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
dq.Start()
// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
for i := 0; i < 13; i++ {
ch := dq.Consume()
hang <- struct{}{}
<-ch
time.Sleep(100 * time.Millisecond)
}
// wait for MaxIdlePeriod.
time.Sleep(1500 * time.Millisecond)
// we should now only have 6 workers, because all the shrink events will have been honoured.
wg.Add(6)
// enqueue more jobs
for i := 0; i < 10; i++ {
in.EnqChan <- peer.ID(i)
}
// let's check we have 6 workers hanging.
waitForWg(t, &wg, 2*time.Second)
}
func TestDialQueueMutePeriodHonored(t *testing.T) {
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
var wg sync.WaitGroup
wg.Add(6)
dialFn := func(ctx context.Context, p peer.ID) error {
wg.Done()
<-hang
return nil
}
// Enqueue a bunch of jobs.
for i := 0; i < 20; i++ {
in.EnqChan <- peer.ID(i)
}
config := dqDefaultConfig()
config.mutePeriod = 2 * time.Second
dq, err := newDialQueue(&dqParams{
ctx: context.Background(),
target: "test",
in: in,
dialFn: dialFn,
config: config,
})
if err != nil {
t.Error("unexpected error when constructing the dial queue", err)
}
dq.Start()
// pick up three consumers.
for i := 0; i < 3; i++ {
_ = dq.Consume()
time.Sleep(100 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
// we'll only have 6 workers because the grow signals have been ignored.
waitForWg(t, &wg, 2*time.Second)
}
func waitForWg(t *testing.T, wg *sync.WaitGroup, wait time.Duration) {
t.Helper()
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-time.After(wait):
t.Error("timeout while waiting for WaitGroup")
case <-done:
}
}
...@@ -68,91 +68,3 @@ func (ph *peerMetricHeap) Pop() interface{} { ...@@ -68,91 +68,3 @@ func (ph *peerMetricHeap) Pop() interface{} {
ph.data = old[0 : n-1] ph.data = old[0 : n-1]
return item return item
} }
/*
// KPeerSet implements heap.Interface and PeerQueue
type KPeerSet struct {
kvalue int
// from is the Key this PQ measures against
from ks.Key
// heap is a heap of peerDistance items
heap peerMetricHeap
lock sync.RWMutex
}
func (pq *KPeerSet) Len() int {
pq.lock.RLock()
defer pq.lock.RUnlock()
return len(pq.heap)
}
func (pq *KPeerSet) Check(p peer.ID) bool {
pq.lock.RLock()
defer pq.lock.RUnlock()
if pq.heap.Len() < pq.kvalue {
return true
}
distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
return distance.Cmp(pq.heap[0].metric) != -1
}
func (pq *KPeerSet) Add(p peer.ID) (bool, peer.ID) {
pq.lock.Lock()
defer pq.lock.Unlock()
distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
var replacedPeer peer.ID
if pq.heap.Len() >= pq.kvalue {
// If we're not closer than the worst peer, drop this.
if distance.Cmp(pq.heap[0].metric) != -1 {
return false, replacedPeer
}
// Replacing something, remove it.
replacedPeer = heap.Pop(&pq.heap).(*peerMetric).peer
}
heap.Push(&pq.heap, &peerMetric{
peer: p,
metric: distance,
})
return true, replacedPeer
}
func (pq *KPeerSet) Remove(id peer.ID) {
pq.lock.Lock()
defer pq.lock.Unlock()
for i, pm := range pq.heap {
if pm.peer == id {
heap.Remove(&pq.heap, i)
return
}
}
}
func (pq *KPeerSet) Peers() []peer.ID {
pq.lock.RLock()
defer pq.lock.RUnlock()
ret := make([]peer.ID, len(pq.heap))
for _, pm := range pq.heap {
ret = append(ret, pm.peer)
}
return ret
}
func New(kvalue int, from string) *KPeerSet {
return &KPeerSet{
from: ks.XORKeySpace.Key([]byte(from)),
kvalue: kvalue,
heap: make([]*peerMetric, 0, kvalue),
}
}
*/
...@@ -26,45 +26,6 @@ func (p peerLatencyMetricList) Less(i, j int) bool { ...@@ -26,45 +26,6 @@ func (p peerLatencyMetricList) Less(i, j int) bool {
func (p peerLatencyMetricList) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p peerLatencyMetricList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p peerLatencyMetricList) GetPeerID(i int) peer.ID { return p[i].peer } func (p peerLatencyMetricList) GetPeerID(i int) peer.ID { return p[i].peer }
func less(pm1, pm2 *peerLatencyMetric) bool {
p1Connectedness, p2Connectedness := pm1.connectedness, pm2.connectedness
p1Latency, p2Latency := pm1.latency, pm2.latency
// Compare latency assuming that connected is lower latency than unconnected
if p1Connectedness == network.Connected {
if p2Connectedness == network.Connected {
return p1Latency < p2Latency
}
return true
}
if p2Connectedness == network.Connected {
return false
}
// Compare latency assuming recent connection is lower latency than older connection.
// TODO: This assumption largely stems from our latency library showing peers we know nothing about as
// having zero latency
if p1Connectedness == network.CanConnect {
if p2Connectedness == network.CanConnect {
return p1Latency > p2Latency
}
return true
}
if p2Connectedness == network.CanConnect {
return false
}
// Check if either peer has proven to be unconnectable, if so rank them low
if p1Connectedness == network.CannotConnect && p2Connectedness != network.CannotConnect {
return false
}
if p2Connectedness == network.CannotConnect && p1Connectedness != network.CannotConnect {
return true
}
return pm1.metric.Cmp(pm2.metric) == -1
}
func calculationLess(pm1, pm2 peerLatencyMetric) bool { func calculationLess(pm1, pm2 peerLatencyMetric) bool {
return calc(pm1).Cmp(calc(pm2)) == -1 return calc(pm1).Cmp(calc(pm2)) == -1
} }
...@@ -110,54 +71,3 @@ func PeersSortedByLatency(peers []IPeerMetric, net network.Network, metrics peer ...@@ -110,54 +71,3 @@ func PeersSortedByLatency(peers []IPeerMetric, net network.Network, metrics peer
sort.Sort(lst) sort.Sort(lst)
return lst return lst
} }
func SortByLatency(net network.Network, metrics peerstore.Metrics) func(peers []*peerMetric) []peer.ID {
return func(peers []*peerMetric) []peer.ID {
metricLst := NewPeerMetricList(peers, func(p1, p2 *peerMetric) bool {
p1Connectedness := net.Connectedness(p1.peer)
p2Connectedness := net.Connectedness(p2.peer)
// Compare latency assuming that connected is lower latency than unconnected
if p1Connectedness == network.Connected {
if p2Connectedness == network.Connected {
return metrics.LatencyEWMA(p1.peer) > metrics.LatencyEWMA(p2.peer)
}
return true
}
if p2Connectedness == network.Connected {
return false
}
// Compare latency assuming recent connection is lower latency than older connection.
// TODO: This assumption largely stems from our latency library showing peers we know nothing about as
// having zero latency
if p1Connectedness == network.CanConnect {
if p2Connectedness == network.CanConnect {
return metrics.LatencyEWMA(p1.peer) > metrics.LatencyEWMA(p2.peer)
}
return true
}
if p2Connectedness == network.CanConnect {
return false
}
// Check if either peer has proven to be unconnectable, if so rank them low
if p1Connectedness == network.CannotConnect && p2Connectedness != network.CannotConnect {
return false
}
if p2Connectedness == network.CannotConnect && p1Connectedness != network.CannotConnect {
return true
}
return p1.metric.Cmp(p2.metric) == -1
})
sort.Stable(metricLst)
peerLst := make([]peer.ID, metricLst.Len())
for i := range peerLst {
peerLst[i] = metricLst.GetPeerID(i)
}
return peerLst
}
}
...@@ -14,34 +14,6 @@ type SortablePeers interface { ...@@ -14,34 +14,6 @@ type SortablePeers interface {
GetPeerID(i int) peer.ID GetPeerID(i int) peer.ID
} }
type comparer func(id1, id2 *peerMetric) bool
type peerMetricList struct {
data []*peerMetric
cmp comparer
}
func (pm peerMetricList) Len() int { return len(pm.data) }
func (pm peerMetricList) Less(i, j int) bool {
return pm.cmp(pm.data[i], pm.data[j])
}
func (pm peerMetricList) Swap(i, j int) {
pm.data[i], pm.data[j] = pm.data[j], pm.data[i]
}
func (pm peerMetricList) GetPeerID(i int) peer.ID {
return pm.data[i].peer
}
func NewPeerMetricList(peers []*peerMetric, cmp func(p1, p2 *peerMetric) bool) peerMetricList {
return peerMetricList{
data: peers,
cmp: cmp,
}
}
func NewSortedPeerset(kvalue int, from string, sortPeers func([]IPeerMetric) SortablePeers) *SortedPeerset { func NewSortedPeerset(kvalue int, from string, sortPeers func([]IPeerMetric) SortablePeers) *SortedPeerset {
fromKey := ks.XORKeySpace.Key([]byte(from)) fromKey := ks.XORKeySpace.Key([]byte(from))
......
...@@ -15,10 +15,10 @@ import ( ...@@ -15,10 +15,10 @@ import (
// ErrNoPeersQueried is returned when we failed to connect to any peers. // ErrNoPeersQueried is returned when we failed to connect to any peers.
var ErrNoPeersQueried = errors.New("failed to query any peers") var ErrNoPeersQueried = errors.New("failed to query any peers")
type qfn func(context.Context, peer.ID) ([]*peer.AddrInfo, error) type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
type sfn func(*kpeerset.SortedPeerset) bool type stopFn func(*kpeerset.SortedPeerset) bool
type qu struct { type query struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
...@@ -26,11 +26,11 @@ type qu struct { ...@@ -26,11 +26,11 @@ type qu struct {
localPeers *kpeerset.SortedPeerset localPeers *kpeerset.SortedPeerset
globallyQueriedPeers *peer.Set globallyQueriedPeers *peer.Set
queryFn qfn queryFn queryFn
stopFn sfn stopFn stopFn
} }
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn qfn, stopFn sfn) ([]*qu, error) { func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
queryCtx, cancelQuery := context.WithCancel(ctx) queryCtx, cancelQuery := context.WithCancel(ctx)
numQueriesComplete := 0 numQueriesComplete := 0
...@@ -49,11 +49,11 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string ...@@ -49,11 +49,11 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i] seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
}) })
queries := make([]*qu, d) queries := make([]*query, d)
peersQueried := peer.NewSet() peersQueried := peer.NewSet()
for i := 0; i < d; i++ { for i := 0; i < d; i++ {
query := &qu{ query := &query{
ctx: queryCtx, ctx: queryCtx,
cancel: cancelQuery, cancel: cancelQuery,
dht: dht, dht: dht,
...@@ -98,7 +98,7 @@ func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePee ...@@ -98,7 +98,7 @@ func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePee
return kpeerset.PeersSortedByLatency(peers, dht.host.Network(), dht.peerstore) return kpeerset.PeersSortedByLatency(peers, dht.host.Network(), dht.peerstore)
} }
func strictParallelismQuery(q *qu) { func strictParallelismQuery(q *query) {
/* /*
start with K closest peers (some queried already some not) start with K closest peers (some queried already some not)
take best alpha (sorted by some metric) take best alpha (sorted by some metric)
...@@ -150,7 +150,7 @@ func strictParallelismQuery(q *qu) { ...@@ -150,7 +150,7 @@ func strictParallelismQuery(q *qu) {
} }
} }
func simpleQuery(q *qu) { func simpleQuery(q *query) {
/* /*
start with K closest peers (some queried already some not) start with K closest peers (some queried already some not)
take best alpha (sorted by some metric) take best alpha (sorted by some metric)
...@@ -210,7 +210,7 @@ func simpleQuery(q *qu) { ...@@ -210,7 +210,7 @@ func simpleQuery(q *qu) {
} }
} }
func boundedDialQuery(q *qu) { func boundedDialQuery(q *query) {
/* /*
start with K closest peers (some queried already some not) start with K closest peers (some queried already some not)
take best alpha (sorted by some metric) take best alpha (sorted by some metric)
...@@ -268,7 +268,7 @@ type queryResult struct { ...@@ -268,7 +268,7 @@ type queryResult struct {
foundCloserPeer bool foundCloserPeer bool
} }
func (q *qu) queryPeer(ctx context.Context, p peer.ID) *queryResult { func (q *query) queryPeer(ctx context.Context, p peer.ID) *queryResult {
dialCtx, queryCtx := ctx, ctx dialCtx, queryCtx := ctx, ctx
if err := q.dht.dialPeer(dialCtx, p); err != nil { if err := q.dht.dialPeer(dialCtx, p); err != nil {
......
...@@ -320,9 +320,9 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte ...@@ -320,9 +320,9 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
} }
} }
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*qu) { func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*query) {
valCh := make(chan RecvdVal, 1) valCh := make(chan RecvdVal, 1)
queriesCh := make(chan []*qu, 1) queriesCh := make(chan []*query, 1)
if rec, err := dht.getLocal(key); rec != nil && err == nil { if rec, err := dht.getLocal(key); rec != nil && err == nil {
select { select {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment