Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
076b93d1
Unverified
Commit
076b93d1
authored
Mar 13, 2019
by
Steven Allen
Committed by
GitHub
Mar 13, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #301 from libp2p/fix/dq-defer-start
defer dialqueue action until initial peers have been added
parents
ac677253
bd60c95d
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
32 additions
and
9 deletions
+32
-9
dial_queue.go
dial_queue.go
+19
-9
dial_queue_test.go
dial_queue_test.go
+8
-0
query.go
query.go
+5
-0
No files found.
dial_queue.go
View file @
076b93d1
...
...
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"
peer
"github.com/libp2p/go-libp2p-peer"
...
...
@@ -31,8 +32,9 @@ const (
type
dialQueue
struct
{
*
dqParams
nWorkers
uint
out
*
queue
.
ChanQueue
nWorkers
uint
out
*
queue
.
ChanQueue
startOnce
sync
.
Once
waitingCh
chan
waitingCh
dieCh
chan
struct
{}
...
...
@@ -90,9 +92,10 @@ type waitingCh struct {
ts
time
.
Time
}
// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
...
...
@@ -112,7 +115,6 @@ type waitingCh struct {
func
newDialQueue
(
params
*
dqParams
)
(
*
dialQueue
,
error
)
{
dq
:=
&
dialQueue
{
dqParams
:
params
,
nWorkers
:
params
.
config
.
minParallelism
,
out
:
queue
.
NewChanQueue
(
params
.
ctx
,
queue
.
NewXORDistancePQ
(
params
.
target
)),
growCh
:
make
(
chan
struct
{},
1
),
shrinkCh
:
make
(
chan
struct
{},
1
),
...
...
@@ -120,13 +122,21 @@ func newDialQueue(params *dqParams) (*dialQueue, error) {
dieCh
:
make
(
chan
struct
{},
params
.
config
.
maxParallelism
),
}
for
i
:=
0
;
i
<
int
(
params
.
config
.
minParallelism
);
i
++
{
go
dq
.
worker
()
}
go
dq
.
control
()
return
dq
,
nil
}
// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func
(
dq
*
dialQueue
)
Start
()
{
dq
.
startOnce
.
Do
(
func
()
{
tgt
:=
int
(
dq
.
dqParams
.
config
.
minParallelism
)
for
i
:=
0
;
i
<
tgt
;
i
++
{
go
dq
.
worker
()
}
dq
.
nWorkers
=
uint
(
tgt
)
})
}
func
(
dq
*
dialQueue
)
control
()
{
var
(
dialled
<-
chan
peer
.
ID
...
...
dial_queue_test.go
View file @
076b93d1
...
...
@@ -42,6 +42,8 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
t
.
Error
(
"unexpected error when constructing the dial queue"
,
err
)
}
dq
.
Start
()
for
i
:=
0
;
i
<
4
;
i
++
{
_
=
dq
.
Consume
()
time
.
Sleep
(
100
*
time
.
Millisecond
)
...
...
@@ -86,6 +88,8 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
t
.
Error
(
"unexpected error when constructing the dial queue"
,
err
)
}
dq
.
Start
()
// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
// and immediately returnable.
for
i
:=
0
;
i
<
3
;
i
++
{
...
...
@@ -158,6 +162,8 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
t
.
Error
(
"unexpected error when constructing the dial queue"
,
err
)
}
dq
.
Start
()
// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
for
i
:=
0
;
i
<
13
;
i
++
{
ch
:=
dq
.
Consume
()
...
...
@@ -210,6 +216,8 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
t
.
Error
(
"unexpected error when constructing the dial queue"
,
err
)
}
dq
.
Start
()
// pick up three consumers.
for
i
:=
0
;
i
<
3
;
i
++
{
_
=
dq
.
Consume
()
...
...
query.go
View file @
076b93d1
...
...
@@ -136,6 +136,11 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
r
.
addPeerToQuery
(
p
)
}
// start the dial queue only after we've added the initial set of peers.
// this is to avoid race conditions that could cause the peersRemaining todoctr
// to be done too early if the initial dial fails before others make it into the queue.
r
.
peersDialed
.
Start
()
// go do this thing.
// do it as a child proc to make sure Run exits
// ONLY AFTER spawn workers has exited.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment