Commit 0510dcb8 authored by vyzo's avatar vyzo

don't use a goroutine for the actual dial

parent 2ee7bf0a
......@@ -290,6 +290,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
// lo and behold, The Dialer
// TODO explain how all this works
//////////////////////////////////////////////////////////////////////////////////
type DialRequest struct {
Ctx context.Context
Resch chan DialResponse
......@@ -300,12 +301,6 @@ type DialResponse struct {
Err error
}
type dialComplete struct {
addr ma.Multiaddr
conn *Conn
err error
}
// dialWorker is an active dial goroutine that synchronizes and executes concurrent dials
func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error {
if p == s.local {
......@@ -326,6 +321,7 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial
}
type addrDial struct {
addr ma.Multiaddr
ctx context.Context
conn *Conn
err error
......@@ -336,12 +332,40 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial
requests := make(map[int]*pendRequest)
pending := make(map[ma.Multiaddr]*addrDial)
dispatchError := func(ad *addrDial, err error) {
ad.err = err
for _, reqno := range ad.requests {
pr, ok := requests[reqno]
if !ok {
// has already been dispatched
continue
}
// accumulate the error
pr.err.recordErr(ad.addr, err)
delete(pr.addrs, ad.addr)
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
pr.req.Resch <- DialResponse{Err: pr.err}
delete(requests, reqno)
}
}
ad.requests = nil
// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests
if err == ErrDialBackoff {
delete(pending, ad.addr)
}
}
var triggerDial <-chan time.Time
var nextDial []ma.Multiaddr
active := 0
done := false
resch := make(chan dialComplete)
resch := make(chan dialResult)
loop:
for {
......@@ -408,6 +432,7 @@ loop:
// dial to this addr errored, accumulate the error
pr.err.recordErr(a, ad.err)
delete(pr.addrs, a)
continue
}
// dial is still pending, add to the join list
......@@ -430,7 +455,7 @@ loop:
if len(todial) > 0 {
for _, a := range todial {
pending[a] = &addrDial{ctx: req.Ctx, requests: []int{reqno}}
pending[a] = &addrDial{addr: a, ctx: req.Ctx, requests: []int{reqno}}
}
nextDial = append(nextDial, todial...)
......@@ -454,7 +479,12 @@ loop:
// spawn the next dial
ad := pending[next]
go s.dialNextAddr(ad.ctx, p, next, resch)
err := s.dialNextAddr(ad.ctx, p, next, resch)
if err != nil {
dispatchError(ad, err)
continue loop
}
active++
// select an appropriate delay for the next dial trigger
......@@ -465,55 +495,54 @@ loop:
active--
if done && active == 0 {
if res.Conn != nil {
// we got an actual connection, but the dial has been cancelled
// Should we close it? I think not, we should just add it to the swarm
_, err := s.addConn(res.Conn, network.DirOutbound)
if err != nil {
// well duh, now we have to close it
res.Conn.Close()
}
}
return
}
ad := pending[res.addr]
ad.conn = res.conn
ad.err = res.err
ad := pending[res.Addr]
dialRequests := ad.requests
ad.requests = nil
if res.Conn != nil {
// we got a connection, add it to the swarm
conn, err := s.addConn(res.Conn, network.DirOutbound)
if err != nil {
// oops no, we failed to add it to the swarm
res.Conn.Close()
dispatchError(ad, err)
continue loop
}
if res.conn != nil {
// we got a connection, dispatch to still pending requests
for _, reqno := range dialRequests {
// dispatch to still pending requests
for _, reqno := range ad.requests {
pr, ok := requests[reqno]
if !ok {
// it has already dispatched a connection
continue
}
pr.req.Resch <- DialResponse{Conn: res.conn}
pr.req.Resch <- DialResponse{Conn: conn}
delete(requests, reqno)
}
ad.conn = conn
ad.requests = nil
continue loop
}
// it must be an error, accumulate it and dispatch dial error if the request has tried all addrs
for _, reqno := range dialRequests {
pr, ok := requests[reqno]
if !ok {
// has already been dispatched
continue
}
// accumulate the error
pr.err.recordErr(res.addr, res.err)
delete(pr.addrs, res.addr)
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
pr.req.Resch <- DialResponse{Err: pr.err}
delete(requests, reqno)
}
// it must be an error -- add backoff if applicable and dispatch
if res.Err != context.Canceled {
s.backf.AddBackoff(p, res.Addr)
}
// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests
if res.err == ErrDialBackoff {
delete(pending, res.addr)
}
dispatchError(ad, res.Err)
}
}
}
......@@ -536,41 +565,18 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, er
return goodAddrs, nil
}
func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialComplete) {
func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error {
// check the dial backoff
if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect {
if s.backf.Backoff(p, addr) {
resch <- dialComplete{addr: addr, err: ErrDialBackoff}
return
return ErrDialBackoff
}
}
// start the dial
dresch := make(chan dialResult)
s.limitedDial(ctx, p, addr, dresch)
select {
case res := <-dresch:
if res.Err != nil {
if res.Err != context.Canceled {
s.backf.AddBackoff(p, addr)
}
resch <- dialComplete{addr: addr, err: res.Err}
return
}
s.limitedDial(ctx, p, addr, resch)
conn, err := s.addConn(res.Conn, network.DirOutbound)
if err != nil {
res.Conn.Close()
resch <- dialComplete{addr: addr, err: err}
return
}
resch <- dialComplete{addr: addr, conn: conn}
case <-ctx.Done():
resch <- dialComplete{addr: addr, err: ctx.Err()}
}
return nil
}
func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment