From 34d63fed764386e5f03f8fdc45425f29626ea076 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 14 Apr 2016 16:43:42 -0700 Subject: [PATCH] Refactor the swarm dialer --- addr/addr.go | 16 ++- addr/filter.go | 27 +++++ dial_test.go | 36 ------- limiter.go | 136 +++++++++++++++++++++++++ limiter_test.go | 265 ++++++++++++++++++++++++++++++++++++++++++++++++ swarm.go | 5 + swarm_dial.go | 253 +++++++++++++-------------------------------- swarm_test.go | 2 +- 8 files changed, 516 insertions(+), 224 deletions(-) create mode 100644 addr/filter.go create mode 100644 limiter.go create mode 100644 limiter_test.go diff --git a/addr/addr.go b/addr/addr.go index 1e22bae..8c0c94b 100644 --- a/addr/addr.go +++ b/addr/addr.go @@ -42,10 +42,14 @@ func init() { // FilterAddrs is a filter that removes certain addresses, according to filter. // if filter returns true, the address is kept. -func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr { +func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr { b := make([]ma.Multiaddr, 0, len(a)) for _, addr := range a { - if filter(addr) { + good := true + for _, filter := range filters { + good = good && filter(addr) + } + if good { b = append(b, addr) } } @@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiadd // from a list. the addresses removed are those known NOT // to work with our network. Namely, addresses with UTP. func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr { - return FilterAddrs(a, func(m ma.Multiaddr) bool { - return AddrUsable(m, false) - }) + return FilterAddrs(a, AddrUsableFunc) +} + +func AddrUsableFunc(m ma.Multiaddr) bool { + return AddrUsable(m, false) } // AddrOverNonLocalIP returns whether the addr uses a non-local ip link diff --git a/addr/filter.go b/addr/filter.go new file mode 100644 index 0000000..c67e1c6 --- /dev/null +++ b/addr/filter.go @@ -0,0 +1,27 @@ +package addrutil + +import ( + ma "github.com/jbenet/go-multiaddr" + mafmt "github.com/whyrusleeping/mafmt" +) + +func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool { + addrmap := make(map[string]bool) + for _, a := range addrs { + addrmap[string(a.Bytes())] = true + } + + return func(a ma.Multiaddr) bool { + return !addrmap[string(a.Bytes())] + } +} + +func IsFDCostlyTransport(a ma.Multiaddr) bool { + return mafmt.TCP.Matches(a) +} + +func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool { + return func(a ma.Multiaddr) bool { + return !f(a) + } +} diff --git a/dial_test.go b/dial_test.go index 988f7f7..aea0f72 100644 --- a/dial_test.go +++ b/dial_test.go @@ -2,7 +2,6 @@ package swarm import ( "net" - "sort" "sync" "testing" "time" @@ -493,38 +492,3 @@ func TestDialBackoffClears(t *testing.T) { t.Log("correctly cleared backoff") } } - -func mkAddr(t *testing.T, s string) ma.Multiaddr { - a, err := ma.NewMultiaddr(s) - if err != nil { - t.Fatal(err) - } - - return a -} - -func TestAddressSorting(t *testing.T) { - u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp") - u2l := mkAddr(t, "/ip4/127.0.0.1/udp/1234/utp") - local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234") - norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234") - - l := AddrList{local, u1, u2l, norm} - sort.Sort(l) - - if !l[0].Equal(u2l) { - t.Fatal("expected utp local addr to be sorted first: ", l[0]) - } - - if !l[1].Equal(u1) { - t.Fatal("expected utp addr to be sorted second") - } - - if !l[2].Equal(local) { - t.Fatal("expected tcp localhost addr thid") - } - - if !l[3].Equal(norm) { - t.Fatal("expected normal addr last") - } -} diff --git a/limiter.go b/limiter.go new file mode 100644 index 0000000..fe8162d --- /dev/null +++ b/limiter.go @@ -0,0 +1,136 @@ +package swarm + +import ( + "sync" + + peer "github.com/ipfs/go-libp2p-peer" + ma "github.com/jbenet/go-multiaddr" + context "golang.org/x/net/context" + + conn "github.com/ipfs/go-libp2p/p2p/net/conn" + addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr" +) + +type dialResult struct { + Conn conn.Conn + Err error +} + +type dialJob struct { + addr ma.Multiaddr + peer peer.ID + ctx context.Context + resp chan dialResult + success bool +} + +type dialLimiter struct { + rllock sync.Mutex + fdConsuming int + fdLimit int + waitingOnFd []*dialJob + + dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error) + + activePerPeer map[peer.ID]int + perPeerLimit int + waitingOnPeerLimit map[peer.ID][]*dialJob +} + +type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error) + +func newDialLimiter(df dialfunc) *dialLimiter { + return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit) +} + +func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter { + return &dialLimiter{ + fdLimit: fdl, + perPeerLimit: ppl, + waitingOnPeerLimit: make(map[peer.ID][]*dialJob), + activePerPeer: make(map[peer.ID]int), + dialFunc: df, + } +} + +func (dl *dialLimiter) finishedDial(dj *dialJob) { + dl.rllock.Lock() + defer dl.rllock.Unlock() + + // release tokens in reverse order than we take them + dl.activePerPeer[dj.peer]-- + if dl.activePerPeer[dj.peer] == 0 { + delete(dl.activePerPeer, dj.peer) + } + + waitlist := dl.waitingOnPeerLimit[dj.peer] + if !dj.success && len(waitlist) > 0 { + next := waitlist[0] + if len(waitlist) == 1 { + delete(dl.waitingOnPeerLimit, dj.peer) + } else { + dl.waitingOnPeerLimit[dj.peer] = waitlist[1:] + } + dl.activePerPeer[dj.peer]++ // just kidding, we still want this token + + // can kick this off right here, dials in this list already + // have the other tokens needed + go dl.executeDial(next) + } + + if addrutil.IsFDCostlyTransport(dj.addr) { + dl.fdConsuming-- + if len(dl.waitingOnFd) > 0 { + next := dl.waitingOnFd[0] + dl.waitingOnFd = dl.waitingOnFd[1:] + dl.fdConsuming++ + + // now, attempt to take the 'per peer limit' token + dl.schedulePerPeerDial(next) + } + } +} + +// AddDialJob tries to take the needed tokens for starting the given dial job. +// If it acquires all needed tokens, it immediately starts the dial, otherwise +// it will put it on the waitlist for the requested token. +func (dl *dialLimiter) AddDialJob(dj *dialJob) { + dl.rllock.Lock() + defer dl.rllock.Unlock() + + if addrutil.IsFDCostlyTransport(dj.addr) { + if dl.fdConsuming >= dl.fdLimit { + dl.waitingOnFd = append(dl.waitingOnFd, dj) + return + } + + // take token + dl.fdConsuming++ + } + + dl.schedulePerPeerDial(dj) +} + +// executeDial calls the dialFunc, and reports the result through the response +// channel when finished. Once the response is sent it also releases all tokens +// it held during the dial. +func (dl *dialLimiter) executeDial(j *dialJob) { + defer dl.finishedDial(j) + con, err := dl.dialFunc(j.ctx, j.peer, j.addr) + select { + case j.resp <- dialResult{Conn: con, Err: err}: + case <-j.ctx.Done(): + } +} + +func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { + if dl.activePerPeer[j.peer] >= dl.perPeerLimit { + wlist := dl.waitingOnPeerLimit[j.peer] + dl.waitingOnPeerLimit[j.peer] = append(wlist, j) + return + } + + // take second needed token and start dial! + dl.activePerPeer[j.peer]++ + go dl.executeDial(j) +} diff --git a/limiter_test.go b/limiter_test.go new file mode 100644 index 0000000..b5bbde8 --- /dev/null +++ b/limiter_test.go @@ -0,0 +1,265 @@ +package swarm + +import ( + "fmt" + "strconv" + "testing" + "time" + + peer "github.com/ipfs/go-libp2p-peer" + ma "github.com/jbenet/go-multiaddr" + mafmt "github.com/whyrusleeping/mafmt" + context "golang.org/x/net/context" + + conn "github.com/ipfs/go-libp2p/p2p/net/conn" +) + +func mustAddr(t *testing.T, s string) ma.Multiaddr { + a, err := ma.NewMultiaddr(s) + if err != nil { + t.Fatal(err) + } + return a +} + +func addrWithPort(t *testing.T, p int) ma.Multiaddr { + return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)) +} + +// in these tests I use addresses with tcp ports over a certain number to +// signify 'good' addresses that will succeed, and addresses below that number +// will fail. This lets us more easily test these different scenarios. +func tcpPortOver(a ma.Multiaddr, n int) bool { + port, err := a.ValueForProtocol(ma.P_TCP) + if err != nil { + panic(err) + } + + pnum, err := strconv.Atoi(port) + if err != nil { + panic(err) + } + + return pnum > n +} + +func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) { + for _, a := range addrs { + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: p, + addr: a, + resp: res, + }) + } +} + +func hangDialFunc(hang chan struct{}) dialfunc { + return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + if mafmt.UTP.Matches(a) { + return conn.Conn(nil), nil + } + + if tcpPortOver(a, 10) { + return conn.Conn(nil), nil + } else { + <-hang + return nil, fmt.Errorf("test bad dial") + } + } +} + +func TestLimiterBasicDials(t *testing.T) { + hang := make(chan struct{}) + defer close(hang) + + l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4) + + bads := []ma.Multiaddr{ + addrWithPort(t, 1), + addrWithPort(t, 2), + addrWithPort(t, 3), + addrWithPort(t, 4), + } + + good := addrWithPort(t, 20) + + resch := make(chan dialResult) + pid := peer.ID("testpeer") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tryDialAddrs(ctx, l, pid, bads, resch) + + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pid, + addr: good, + resp: resch, + }) + + select { + case <-resch: + t.Fatal("no dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + // complete a single hung dial + hang <- struct{}{} + + select { + case r := <-resch: + if r.Err == nil { + t.Fatal("should have gotten failed dial result") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for dial completion") + } + + select { + case r := <-resch: + if r.Err != nil { + t.Fatal("expected second result to be success!") + } + case <-time.After(time.Second): + } +} + +func TestFDLimiting(t *testing.T) { + hang := make(chan struct{}) + defer close(hang) + l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5) + + bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"} + good_tcp := addrWithPort(t, 20) + + ctx := context.Background() + resch := make(chan dialResult) + + // take all fd limit tokens with hang dials + for _, pid := range pids { + tryDialAddrs(ctx, l, pid, bads, resch) + } + + // these dials should work normally, but will hang because we have taken + // up all the fd limiting + for _, pid := range pids { + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pid, + addr: good_tcp, + resp: resch, + }) + } + + select { + case <-resch: + t.Fatal("no dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + pid5 := peer.ID("testpeer5") + utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") + + l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) + + select { + case res := <-resch: + if res.Err != nil { + t.Fatal("should have gotten successful response") + } + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for utp addr success") + } +} + +func TestTokenRedistribution(t *testing.T) { + hangchs := make(map[peer.ID]chan struct{}) + df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + if tcpPortOver(a, 10) { + return (conn.Conn)(nil), nil + } else { + <-hangchs[p] + return nil, fmt.Errorf("test bad dial") + } + } + l := newDialLimiterWithParams(df, 8, 4) + + bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + pids := []peer.ID{"testpeer1", "testpeer2"} + + ctx := context.Background() + resch := make(chan dialResult) + + // take all fd limit tokens with hang dials + for _, pid := range pids { + hangchs[pid] = make(chan struct{}) + tryDialAddrs(ctx, l, pid, bads, resch) + } + + good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001") + + // add a good dial job for peer 1 + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pids[1], + addr: good, + resp: resch, + }) + + select { + case <-resch: + t.Fatal("no dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + // unblock one dial for peer 0 + hangchs[pids[0]] <- struct{}{} + + select { + case res := <-resch: + if res.Err == nil { + t.Fatal("should have only been a failure here") + } + case <-time.After(time.Millisecond * 100): + t.Fatal("expected a dial failure here") + } + + select { + case <-resch: + t.Fatal("no more dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + // add a bad dial job to peer 0 to fill their rate limiter + // and test that more dials for this peer won't interfere with peer 1's successful dial incoming + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pids[0], + addr: addrWithPort(t, 7), + resp: resch, + }) + + hangchs[pids[1]] <- struct{}{} + + // now one failed dial from peer 1 should get through and fail + // which will in turn unblock the successful dial on peer 1 + select { + case res := <-resch: + if res.Err == nil { + t.Fatal("should have only been a failure here") + } + case <-time.After(time.Millisecond * 100): + t.Fatal("expected a dial failure here") + } + + select { + case res := <-resch: + if res.Err != nil { + t.Fatal("should have succeeded!") + } + case <-time.After(time.Millisecond * 100): + t.Fatal("should have gotten successful dial") + } +} diff --git a/swarm.go b/swarm.go index 74e43dd..54dd903 100644 --- a/swarm.go +++ b/swarm.go @@ -90,6 +90,8 @@ type Swarm struct { proc goprocess.Process ctx context.Context bwc metrics.Reporter + + limiter *dialLimiter } // NewSwarm constructs a Swarm, with a Chan. @@ -122,6 +124,8 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, dialer: conn.NewDialer(local, peers.PrivKey(local), wrap), } + s.limiter = newDialLimiter(s.dialAddr) + // configure Swarm s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown) s.SetConnHandler(nil) // make sure to setup our own conn handler. @@ -155,6 +159,7 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) { } listenAddrs = filtered } + return listenAddrs, nil } diff --git a/swarm_dial.go b/swarm_dial.go index aa4a351..c1b3a73 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -1,10 +1,8 @@ package swarm import ( - "bytes" "errors" "fmt" - "sort" "sync" "time" @@ -13,7 +11,6 @@ import ( conn "github.com/ipfs/go-libp2p/p2p/net/conn" addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr" ma "github.com/jbenet/go-multiaddr" - "github.com/jbenet/go-multiaddr-net" context "golang.org/x/net/context" ) @@ -42,6 +39,9 @@ const dialAttempts = 1 // number of concurrent outbound dials over transports that consume file descriptors const concurrentFdDials = 160 +// number of concurrent outbound dials to make per peer +const defaultPerPeerRateLimit = 8 + // DialTimeout is the amount of time each dial attempt has. We can think about making // this larger down the road, or putting more granular timeouts (i.e. within each // subcomponent of Dial) @@ -319,32 +319,34 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") } - // get remote peer addrs - remoteAddrs := s.peers.Addrs(p) - // make sure we can use the addresses. - remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs) - // drop out any addrs that would just dial ourselves. use ListenAddresses - // as that is a more authoritative view than localAddrs. ila, _ := s.InterfaceListenAddresses() - remoteAddrs = addrutil.Subtract(remoteAddrs, ila) - remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local)) - - log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs) - if len(remoteAddrs) == 0 { - err := errors.New("peer has no addresses") - logdial["error"] = err - return nil, err - } - - remoteAddrs = s.filterAddrs(remoteAddrs) - if len(remoteAddrs) == 0 { - err := errors.New("all adresses for peer have been filtered out") - logdial["error"] = err - return nil, err + subtract_filter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...) + + // get live channel of addresses for peer, filtered by the given filters + /* + remoteAddrChan := s.peers.AddrsChan(ctx, p, + addrutil.AddrUsableFilter, + subtract_filter, + s.Filters.AddrBlocked) + */ + + ////// TEMP UNTIL PEERSTORE GETS UPGRADED + // Ref: https://github.com/ipfs/go-libp2p-peer/pull/1 + paddrs := s.peers.Addrs(p) + good_addrs := addrutil.FilterAddrs(paddrs, + addrutil.AddrUsableFunc, + subtract_filter, + addrutil.FilterNeg(s.Filters.AddrBlocked), + ) + remoteAddrChan := make(chan ma.Multiaddr, len(good_addrs)) + for _, a := range good_addrs { + remoteAddrChan <- a } + close(remoteAddrChan) + ///////// // try to get a connection to any addr - connC, err := s.dialAddrs(ctx, p, remoteAddrs) + connC, err := s.dialAddrs(ctx, p, remoteAddrChan) if err != nil { logdial["error"] = err return nil, err @@ -364,98 +366,64 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return swarmC, nil } -func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { - - // sort addresses so preferred addresses are dialed sooner - sort.Sort(AddrList(remoteAddrs)) - - // try to connect to one of the peer's known addresses. - // we dial concurrently to each of the addresses, which: - // * makes the process faster overall - // * attempts to get the fastest connection available. - // * mitigates the waste of trying bad addresses +func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (conn.Conn, error) { log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs) ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel work when we exit func - conns := make(chan conn.Conn) - errs := make(chan error, len(remoteAddrs)) + // use a single response type instead of errs and conns, reduces complexity *a ton* + respch := make(chan dialResult) - // dialSingleAddr is used in the rate-limited async thing below. - dialSingleAddr := func(addr ma.Multiaddr) { - // rebind chans in scope so we can nil them out easily - connsout := conns - errsout := errs + defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p) + exitErr := defaultDialFail - connC, err := s.dialAddr(ctx, p, addr) - if err != nil { - connsout = nil - } else if connC == nil { - // NOTE: this really should never happen - log.Errorf("failed to dial %s %s and got no error!", p, addr) - err = fmt.Errorf("failed to dial %s %s", p, addr) - connsout = nil - } else { - errsout = nil - } - - // check parent still wants our results + var active int + for { select { - case <-ctx.Done(): - if connC != nil { - connC.Close() + case addr, ok := <-remoteAddrs: + if !ok { + remoteAddrs = nil + if active == 0 { + return nil, exitErr + } + continue } - case errsout <- err: - case connsout <- connC: - } - } - // this whole thing is in a goroutine so we can use foundConn - // to end early. - go func() { - limiter := make(chan struct{}, 8) - for _, addr := range remoteAddrs { - // returns whatever ratelimiting is acceptable for workerAddr. - // may not rate limit at all. - rl := s.addrDialRateLimit(addr) - select { - case <-ctx.Done(): // our context was cancelled - return - case rl <- struct{}{}: - // take the token, move on + // limitedDial will start a dial to the given peer when + // it is able, respecting the various different types of rate + // limiting that occur without using extra goroutines per addr + s.limitedDial(ctx, p, addr, respch) + active++ + case <-ctx.Done(): + if exitErr == defaultDialFail { + exitErr = ctx.Err() } - - select { - case <-ctx.Done(): // our context was cancelled - return - case limiter <- struct{}{}: - // take the token, move on + return nil, exitErr + case resp := <-respch: + active-- + if resp.Err != nil { + log.Error("got error on dial: ", resp.Err) + // Errors are normal, lots of dials will fail + exitErr = resp.Err + + if remoteAddrs == nil && active == 0 { + return nil, exitErr + } + } else if resp.Conn != nil { + return resp.Conn, nil } - - go func(rlc <-chan struct{}, a ma.Multiaddr) { - dialSingleAddr(a) - <-limiter - <-rlc - }(rl, addr) - } - }() - - // wair for the results. - exitErr := fmt.Errorf("failed to dial %s", p) - for range remoteAddrs { - select { - case exitErr = <-errs: // - log.Debug("dial error: ", exitErr) - case connC := <-conns: - // take the first + return asap - return connC, nil - case <-ctx.Done(): - // break out and return error - break } } - return nil, exitErr +} + +func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) { + s.limiter.AddDialJob(&dialJob{ + addr: a, + peer: p, + resp: resp, + ctx: ctx, + }) } func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { @@ -485,16 +453,6 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (con return connC, nil } -func (s *Swarm) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - var out []ma.Multiaddr - for _, a := range addrs { - if !s.Filters.AddrBlocked(a) { - out = append(out, a) - } - } - return out -} - // dialConnSetup is the setup logic for a connection from the dial side. it // needs to add the Conn to the StreamSwarm, then run newConnSetup func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) { @@ -514,72 +472,3 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error return swarmC, err } - -// addrDialRateLimit returns a ratelimiting channel for dialing transport -// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited. -func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} { - if isFDCostlyTransport(a) { - return s.fdRateLimit - } - - // do not rate limit it at all - return make(chan struct{}, 1) -} - -func isFDCostlyTransport(a ma.Multiaddr) bool { - return isTCPMultiaddr(a) -} - -func isTCPMultiaddr(a ma.Multiaddr) bool { - p := a.Protocols() - return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" -} - -type AddrList []ma.Multiaddr - -func (al AddrList) Len() int { - return len(al) -} - -func (al AddrList) Swap(i, j int) { - al[i], al[j] = al[j], al[i] -} - -func (al AddrList) Less(i, j int) bool { - a := al[i] - b := al[j] - - // dial localhost addresses next, they should fail immediately - lba := manet.IsIPLoopback(a) - lbb := manet.IsIPLoopback(b) - if lba { - if !lbb { - return true - } - } - - // dial utp and similar 'non-fd-consuming' addresses first - fda := isFDCostlyTransport(a) - fdb := isFDCostlyTransport(b) - if !fda { - if fdb { - return true - } - - // if neither consume fd's, assume equal ordering - return false - } - - // if 'b' doesnt take a file descriptor - if !fdb { - return false - } - - // if 'b' is loopback and both take file descriptors - if lbb { - return false - } - - // for the rest, just sort by bytes - return bytes.Compare(a.Bytes(), b.Bytes()) > 0 -} diff --git a/swarm_test.go b/swarm_test.go index 6e9f121..f5e4549 100644 --- a/swarm_test.go +++ b/swarm_test.go @@ -303,7 +303,7 @@ func TestAddrBlocking(t *testing.T) { swarms := makeSwarms(ctx, t, 2) swarms[0].SetConnHandler(func(conn *Conn) { - t.Fatalf("no connections should happen! -- %s", conn) + t.Errorf("no connections should happen! -- %s", conn) }) _, block, err := net.ParseCIDR("127.0.0.1/8") -- GitLab