From 1624828080b83bf2b7ea7b0d4c8290b3f73f1b79 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 1 Apr 2021 12:13:03 +0300 Subject: [PATCH] make DialRequest and DialResponse private --- dial_sync.go | 14 +++++++------- dial_sync_test.go | 20 +++++++++----------- swarm_dial.go | 42 +++++++++++++++++++++--------------------- 3 files changed, 37 insertions(+), 39 deletions(-) diff --git a/dial_sync.go b/dial_sync.go index ae3578a..24781dd 100644 --- a/dial_sync.go +++ b/dial_sync.go @@ -12,8 +12,8 @@ import ( // TODO: change this text when we fix the bug var errDialCanceled = errors.New("dial was aborted internally, likely due to https://git.io/Je2wW") -// DialFunc is the type of function expected by DialSync. -type DialWorkerFunc func(context.Context, peer.ID, <-chan DialRequest) error +// DialWorerFunc is used by DialSync to spawn a new dial worker +type DialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error // NewDialSync constructs a new DialSync func NewDialSync(worker DialWorkerFunc) *DialSync { @@ -38,7 +38,7 @@ type activeDial struct { ctx context.Context cancel func() - reqch chan DialRequest + reqch chan dialRequest ds *DialSync } @@ -64,16 +64,16 @@ func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) { dialCtx = network.WithSimultaneousConnect(dialCtx, reason) } - resch := make(chan DialResponse, 1) + resch := make(chan dialResponse, 1) select { - case ad.reqch <- DialRequest{Ctx: dialCtx, Resch: resch}: + case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: case <-ctx.Done(): return nil, ctx.Err() } select { case res := <-resch: - return res.Conn, res.Err + return res.conn, res.err case <-ctx.Done(): return nil, ctx.Err() } @@ -94,7 +94,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) { id: p, ctx: adctx, cancel: cancel, - reqch: make(chan DialRequest), + reqch: make(chan dialRequest), ds: ds, } diff --git a/dial_sync_test.go b/dial_sync_test.go index f1a9f8a..e414dd5 100644 --- a/dial_sync_test.go +++ b/dial_sync_test.go @@ -1,4 +1,4 @@ -package swarm_test +package swarm import ( "context" @@ -7,8 +7,6 @@ import ( "testing" "time" - . "github.com/libp2p/go-libp2p-swarm" - "github.com/libp2p/go-libp2p-core/peer" ) @@ -16,7 +14,7 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care dialctx, cancel := context.WithCancel(context.Background()) ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { dfcalls <- struct{}{} go func() { defer cancel() @@ -29,9 +27,9 @@ func getMockDialFunc() (DialWorkerFunc, func(), context.Context, <-chan struct{} select { case <-ch: - req.Resch <- DialResponse{Conn: new(Conn)} + req.resch <- dialResponse{conn: new(Conn)} case <-ctx.Done(): - req.Resch <- DialResponse{Err: ctx.Err()} + req.resch <- dialResponse{err: ctx.Err()} return } case <-ctx.Done(): @@ -189,7 +187,7 @@ func TestDialSyncAllCancel(t *testing.T) { func TestFailFirst(t *testing.T) { var count int - f := func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -199,9 +197,9 @@ func TestFailFirst(t *testing.T) { } if count > 0 { - req.Resch <- DialResponse{Conn: new(Conn)} + req.resch <- dialResponse{conn: new(Conn)} } else { - req.Resch <- DialResponse{Err: fmt.Errorf("gophers ate the modem")} + req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")} } count++ @@ -236,7 +234,7 @@ func TestFailFirst(t *testing.T) { } func TestStressActiveDial(t *testing.T) { - ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { + ds := NewDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { go func() { for { select { @@ -245,7 +243,7 @@ func TestStressActiveDial(t *testing.T) { return } - req.Resch <- DialResponse{} + req.resch <- dialResponse{} case <-ctx.Done(): return } diff --git a/swarm_dial.go b/swarm_dial.go index 1a5cb12..ab95a46 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -285,18 +285,18 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { // TODO explain how all this works ////////////////////////////////////////////////////////////////////////////////// -type DialRequest struct { - Ctx context.Context - Resch chan DialResponse +type dialRequest struct { + ctx context.Context + resch chan dialResponse } -type DialResponse struct { - Conn *Conn - Err error +type dialResponse struct { + conn *Conn + err error } // dialWorker is an active dial goroutine that synchronizes and executes concurrent dials -func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequest) error { +func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error { if p == s.local { return ErrDialToSelf } @@ -305,11 +305,11 @@ func (s *Swarm) dialWorker(ctx context.Context, p peer.ID, reqch <-chan DialRequ return nil } -func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan DialRequest) { +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { defer s.limiter.clearAllPeerDials(p) type pendRequest struct { - req DialRequest // the original request + req dialRequest // the original request err *DialError // dial error accumulator addrs map[ma.Multiaddr]struct{} // pending addr dials } @@ -344,11 +344,11 @@ func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan Dial // all addrs have erred, dispatch dial error // but first do a last one check in case an acceptable connection has landed from // a simultaneous dial that started later and added new acceptable addrs - c := s.bestAcceptableConnToPeer(pr.req.Ctx, p) + c := s.bestAcceptableConnToPeer(pr.req.ctx, p) if c != nil { - pr.req.Resch <- DialResponse{Conn: c} + pr.req.resch <- dialResponse{conn: c} } else { - pr.req.Resch <- DialResponse{Err: pr.err} + pr.req.resch <- dialResponse{err: pr.err} } delete(requests, reqno) } @@ -390,15 +390,15 @@ loop: return } - c := s.bestAcceptableConnToPeer(req.Ctx, p) + c := s.bestAcceptableConnToPeer(req.ctx, p) if c != nil { - req.Resch <- DialResponse{Conn: c} + req.resch <- dialResponse{conn: c} continue loop } - addrs, err := s.addrsForDial(req.Ctx, p) + addrs, err := s.addrsForDial(req.ctx, p) if err != nil { - req.Resch <- DialResponse{Err: err} + req.resch <- dialResponse{err: err} continue loop } @@ -430,7 +430,7 @@ loop: if ad.conn != nil { // dial to this addr was successful, complete the request - req.Resch <- DialResponse{Conn: ad.conn} + req.resch <- dialResponse{conn: ad.conn} continue loop } @@ -447,7 +447,7 @@ loop: if len(todial) == 0 && len(tojoin) == 0 { // all request applicable addrs have been dialed, we must have errored - req.Resch <- DialResponse{Err: pr.err} + req.resch <- dialResponse{err: pr.err} continue loop } @@ -457,14 +457,14 @@ loop: for _, ad := range tojoin { if !ad.dialed { - ad.ctx = s.mergeDialContexts(ad.ctx, req.Ctx) + ad.ctx = s.mergeDialContexts(ad.ctx, req.ctx) } ad.requests = append(ad.requests, reqno) } if len(todial) > 0 { for _, a := range todial { - pending[a] = &addrDial{addr: a, ctx: req.Ctx, requests: []int{reqno}} + pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{reqno}} } nextDial = append(nextDial, todial...) @@ -550,7 +550,7 @@ loop: continue } - pr.req.Resch <- DialResponse{Conn: conn} + pr.req.resch <- dialResponse{conn: conn} delete(requests, reqno) } -- GitLab