Commit 0bad3225 authored by Juan Benet's avatar Juan Benet

Merge pull request #1943 from ipfs/dht/speed

fix swarm dial backoff
parents 51d6f29f 30b4c9e6
......@@ -79,6 +79,8 @@ type dhtQueryRunner struct {
rateLimit chan struct{} // processing semaphore
log logging.EventLogger
runCtx context.Context
proc process.Process
sync.RWMutex
}
......@@ -98,6 +100,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
r.log = log
r.runCtx = ctx
if len(peers) == 0 {
log.Warning("Running query with no peers!")
......@@ -167,6 +170,11 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
return
}
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.AddingPeer,
ID: next,
})
r.peersRemaining.Increment(1)
select {
case r.peersToQuery.EnqChan <- next:
......@@ -221,7 +229,12 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// make sure we're connected to the peer.
// FIXME abstract away into the network layer
if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
log.Infof("not connected. dialing.")
log.Error("not connected. dialing.")
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.DialingPeer,
ID: p,
})
// while we dial, we do not take up a rate limit. this is to allow
// forward progress during potentially very high latency dials.
r.rateLimit <- struct{}{}
......@@ -231,9 +244,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
if err := r.query.dht.host.Connect(ctx, pi); err != nil {
log.Debugf("Error connecting: %s", err)
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
ID: p,
})
r.Lock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment