From 0510dcb8ac7eec7eb1baf5252518f2159b13f5b4 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 31 Mar 2021 13:58:16 +0300 Subject: [PATCH] don't use a goroutine for the actual dial --- swarm_dial.go | 138 ++++++++++++++++++++++++++------------------------ 1 file changed, 72 insertions(+), 66 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index 77cb8a6..8f1e2c3 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -290,6 +290,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // lo and behold, The Dialer // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// + type DialRequest struct { Ctx context.Context Resch chan DialResponse @@ -300,12 +301,6 @@ type DialResponse struct { Err error } -type dialComplete struct { - addr ma.Multiaddr - conn *Conn - err error -} - // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { if p == s.local { @@ -326,6 +321,7 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial } type addrDial struct { + addr ma.Multiaddr ctx context.Context conn *Conn err error @@ -336,12 +332,40 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial requests := make(map[int]*pendRequest) pending := make(map[ma.Multiaddr]*addrDial) + dispatchError := func(ad *addrDial, err error) { + ad.err = err + for _, reqno := range ad.requests { + pr, ok := requests[reqno] + if !ok { + // has already been dispatched + continue + } + + // accumulate the error + pr.err.recordErr(ad.addr, err) + + delete(pr.addrs, ad.addr) + if len(pr.addrs) == 0 { + // all addrs have erred, dispatch dial error + pr.req.Resch <- DialResponse{Err: pr.err} + delete(requests, reqno) + } + } + + ad.requests = nil + + // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests + if err == ErrDialBackoff { + delete(pending, ad.addr) + } + } + var triggerDial <-chan time.Time var nextDial []ma.Multiaddr active := 0 done := false - resch := make(chan dialComplete) + resch := make(chan dialResult) loop: for { @@ -408,6 +432,7 @@ loop: // dial to this addr errored, accumulate the error pr.err.recordErr(a, ad.err) delete(pr.addrs, a) + continue } // dial is still pending, add to the join list @@ -430,7 +455,7 @@ loop: if len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{ctx: req.Ctx, requests: []int{reqno}} + pending[a] = &addrDial{addr: a, ctx: req.Ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) @@ -454,7 +479,12 @@ loop: // spawn the next dial ad := pending[next] - go s.dialNextAddr(ad.ctx, p, next, resch) + err := s.dialNextAddr(ad.ctx, p, next, resch) + if err != nil { + dispatchError(ad, err) + continue loop + } + active++ // select an appropriate delay for the next dial trigger @@ -465,55 +495,54 @@ loop: active-- if done && active == 0 { + if res.Conn != nil { + // we got an actual connection, but the dial has been cancelled + // Should we close it? I think not, we should just add it to the swarm + _, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + // well duh, now we have to close it + res.Conn.Close() + } + } return } - ad := pending[res.addr] - ad.conn = res.conn - ad.err = res.err + ad := pending[res.Addr] - dialRequests := ad.requests - ad.requests = nil + if res.Conn != nil { + // we got a connection, add it to the swarm + conn, err := s.addConn(res.Conn, network.DirOutbound) + if err != nil { + // oops no, we failed to add it to the swarm + res.Conn.Close() + dispatchError(ad, err) + continue loop + } - if res.conn != nil { - // we got a connection, dispatch to still pending requests - for _, reqno := range dialRequests { + // dispatch to still pending requests + for _, reqno := range ad.requests { pr, ok := requests[reqno] if !ok { // it has already dispatched a connection continue } - pr.req.Resch <- DialResponse{Conn: res.conn} + pr.req.Resch <- DialResponse{Conn: conn} delete(requests, reqno) } + ad.conn = conn + ad.requests = nil + continue loop } - // it must be an error, accumulate it and dispatch dial error if the request has tried all addrs - for _, reqno := range dialRequests { - pr, ok := requests[reqno] - if !ok { - // has already been dispatched - continue - } - - // accumulate the error - pr.err.recordErr(res.addr, res.err) - - delete(pr.addrs, res.addr) - if len(pr.addrs) == 0 { - // all addrs have erred, dispatch dial error - pr.req.Resch <- DialResponse{Err: pr.err} - delete(requests, reqno) - } + // it must be an error -- add backoff if applicable and dispatch + if res.Err != context.Canceled { + s.backf.AddBackoff(p, res.Addr) } - // if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests - if res.err == ErrDialBackoff { - delete(pending, res.addr) - } + dispatchError(ad, res.Err) } } } @@ -536,41 +565,18 @@ func (s *Swarm) addrsForDial(ctx context.Context, p peer.ID) ([]ma.Multiaddr, er return goodAddrs, nil } -func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialComplete) { +func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, resch chan dialResult) error { // check the dial backoff if forceDirect, _ := network.GetForceDirectDial(ctx); !forceDirect { if s.backf.Backoff(p, addr) { - resch <- dialComplete{addr: addr, err: ErrDialBackoff} - return + return ErrDialBackoff } } // start the dial - dresch := make(chan dialResult) - s.limitedDial(ctx, p, addr, dresch) - select { - case res := <-dresch: - if res.Err != nil { - if res.Err != context.Canceled { - s.backf.AddBackoff(p, addr) - } - - resch <- dialComplete{addr: addr, err: res.Err} - return - } + s.limitedDial(ctx, p, addr, resch) - conn, err := s.addConn(res.Conn, network.DirOutbound) - if err != nil { - res.Conn.Close() - resch <- dialComplete{addr: addr, err: err} - return - } - - resch <- dialComplete{addr: addr, conn: conn} - - case <-ctx.Done(): - resch <- dialComplete{addr: addr, err: ctx.Err()} - } + return nil } func (s *Swarm) delayForNextDial(addr ma.Multiaddr) time.Duration { -- GitLab