Commit 6ab98475 authored by tavit ohanian

reference basis

parents 1c8d3d07 e6dd8fb3
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
# Automatically merge pull requests opened by web3-bot, as soon as (and only if) all tests pass.
# This reduces the friction associated with updating our workflows.
on: [ pull_request ]
jobs:
automerge:
if: github.event.pull_request.user.login == 'web3-bot'
runs-on: ubuntu-latest
steps:
- name: Wait on tests
uses: lewagon/wait-on-check-action@bafe56a6863672c681c3cf671f5e10b20abf2eaa # v0.2
with:
ref: ${{ github.event.pull_request.head.sha }}
repo-token: ${{ secrets.GITHUB_TOKEN }}
wait-interval: 10
running-workflow-name: 'automerge' # the name of this job
- name: Merge PR
uses: pascalgn/automerge-action@741c311a47881be9625932b0a0de1b0937aab1ae # v0.13.1
env:
GITHUB_TOKEN: "${{ secrets.GITHUB_TOKEN }}"
MERGE_LABELS: ""
MERGE_METHOD: "squash"
MERGE_DELETE_BRANCH: true
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
on: [push, pull_request]
jobs:
unit:
runs-on: ubuntu-latest
name: Go checks
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: "1.16.x"
- name: Install staticcheck
run: go install honnef.co/go/tools/cmd/staticcheck@be534f007836a777104a15f2456cd1fffd3ddee8 # v2020.2.2
- name: Check that go.mod is tidy
run: |
go mod tidy
if [[ -n $(git ls-files --other --exclude-standard --directory -- go.sum) ]]; then
echo "go.sum was added by go mod tidy"
exit 1
fi
git diff --exit-code -- go.sum go.mod
- name: gofmt
if: ${{ success() || failure() }} # run this step even if the previous one failed
run: |
out=$(gofmt -s -l .)
if [[ -n "$out" ]]; then
echo $out | awk '{print "::error file=" $0 ",line=0,col=0::File is not gofmt-ed."}'
exit 1
fi
- name: go vet
if: ${{ success() || failure() }} # run this step even if the previous one failed
run: go vet ./...
- name: staticcheck
if: ${{ success() || failure() }} # run this step even if the previous one failed
run: |
set -o pipefail
staticcheck ./... | sed -e 's@\(.*\)\.go@./\1.go@g'
# File managed by web3-bot. DO NOT EDIT.
# See https://github.com/protocol/.github/ for details.
on: [push, pull_request]
jobs:
unit:
strategy:
fail-fast: false
matrix:
os: [ "ubuntu", "windows", "macos" ]
go: [ "1.15.x", "1.16.x" ]
runs-on: ${{ matrix.os }}-latest
name: Unit tests (${{ matrix.os}}, Go ${{ matrix.go }})
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go }}
- name: Go information
run: |
go version
go env
- name: Run tests
run: go test -v -coverprofile coverage.txt ./...
- name: Run tests (32 bit)
if: ${{ matrix.os != 'macos' }} # can't run 32 bit tests on OSX.
env:
GOARCH: 386
run: go test -v ./...
- name: Run tests with race detector
if: ${{ matrix.os == 'ubuntu' }} # speed things up. Windows and OSX VMs are slow
run: go test -v -race ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@967e2b38a85a62bd61be5529ada27ebc109948c2 # v1.4.1
with:
file: coverage.txt
env_vars: OS=${{ matrix.os }}, GO=${{ matrix.go }}
The MIT License (MIT)
Copyright (c) 2016 Jeromy Johnson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-libp2p-swarm

dms3 p2p swarm
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](https://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/go-libp2p-swarm/badge.svg?branch=master)](https://coveralls.io/github/libp2p/go-libp2p-swarm?branch=master)
[![Travis CI](https://travis-ci.org/libp2p/go-libp2p-swarm.svg?branch=master)](https://travis-ci.org/libp2p/go-libp2p-swarm)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
> The libp2p swarm manages groups of connections to peers, and handles incoming and outgoing streams.
The libp2p swarm is the 'low level' interface for working with a given libp2p
network. It gives you finer-grained control over various aspects of the
system. Most applications don't need this level of access, so the `Swarm` is
generally wrapped in a `Host` abstraction that provides a more friendly
interface. See [the host interface](https://godoc.org/github.com/libp2p/go-libp2p-core/host#Host)
for more info on that.
## Table of Contents
- [Install](#install)
- [Usage](#usage)
- [Contribute](#contribute)
- [License](#license)
## Install
```sh
make install
```
## Usage
### Creating a swarm
To construct a swarm, you'll be calling `NewSwarm`. That function looks like this:
```go
swarm, err := NewSwarm(ctx, laddrs, pid, pstore, bwc)
```
It takes five items to fully construct a swarm. The first is a Go
`context.Context`, which controls the lifetime of the swarm; all swarm
processes derive their lifespan from the given context. You can just use
`context.Background()` if you're not concerned with that.
The next argument is a slice of multiaddrs that the swarm will open
listeners for. Once started, the swarm will accept and handle incoming
connections on every given address. This argument is optional: you can pass
`nil`, and the swarm will not listen for any incoming connections (but will
still be able to dial out to other peers).
After that, you'll need to give the swarm an identity in the form of a peer.ID.
If you don't want to enable secio (libp2p's transport-layer encryption), you
can pick any string for this value. For example, `peer.ID("FooBar123")` would
work. Note that passing a random string ID will result in your node not being
able to communicate with other peers that have correctly generated IDs. To see
how to generate a proper ID, see the "Identity Generation" section below.
The fourth argument is a peerstore. This is essentially a database that the
swarm will use to store peer IDs, addresses, public keys, protocol preferences
and more. You can construct one by importing
`github.com/libp2p/go-libp2p-peerstore` and calling `peerstore.NewPeerstore()`.
The final argument is a bandwidth metrics collector, which is used to track
incoming and outgoing bandwidth on connections managed by this swarm. It is
optional; passing `nil` simply means that no metrics will be available for
connections.
#### Identity Generation
A proper libp2p identity is PKI based. We currently have support for RSA and ed25519 keys. To create a 'correct' ID, you'll need to either load or generate a new keypair. Here is an example of doing so:
```go
import (
	"context"
	"crypto/rand"
	"fmt"

	ci "github.com/libp2p/go-libp2p-crypto"
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
)

func demo() {
	// First, select a source of entropy. We're using the stdlib's crypto reader here
	src := rand.Reader

	// Now create a 2048 bit RSA key using that
	priv, pub, err := ci.GenerateKeyPairWithReader(ci.RSA, 2048, src)
	if err != nil {
		panic(err) // oh no!
	}

	// Now that we have a keypair, let's create our identity from it
	pid, err := peer.IDFromPrivateKey(priv)
	if err != nil {
		panic(err)
	}

	// Woo! Identity acquired!
	fmt.Println("I am ", pid)

	// Now, for the purposes of building a swarm, let's add this all to a peerstore.
	ps := pstore.NewPeerstore()
	ps.AddPubKey(pid, pub)
	ps.AddPrivKey(pid, priv)

	// Once you've got all that, creating a basic swarm can be as easy as
	ctx := context.Background()
	swarm, err := NewSwarm(ctx, nil, pid, ps, nil)
	if err != nil {
		panic(err)
	}

	// voila! A functioning swarm!
	_ = swarm
}
```
### Streams
The swarm is designed around using multiplexed streams to communicate with
other peers. When working with a swarm, you will want to set a function to
handle incoming streams from your peers:
```go
swrm.SetStreamHandler(func(s inet.Stream) {
	defer s.Close()
	fmt.Println("Got a stream from: ", s.SwarmConn().RemotePeer())
	fmt.Fprintln(s, "Hello Friend!")
})
```
Tip: Always make sure to close streams when you're done with them.
Opening streams is also pretty simple:
```go
s, err := swrm.NewStreamWithPeer(ctx, rpid)
if err != nil {
	panic(err)
}
defer s.Close()
io.Copy(os.Stdout, s) // pipe the stream to stdout
```
Just pass a context and the ID of the peer you want a stream to, and you'll get
back a stream to read and write on.
## Contribute
PRs are welcome!
Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
MIT © Jeromy Johnson
---
The last gx published version of this module was: 3.0.35: QmQVoMEL1CxrVusTSUdYsiJXVBnvSqNUpBsGybkwSfksEF
package swarm
import (
filter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
mamask "github.com/whyrusleeping/multiaddr-filter"
)
// http://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
var lowTimeoutFilters = ma.NewFilters()
func init() {
for _, p := range []string{
"/ip4/10.0.0.0/ipcidr/8",
"/ip4/100.64.0.0/ipcidr/10",
"/ip4/169.254.0.0/ipcidr/16",
"/ip4/172.16.0.0/ipcidr/12",
"/ip4/192.0.0.0/ipcidr/24",
"/ip4/192.0.0.0/ipcidr/29",
"/ip4/192.0.0.8/ipcidr/32",
"/ip4/192.0.0.170/ipcidr/32",
"/ip4/192.0.0.171/ipcidr/32",
"/ip4/192.0.2.0/ipcidr/24",
"/ip4/192.168.0.0/ipcidr/16",
"/ip4/198.18.0.0/ipcidr/15",
"/ip4/198.51.100.0/ipcidr/24",
"/ip4/203.0.113.0/ipcidr/24",
"/ip4/240.0.0.0/ipcidr/4",
} {
f, err := mamask.NewMask(p)
if err != nil {
panic("error in lowTimeoutFilters init: " + err.Error())
}
lowTimeoutFilters.AddFilter(*f, filter.ActionDeny)
}
}
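
These ranges come from IANA's special-use IPv4 registry; an address matching one of them is treated as local, so dials to it get the much shorter `DialTimeoutLocal` rather than the global `transport.DialTimeout`. Below is a minimal sketch of that check, mirroring `dialJob.dialTimeout` in the limiter code further down; `timeoutForAddr` itself is a hypothetical helper, not part of the package:

```go
package swarm

import (
	"time"

	"github.com/libp2p/go-libp2p-core/transport"
	ma "github.com/multiformats/go-multiaddr"
)

// timeoutForAddr picks the dial timeout for an address: the short local
// timeout for special-use ranges matched by lowTimeoutFilters, and the
// global transport timeout otherwise. (Illustrative helper only.)
func timeoutForAddr(addr ma.Multiaddr) time.Duration {
	if lowTimeoutFilters.AddrBlocked(addr) {
		return DialTimeoutLocal
	}
	return transport.DialTimeout
}
```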
coverage:
range: "50...100"
comment: off
package swarm
import (
"fmt"
"os"
"strings"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
)
// maxDialDialErrors is the maximum number of dial errors we record
const maxDialDialErrors = 16
// DialError is the error type returned when dialing.
type DialError struct {
Peer peer.ID
DialErrors []TransportError
Cause error
Skipped int
}
func (e *DialError) Timeout() bool {
return os.IsTimeout(e.Cause)
}
func (e *DialError) recordErr(addr ma.Multiaddr, err error) {
if len(e.DialErrors) >= maxDialDialErrors {
e.Skipped++
return
}
e.DialErrors = append(e.DialErrors, TransportError{
Address: addr,
Cause: err,
})
}
func (e *DialError) Error() string {
var builder strings.Builder
fmt.Fprintf(&builder, "failed to dial %s:", e.Peer)
if e.Cause != nil {
fmt.Fprintf(&builder, " %s", e.Cause)
}
for _, te := range e.DialErrors {
fmt.Fprintf(&builder, "\n * [%s] %s", te.Address, te.Cause)
}
if e.Skipped > 0 {
fmt.Fprintf(&builder, "\n ... skipping %d errors ...", e.Skipped)
}
return builder.String()
}
// Unwrap implements https://godoc.org/golang.org/x/xerrors#Wrapper.
func (e *DialError) Unwrap() error {
return e.Cause
}
var _ error = (*DialError)(nil)
// TransportError is the error returned when dialing a specific address.
type TransportError struct {
Address ma.Multiaddr
Cause error
}
func (e *TransportError) Error() string {
return fmt.Sprintf("failed to dial %s: %s", e.Address, e.Cause)
}
var _ error = (*TransportError)(nil)
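
Since `DialError` implements `Unwrap` and records the per-address `TransportError` list, callers can inspect a failed dial with the standard `errors` package. A minimal sketch (`describeDialFailure` is a hypothetical helper, not part of the package):

```go
package swarm

import (
	"errors"
	"fmt"
)

// describeDialFailure prints a summary of a failed dial, listing the
// per-address transport errors recorded in the DialError.
func describeDialFailure(err error) {
	var de *DialError
	if !errors.As(err, &de) {
		fmt.Println("not a dial error:", err)
		return
	}
	fmt.Printf("dialing %s failed (%d addresses recorded, %d skipped)\n",
		de.Peer, len(de.DialErrors), de.Skipped)
	for _, te := range de.DialErrors {
		fmt.Printf("  %s: %v\n", te.Address, te.Cause)
	}
}
```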
package swarm
import (
"context"
"sync"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
// dialWorkerFunc is used by DialSync to spawn a new dial worker
type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error
// newDialSync constructs a new DialSync
func newDialSync(worker dialWorkerFunc) *DialSync {
return &DialSync{
dials: make(map[peer.ID]*activeDial),
dialWorker: worker,
}
}
// DialSync is a dial synchronization helper that ensures that at most one dial
// to any given peer is active at any given time.
type DialSync struct {
dials map[peer.ID]*activeDial
dialsLk sync.Mutex
dialWorker dialWorkerFunc
}
type activeDial struct {
id peer.ID
refCnt int
ctx context.Context
cancel func()
reqch chan dialRequest
ds *DialSync
}
func (ad *activeDial) decref() {
ad.ds.dialsLk.Lock()
ad.refCnt--
if ad.refCnt == 0 {
ad.cancel()
close(ad.reqch)
delete(ad.ds.dials, ad.id)
}
ad.ds.dialsLk.Unlock()
}
func (ad *activeDial) dial(ctx context.Context, p peer.ID) (*Conn, error) {
dialCtx := ad.ctx
if forceDirect, reason := network.GetForceDirectDial(ctx); forceDirect {
dialCtx = network.WithForceDirectDial(dialCtx, reason)
}
if simConnect, reason := network.GetSimultaneousConnect(ctx); simConnect {
dialCtx = network.WithSimultaneousConnect(dialCtx, reason)
}
resch := make(chan dialResponse, 1)
select {
case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}:
case <-ctx.Done():
return nil, ctx.Err()
}
select {
case res := <-resch:
return res.conn, res.err
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
ds.dialsLk.Lock()
defer ds.dialsLk.Unlock()
actd, ok := ds.dials[p]
if !ok {
// This code intentionally uses the background context. Otherwise, if the first call
// to Dial is canceled, subsequent dial calls will also be canceled.
// XXX: this also breaks direct connection logic. We will need to pipe the
// information through some other way.
adctx, cancel := context.WithCancel(context.Background())
actd = &activeDial{
id: p,
ctx: adctx,
cancel: cancel,
reqch: make(chan dialRequest),
ds: ds,
}
err := ds.dialWorker(adctx, p, actd.reqch)
if err != nil {
cancel()
return nil, err
}
ds.dials[p] = actd
}
// increase ref count before dropping dialsLk
actd.refCnt++
return actd, nil
}
// DialLock initiates a dial to the given peer if none is in progress, then
// waits for the dial to that peer to complete.
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
ad, err := ds.getActiveDial(p)
if err != nil {
return nil, err
}
defer ad.decref()
return ad.dial(ctx, p)
}
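
To see what this buys you: concurrent `DialLock` calls for the same peer share one dial worker and all receive that worker's result. A minimal sketch, assuming the `dialRequest`/`dialResponse` shapes used by the tests below (`exampleDialSync` and its trivial worker are illustrative, not part of the package):

```go
package swarm

import (
	"context"

	"github.com/libp2p/go-libp2p-core/peer"
)

// exampleDialSync shows two concurrent DialLock calls for the same peer
// being served by a single dial worker.
func exampleDialSync() {
	worker := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
		// A stand-in worker: answer every request with an empty *Conn.
		go func() {
			for req := range reqch {
				req.resch <- dialResponse{conn: new(Conn)}
			}
		}()
		return nil
	}
	ds := newDialSync(worker)

	p := peer.ID("example-peer")
	done := make(chan struct{})
	for i := 0; i < 2; i++ {
		go func() {
			ds.DialLock(context.Background(), p) // both calls share one worker
			done <- struct{}{}
		}()
	}
	<-done
	<-done
}
```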
package swarm
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
)
func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}) {
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
dfcalls <- struct{}{}
go func() {
defer cancel()
for {
select {
case req, ok := <-reqch:
if !ok {
return
}
select {
case <-ch:
req.resch <- dialResponse{conn: new(Conn)}
case <-ctx.Done():
req.resch <- dialResponse{err: ctx.Err()}
return
}
case <-ctx.Done():
return
}
}
}()
return nil
}
o := new(sync.Once)
return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls
}
func TestBasicDialSync(t *testing.T) {
df, done, _, callsch := getMockDialFunc()
dsync := newDialSync(df)
p := peer.ID("testpeer")
ctx := context.Background()
finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()
// short sleep just to make sure we've moved around in the scheduler
time.Sleep(time.Millisecond * 20)
done()
<-finished
<-finished
if len(callsch) > 1 {
t.Fatal("should only have called dial func once!")
}
}
func TestDialSyncCancel(t *testing.T) {
df, done, _, dcall := getMockDialFunc()
dsync := newDialSync(df)
p := peer.ID("testpeer")
ctx1, cancel1 := context.WithCancel(context.Background())
finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()
// make sure the above makes it through the wait code first
select {
case <-dcall:
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial to start")
}
// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()
time.Sleep(time.Millisecond * 20)
// cancel the first dialwait, it should not affect the second at all
cancel1()
select {
case <-finished:
case <-time.After(time.Second):
t.Fatal("timed out waiting for wait to exit")
}
// short sleep just to make sure we've moved around in the scheduler
time.Sleep(time.Millisecond * 20)
done()
<-finished
}
func TestDialSyncAllCancel(t *testing.T) {
df, done, dctx, _ := getMockDialFunc()
dsync := newDialSync(df)
p := peer.ID("testpeer")
ctx1, cancel1 := context.WithCancel(context.Background())
finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()
// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()
cancel1()
for i := 0; i < 2; i++ {
select {
case <-finished:
case <-time.After(time.Second):
t.Fatal("timed out waiting for wait to exit")
}
}
// the dial should have exited now
select {
case <-dctx.Done():
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial to return")
}
// should be able to successfully dial that peer again
done()
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
t.Fatal(err)
}
}
func TestFailFirst(t *testing.T) {
var count int
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}
if count > 0 {
req.resch <- dialResponse{conn: new(Conn)}
} else {
req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")}
}
count++
case <-ctx.Done():
return
}
}
}()
return nil
}
ds := newDialSync(f)
p := peer.ID("testing")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := ds.DialLock(ctx, p)
if err == nil {
t.Fatal("expected gophers to have eaten the modem")
}
c, err := ds.DialLock(ctx, p)
if err != nil {
t.Fatal(err)
}
if c == nil {
t.Fatal("should have gotten a 'real' conn back")
}
}
func TestStressActiveDial(t *testing.T) {
ds := newDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}
req.resch <- dialResponse{}
case <-ctx.Done():
return
}
}
}()
return nil
})
wg := sync.WaitGroup{}
pid := peer.ID("foo")
makeDials := func() {
for i := 0; i < 10000; i++ {
ds.DialLock(context.Background(), pid)
}
wg.Done()
}
for i := 0; i < 100; i++ {
wg.Add(1)
go makeDials()
}
wg.Wait()
}
func TestDialSelf(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
self := peer.ID("ABC")
s := NewSwarm(ctx, self, nil, nil)
defer s.Close()
// this should fail
_, err := s.dsync.DialLock(ctx, self)
if err != ErrDialToSelf {
t.Fatal("expected error from self dial")
}
// do it twice to make sure we get a new active dial object that fails again
_, err = s.dsync.DialLock(ctx, self)
if err != ErrDialToSelf {
t.Fatal("expected error from self dial")
}
}
package swarm_test
import (
"context"
"net"
"sync"
"testing"
"time"
addrutil "github.com/libp2p/go-addr-util"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/transport"
testutil "github.com/libp2p/go-libp2p-core/test"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p-testing/ci"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
. "github.com/libp2p/go-libp2p-swarm"
)
func init() {
transport.DialTimeout = time.Second
}
func closeSwarms(swarms []*Swarm) {
for _, s := range swarms {
s.Close()
}
}
func TestBasicDialPeer(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL)
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
s, err := c.NewStream(ctx)
if err != nil {
t.Fatal(err)
}
s.Close()
}
func TestDialWithNoListeners(t *testing.T) {
t.Parallel()
ctx := context.Background()
s1 := makeDialOnlySwarm(ctx, t)
swarms := makeSwarms(ctx, t, 1)
defer closeSwarms(swarms)
s2 := swarms[0]
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL)
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
s, err := c.NewStream(ctx)
if err != nil {
t.Fatal(err)
}
s.Close()
}
func acceptAndHang(l net.Listener) {
conns := make([]net.Conn, 0, 10)
for {
c, err := l.Accept()
if err != nil {
break
}
if c != nil {
conns = append(conns, c)
}
}
for _, c := range conns {
c.Close()
}
}
func TestSimultDials(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2, swarmt.OptDisableReuseport)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.LocalPeer(), dst, addr)
s.Peerstore().AddAddr(dst, addr, peerstore.TempAddrTTL)
if _, err := s.DialPeer(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
log.Info("Connecting swarms simultaneously.")
for i := 0; i < 10; i++ { // connect 10x for each.
wg.Add(2)
go connect(swarms[0], swarms[1].LocalPeer(), ifaceAddrs1[0])
go connect(swarms[1], swarms[0].LocalPeer(), ifaceAddrs0[0])
}
wg.Wait()
}
// should still just have 1, at most 2 connections :)
c01l := len(swarms[0].ConnsToPeer(swarms[1].LocalPeer()))
if c01l > 2 {
t.Error("0->1 has", c01l)
}
c10l := len(swarms[1].ConnsToPeer(swarms[0].LocalPeer()))
if c10l > 2 {
t.Error("1->0 has", c10l)
}
for _, s := range swarms {
s.Close()
}
}
func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
dst := testutil.RandPeerIDFatal(t)
lst, err := net.Listen("tcp4", "localhost:0")
if err != nil {
t.Fatal(err)
}
addr, err := manet.FromNetAddr(lst.Addr())
if err != nil {
t.Fatal(err)
}
addrs := []ma.Multiaddr{addr}
addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
if err != nil {
t.Fatal(err)
}
t.Log("new silent peer:", dst, addrs[0])
return dst, addrs[0], lst
}
func TestDialWait(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 1)
s1 := swarms[0]
defer s1.Close()
// dial to a non-existent peer.
s2p, s2addr, s2l := newSilentPeer(t)
go acceptAndHang(s2l)
defer s2l.Close()
s1.Peerstore().AddAddr(s2p, s2addr, peerstore.PermanentAddrTTL)
before := time.Now()
if c, err := s1.DialPeer(ctx, s2p); err == nil {
defer c.Close()
t.Fatal("error swarm dialing to unknown peer worked...", err)
} else {
t.Log("correctly got error:", err)
}
duration := time.Since(before)
if duration < transport.DialTimeout*DialAttempts {
t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
}
if duration > 2*transport.DialTimeout*DialAttempts {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
}
if !s1.Backoff().Backoff(s2p, s2addr) {
t.Error("s2 should now be on backoff")
}
}
func TestDialBackoff(t *testing.T) {
// t.Skip("skipping for another test")
if ci.IsRunning() {
t.Skip("travis will never have fun with this test")
}
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()
s2addrs, err := s2.InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs, peerstore.PermanentAddrTTL)
// dial to a non-existent peer.
s3p, s3addr, s3l := newSilentPeer(t)
go acceptAndHang(s3l)
defer s3l.Close()
s1.Peerstore().AddAddr(s3p, s3addr, peerstore.PermanentAddrTTL)
// in this test we will:
// 1) dial 10x to each node.
// 2) all dials should hang
// 3) s1->s2 should succeed.
// 4) s1->s3 should not (and should place s3 on backoff)
// 5) disconnect entirely
// 6) dial 10x to each node again
// 7) s3 dials should all return immediately (except 1)
// 8) s2 dials should all hang, and succeed
// 9) last s3 dial ends, unsuccessful
dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
ch := make(chan bool)
for i := 0; i < times; i++ {
go func() {
if _, err := s1.DialPeer(ctx, dst); err != nil {
t.Error("error dialing", dst, err)
ch <- false
} else {
ch <- true
}
}()
}
return ch
}
dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
ch := make(chan bool)
for i := 0; i < times; i++ {
go func() {
if c, err := s1.DialPeer(ctx, dst); err != nil {
ch <- false
} else {
t.Error("succeeded in dialing", dst)
ch <- true
c.Close()
}
}()
}
return ch
}
{
// 1) dial 10x to each node.
N := 10
s2done := dialOnlineNode(s2.LocalPeer(), N)
s3done := dialOfflineNode(s3p, N)
// when all dials should be done by:
dialTimeout1x := time.After(transport.DialTimeout)
dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
// 2) all dials should hang
select {
case <-s2done:
t.Error("s2 should not happen immediately")
case <-s3done:
t.Error("s3 should not happen yet")
case <-time.After(time.Millisecond):
// s2 may finish very quickly, so let's get out.
}
// 3) s1->s2 should succeed.
for i := 0; i < N; i++ {
select {
case r := <-s2done:
if !r {
t.Error("s2 should not fail")
}
case <-s3done:
t.Error("s3 should not happen yet")
case <-dialTimeout1x:
t.Error("s2 took too long")
}
}
select {
case <-s2done:
t.Error("s2 should have no more")
case <-s3done:
t.Error("s3 should not happen yet")
case <-dialTimeout1x: // let it pass
}
// 4) s1->s3 should not (and should place s3 on backoff)
// N-1 should finish before dialTimeout1x * 2
for i := 0; i < N; i++ {
select {
case <-s2done:
t.Error("s2 should have no more")
case r := <-s3done:
if r {
t.Error("s3 should not succeed")
}
case <-(dialTimeout1x):
if i < (N - 1) {
t.Fatal("s3 took too long")
}
t.Log("dialTimeout1x * 1.3 hit for last peer")
case <-dialTimeout10Ax:
t.Fatal("s3 took too long")
}
}
// check backoff state
if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
t.Error("s2 should not be on backoff")
}
if !s1.Backoff().Backoff(s3p, s3addr) {
t.Error("s3 should be on backoff")
}
// 5) disconnect entirely
for _, c := range s1.Conns() {
c.Close()
}
for i := 0; i < 100 && len(s1.Conns()) > 0; i++ {
<-time.After(time.Millisecond)
}
if len(s1.Conns()) > 0 {
t.Fatal("s1 conns must exit")
}
}
{
// 6) dial 10x to each node again
N := 10
s2done := dialOnlineNode(s2.LocalPeer(), N)
s3done := dialOfflineNode(s3p, N)
// when all dials should be done by:
dialTimeout1x := time.After(transport.DialTimeout)
dialTimeout10Ax := time.After(transport.DialTimeout * 2 * 10) // DialAttempts * 10)
// 7) s3 dials should all return immediately (except 1)
for i := 0; i < N-1; i++ {
select {
case <-s2done:
t.Error("s2 should not succeed yet")
case r := <-s3done:
if r {
t.Error("s3 should not succeed")
}
case <-dialTimeout1x:
t.Fatal("s3 took too long")
}
}
// 8) s2 dials should all hang, and succeed
for i := 0; i < N; i++ {
select {
case r := <-s2done:
if !r {
t.Error("s2 should succeed")
}
// case <-s3done:
case <-(dialTimeout1x):
t.Fatal("s3 took too long")
}
}
// 9) the last s3 should return, failed.
select {
case <-s2done:
t.Error("s2 should have no more")
case r := <-s3done:
if r {
t.Error("s3 should not succeed")
}
case <-dialTimeout10Ax:
t.Fatal("s3 took too long")
}
// check backoff state (the same)
if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
t.Error("s2 should not be on backoff")
}
if !s1.Backoff().Backoff(s3p, s3addr) {
t.Error("s3 should be on backoff")
}
}
}
func TestDialBackoffClears(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()
// use another address first, that accept and hang on conns
_, s2bad, s2l := newSilentPeer(t)
go acceptAndHang(s2l)
defer s2l.Close()
// phase 1 -- dial to non-operational addresses
s1.Peerstore().AddAddr(s2.LocalPeer(), s2bad, peerstore.PermanentAddrTTL)
before := time.Now()
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err == nil {
defer c.Close()
t.Fatal("dialing to broken addr worked...", err)
} else {
t.Log("correctly got error:", err)
}
duration := time.Since(before)
if duration < transport.DialTimeout*DialAttempts {
t.Error("< transport.DialTimeout * DialAttempts not being respected", duration, transport.DialTimeout*DialAttempts)
}
if duration > 2*transport.DialTimeout*DialAttempts {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
}
if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should now be on backoff")
} else {
t.Log("correctly added to backoff")
}
// phase 2 -- add the working address. dial should succeed.
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL)
if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
c.Close()
t.Log("backoffs are per address, not peer")
}
time.Sleep(BackoffBase)
if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err != nil {
t.Fatal(err)
} else {
c.Close()
t.Log("correctly connected")
}
if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should no longer be on backoff")
} else {
t.Log("correctly cleared backoff")
}
}
func TestDialPeerFailed(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
defer closeSwarms(swarms)
testedSwarm, targetSwarm := swarms[0], swarms[1]
expectedErrorsCount := 5
for i := 0; i < expectedErrorsCount; i++ {
_, silentPeerAddress, silentPeerListener := newSilentPeer(t)
go acceptAndHang(silentPeerListener)
defer silentPeerListener.Close()
testedSwarm.Peerstore().AddAddr(
targetSwarm.LocalPeer(),
silentPeerAddress,
peerstore.PermanentAddrTTL)
}
_, err := testedSwarm.DialPeer(ctx, targetSwarm.LocalPeer())
if err == nil {
t.Fatal(err)
}
// dial_test.go:508: correctly get a combined error: failed to dial PEER: all dials failed
// * [/ip4/127.0.0.1/tcp/46485] failed to negotiate security protocol: context deadline exceeded
// * [/ip4/127.0.0.1/tcp/34881] failed to negotiate security protocol: context deadline exceeded
// ...
dialErr, ok := err.(*DialError)
if !ok {
t.Fatalf("expected *DialError, got %T", err)
}
if len(dialErr.DialErrors) != expectedErrorsCount {
t.Errorf("expected %d errors, got %d", expectedErrorsCount, len(dialErr.DialErrors))
}
}
func TestDialExistingConnection(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.PermanentAddrTTL)
c1, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
c2, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
t.Fatal(err)
}
if c1 != c2 {
t.Fatal("expecting the same connection from both dials")
}
}
func newSilentListener(t *testing.T) ([]ma.Multiaddr, net.Listener) {
lst, err := net.Listen("tcp4", "localhost:0")
if err != nil {
t.Fatal(err)
}
addr, err := manet.FromNetAddr(lst.Addr())
if err != nil {
t.Fatal(err)
}
addrs := []ma.Multiaddr{addr}
addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
if err != nil {
t.Fatal(err)
}
return addrs, lst
}
func TestDialSimultaneousJoin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()
s2silentAddrs, s2silentListener := newSilentListener(t)
go acceptAndHang(s2silentListener)
connch := make(chan network.Conn, 512)
errs := make(chan error, 2)
// start a dial to s2 through the silent addr
go func() {
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2silentAddrs, peerstore.PermanentAddrTTL)
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
return
}
t.Logf("first dial succedded; conn: %+v", c)
connch <- c
errs <- nil
}()
// wait a bit for the dial to take hold
time.Sleep(100 * time.Millisecond)
// start a second dial to s2 that uses the real s2 addrs
go func() {
s2addrs, err := s2.InterfaceListenAddresses()
if err != nil {
errs <- err
return
}
s1.Peerstore().AddAddrs(s2.LocalPeer(), s2addrs[:1], peerstore.PermanentAddrTTL)
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
return
}
t.Logf("second dial succedded; conn: %+v", c)
connch <- c
errs <- nil
}()
// wait for the second dial to finish
c2 := <-connch
// start a third dial to s2, this should get the existing connection from the successful dial
go func() {
c, err := s1.DialPeer(ctx, s2.LocalPeer())
if err != nil {
errs <- err
connch <- nil
return
}
t.Logf("third dial succedded; conn: %+v", c)
connch <- c
errs <- nil
}()
c3 := <-connch
// raise any errors from the previous goroutines
for i := 0; i < 3; i++ {
err := <-errs
if err != nil {
t.Fatal(err)
}
}
if c2 != c3 {
t.Fatal("expected c2 and c3 to be the same")
}
// next, the first dial to s2, using the silent addr should timeout; at this point the dial
// will error but the last chance check will see the existing connection and return it
select {
case c1 := <-connch:
if c1 != c2 {
t.Fatal("expected c1 and c2 to be the same")
}
case <-time.After(2 * transport.DialTimeout):
t.Fatal("no connection from first dial")
}
}
func TestDialSelf2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
defer s1.Close()
_, err := s1.DialPeer(ctx, s1.LocalPeer())
if err != ErrDialToSelf {
t.Fatal("expected error from self dial")
}
}
package swarm
import (
"context"
"os"
"strconv"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)
type dialResult struct {
Conn transport.CapableConn
Addr ma.Multiaddr
Err error
}
type dialJob struct {
addr ma.Multiaddr
peer peer.ID
ctx context.Context
resp chan dialResult
}
func (dj *dialJob) cancelled() bool {
return dj.ctx.Err() != nil
}
func (dj *dialJob) dialTimeout() time.Duration {
timeout := transport.DialTimeout
if lowTimeoutFilters.AddrBlocked(dj.addr) {
timeout = DialTimeoutLocal
}
return timeout
}
type dialLimiter struct {
lk sync.Mutex
isFdConsumingFnc isFdConsumingFnc
fdConsuming int
fdLimit int
waitingOnFd []*dialJob
dialFunc dialfunc
activePerPeer map[peer.ID]int
perPeerLimit int
waitingOnPeerLimit map[peer.ID][]*dialJob
}
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (transport.CapableConn, error)
type isFdConsumingFnc func(ma.Multiaddr) bool
func newDialLimiter(df dialfunc, fdFnc isFdConsumingFnc) *dialLimiter {
fd := ConcurrentFdDials
if env := os.Getenv("LIBP2P_SWARM_FD_LIMIT"); env != "" {
if n, err := strconv.ParseInt(env, 10, 32); err == nil {
fd = int(n)
}
}
return newDialLimiterWithParams(fdFnc, df, fd, DefaultPerPeerRateLimit)
}
func newDialLimiterWithParams(isFdConsumingFnc isFdConsumingFnc, df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
return &dialLimiter{
isFdConsumingFnc: isFdConsumingFnc,
fdLimit: fdLimit,
perPeerLimit: perPeerLimit,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
activePerPeer: make(map[peer.ID]int),
dialFunc: df,
}
}
// freeFDToken frees an FD token and, if any dials are waiting on one,
// schedules the next waiting dialJob in its place
func (dl *dialLimiter) freeFDToken() {
log.Debugf("[limiter] freeing FD token; waiting: %d; consuming: %d", len(dl.waitingOnFd), dl.fdConsuming)
dl.fdConsuming--
for len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd[0] = nil // clear out memory
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
// clear out memory.
dl.waitingOnFd = nil
}
// Skip over canceled dials instead of queuing up a goroutine.
if next.cancelled() {
dl.freePeerToken(next)
continue
}
dl.fdConsuming++
// we already have activePerPeer token at this point so we can just dial
go dl.executeDial(next)
return
}
}
func (dl *dialLimiter) freePeerToken(dj *dialJob) {
log.Debugf("[limiter] freeing peer token; peer %s; addr: %s; active for peer: %d; waiting on peer limit: %d",
dj.peer, dj.addr, dl.activePerPeer[dj.peer], len(dl.waitingOnPeerLimit[dj.peer]))
// release tokens in the reverse order we took them
dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 {
delete(dl.activePerPeer, dj.peer)
}
waitlist := dl.waitingOnPeerLimit[dj.peer]
for len(waitlist) > 0 {
next := waitlist[0]
waitlist[0] = nil // clear out memory
waitlist = waitlist[1:]
if len(waitlist) == 0 {
delete(dl.waitingOnPeerLimit, next.peer)
} else {
dl.waitingOnPeerLimit[next.peer] = waitlist
}
if next.cancelled() {
continue
}
dl.activePerPeer[next.peer]++ // just kidding, we still want this token
dl.addCheckFdLimit(next)
return
}
}
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()
if dl.shouldConsumeFd(dj.addr) {
dl.freeFDToken()
}
dl.freePeerToken(dj)
}
func (dl *dialLimiter) shouldConsumeFd(addr ma.Multiaddr) bool {
// we don't consume FD's for relay addresses for now as they will be consumed when the Relay Transport
// actually dials the Relay server. That dial call will also pass through this limiter with
// the address of the relay server i.e. non-relay address.
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
isRelay := err == nil
return !isRelay && dl.isFdConsumingFnc(addr)
}
func (dl *dialLimiter) addCheckFdLimit(dj *dialJob) {
if dl.shouldConsumeFd(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
log.Debugf("[limiter] blocked dial waiting on FD token; peer: %s; addr: %s; consuming: %d; "+
"limit: %d; waiting: %d", dj.peer, dj.addr, dl.fdConsuming, dl.fdLimit, len(dl.waitingOnFd))
dl.waitingOnFd = append(dl.waitingOnFd, dj)
return
}
log.Debugf("[limiter] taking FD token: peer: %s; addr: %s; prev consuming: %d",
dj.peer, dj.addr, dl.fdConsuming)
// take token
dl.fdConsuming++
}
log.Debugf("[limiter] executing dial; peer: %s; addr: %s; FD consuming: %d; waiting: %d",
dj.peer, dj.addr, dl.fdConsuming, len(dl.waitingOnFd))
go dl.executeDial(dj)
}
func (dl *dialLimiter) addCheckPeerLimit(dj *dialJob) {
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
log.Debugf("[limiter] blocked dial waiting on peer limit; peer: %s; addr: %s; active: %d; "+
"peer limit: %d; waiting: %d", dj.peer, dj.addr, dl.activePerPeer[dj.peer], dl.perPeerLimit,
len(dl.waitingOnPeerLimit[dj.peer]))
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++
dl.addCheckFdLimit(dj)
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.lk.Lock()
defer dl.lk.Unlock()
log.Debugf("[limiter] adding a dial job through limiter: %v", dj.addr)
dl.addCheckPeerLimit(dj)
}
func (dl *dialLimiter) clearAllPeerDials(p peer.ID) {
dl.lk.Lock()
defer dl.lk.Unlock()
delete(dl.waitingOnPeerLimit, p)
log.Debugf("[limiter] clearing all peer dials: %v", p)
// NB: the waitingOnFd list doesn't need to be cleaned out here, we will
// remove them as we encounter them because they are 'cancelled' at this
// point
}
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}
dctx, cancel := context.WithTimeout(j.ctx, j.dialTimeout())
defer cancel()
con, err := dl.dialFunc(dctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
case <-j.ctx.Done():
if err == nil {
con.Close()
}
}
}
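
For orientation before the tests below: a caller hands the limiter a `dialJob` and reads the outcome from the job's response channel; the limiter decides when the dial may actually run. A minimal sketch with a stub dial function (`exampleLimiter` is a hypothetical illustration, not part of the package):

```go
package swarm

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/transport"
	ma "github.com/multiformats/go-multiaddr"
)

// exampleLimiter submits a single dial job through the limiter and waits
// for its result. The stub dial function always fails, so the result
// carries an error.
func exampleLimiter() {
	df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (transport.CapableConn, error) {
		return nil, fmt.Errorf("stub dial: always fails")
	}
	l := newDialLimiter(df, isFdConsumingAddr)

	addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
	resp := make(chan dialResult, 1)
	l.AddDialJob(&dialJob{
		ctx:  context.Background(),
		peer: peer.ID("example-peer"),
		addr: addr,
		resp: resp,
	})
	res := <-resp
	fmt.Println("dial result:", res.Err)
}
```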
package swarm
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
)
var isFdConsuming = func(addr ma.Multiaddr) bool {
res := false
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_TCP {
res = true
return false
}
return true
})
return res
}
func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
}
func addrWithPort(t *testing.T, p int) ma.Multiaddr {
return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p))
}
// in these tests I use addresses with tcp ports over a certain number to
// signify 'good' addresses that will succeed, and addresses below that number
// will fail. This lets us more easily test these different scenarios.
func tcpPortOver(a ma.Multiaddr, n int) bool {
port, err := a.ValueForProtocol(ma.P_TCP)
if err != nil {
panic(err)
}
pnum, err := strconv.Atoi(port)
if err != nil {
panic(err)
}
return pnum > n
}
func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) {
for _, a := range addrs {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: p,
addr: a,
resp: res,
})
}
}
func hangDialFunc(hang chan struct{}) dialfunc {
return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (transport.CapableConn, error) {
if mafmt.UTP.Matches(a) {
return transport.CapableConn(nil), nil
}
_, err := a.ValueForProtocol(ma.P_CIRCUIT)
if err == nil {
return transport.CapableConn(nil), nil
}
if tcpPortOver(a, 10) {
return transport.CapableConn(nil), nil
}
<-hang
return nil, fmt.Errorf("test bad dial")
}
}
func TestLimiterBasicDials(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), ConcurrentFdDials, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
resch := make(chan dialResult)
pid := peer.ID("testpeer")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tryDialAddrs(ctx, l, pid, bads, resch)
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// complete a single hung dial
hang <- struct{}{}
select {
case r := <-resch:
if r.Err == nil {
t.Fatal("should have gotten failed dial result")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial completion")
}
select {
case r := <-resch:
if r.Err != nil {
t.Fatal("expected second result to be success!")
}
case <-time.After(time.Second):
}
}
func TestFDLimiting(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(isFdConsuming, hangDialFunc(hang), 16, 5)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
goodTCP := addrWithPort(t, 20)
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
tryDialAddrs(ctx, l, pid, bads, resch)
}
// these dials should work normally, but will hang because we have taken
// up all the fd limiting
for _, pid := range pids {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: goodTCP,
resp: resch,
})
}
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
// This should complete immediately since utp addresses aren't blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have gotten successful response")
}
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for utp addr success")
}
// A relay address with tcp transport will complete because we do not consume fds for dials
// with relay addresses as the fd will be consumed when we actually dial the relay server.
pid6 := test.RandPeerIDFatal(t)
relayAddr := mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/20/p2p-circuit/p2p/%s", pid6))
l.AddDialJob(&dialJob{ctx: ctx, peer: pid6, addr: relayAddr, resp: resch})
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have gotten successful response")
}
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for relay addr success")
}
}
func TestTokenRedistribution(t *testing.T) {
var lk sync.Mutex
hangchs := make(map[peer.ID]chan struct{})
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (transport.CapableConn, error) {
if tcpPortOver(a, 10) {
return (transport.CapableConn)(nil), nil
}
lk.Lock()
ch := hangchs[p]
lk.Unlock()
<-ch
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(isFdConsuming, df, 8, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2"}
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
hangchs[pid] = make(chan struct{})
}
for _, pid := range pids {
tryDialAddrs(ctx, l, pid, bads, resch)
}
good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001")
// add a good dial job for peer 1
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[1],
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// unblock one dial for peer 0
hangchs[pids[0]] <- struct{}{}
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case <-resch:
t.Fatal("no more dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// add a bad dial job to peer 0 to fill their rate limiter
// and test that more dials for this peer won't interfere with peer 1's successful dial incoming
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[0],
addr: addrWithPort(t, 7),
resp: resch,
})
hangchs[pids[1]] <- struct{}{}
// now one failed dial from peer 1 should get through and fail
// which will in turn unblock the successful dial on peer 1
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have succeeded!")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("should have gotten successful dial")
}
}
func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (transport.CapableConn, error) {
if tcpPortOver(a, 1000) {
return transport.CapableConn(nil), nil
}
time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(isFdConsuming, df, 20, 5)
var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
}
addresses := append(bads, addrWithPort(t, 2000))
success := make(chan struct{})
for i := 0; i < 20; i++ {
go func(id peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp := make(chan dialResult)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
for _, i := range rand.Perm(len(addresses)) {
l.AddDialJob(&dialJob{
addr: addresses[i],
ctx: ctx,
peer: id,
resp: resp,
})
}
for res := range resp {
if res.Err == nil {
success <- struct{}{}
return
}
}
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
}
for i := 0; i < 20; i++ {
select {
case <-success:
case <-time.After(time.Second * 5):
t.Fatal("expected a success within five seconds")
}
}
}
func TestFDLimitUnderflow(t *testing.T) {
df := func(ctx context.Context, p peer.ID, addr ma.Multiaddr) (transport.CapableConn, error) {
select {
case <-ctx.Done():
case <-time.After(5 * time.Second):
}
return nil, fmt.Errorf("df timed out")
}
const fdLimit = 20
l := newDialLimiterWithParams(isFdConsuming, df, fdLimit, 3)
var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
addrs = append(addrs, addrWithPort(t, i))
}
wg := sync.WaitGroup{}
const num = 3 * fdLimit
wg.Add(num)
errs := make(chan error, num)
for i := 0; i < num; i++ {
go func(id peer.ID, i int) {
defer wg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp := make(chan dialResult)
l.AddDialJob(&dialJob{
addr: addrs[i],
ctx: ctx,
peer: id,
resp: resp,
})
for res := range resp {
if res.Err != nil {
return
}
errs <- errors.New("got dial res, but shouldn't")
}
}(peer.ID(fmt.Sprintf("testpeer%d", i%20)), i)
}
go func() {
wg.Wait()
close(errs)
}()
for err := range errs {
t.Fatal(err)
}
l.lk.Lock()
fdConsuming := l.fdConsuming
l.lk.Unlock()
if fdConsuming < 0 {
t.Fatalf("l.fdConsuming < 0")
}
}
package swarm_test
import (
"context"
"testing"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"
. "github.com/libp2p/go-libp2p-swarm"
)
func TestPeers(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
s2 := swarms[1]
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.Peerstore().AddAddr(dst, addr, peerstore.PermanentAddrTTL)
// t.Logf("connections from %s", s.LocalPeer())
// for _, c := range s.ConnsToPeer(dst) {
// t.Logf("connection from %s to %s: %v", s.LocalPeer(), dst, c)
// }
// t.Logf("")
if _, err := s.DialPeer(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
// t.Log(s.swarm.Dump())
}
s1GotConn := make(chan struct{})
s2GotConn := make(chan struct{})
s1.SetConnHandler(func(c network.Conn) {
s1GotConn <- struct{}{}
})
s2.SetConnHandler(func(c network.Conn) {
s2GotConn <- struct{}{}
})
connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0])
<-s2GotConn // have to wait here so the other side catches up.
connect(s2, s1.LocalPeer(), s1.ListenAddresses()[0])
for i := 0; i < 100; i++ {
connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0])
connect(s2, s1.LocalPeer(), s1.ListenAddresses()[0])
}
for _, s := range swarms {
log.Infof("%s swarm routing table: %s", s.LocalPeer(), s.Peers())
}
test := func(s *Swarm) {
expect := 1
actual := len(s.Peers())
if actual != expect {
t.Errorf("%s has %d peers, not %d: %v", s.LocalPeer(), actual, expect, s.Peers())
}
actual = len(s.Conns())
if actual != expect {
t.Errorf("%s has %d conns, not %d: %v", s.LocalPeer(), actual, expect, s.Conns())
}
}
test(s1)
test(s2)
}
package swarm_test
import (
"context"
"runtime"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"
. "github.com/libp2p/go-libp2p-swarm"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p-testing/ci"
)
func TestSimultOpen(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2, swarmt.OptDisableReuseport)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
defer wg.Done()
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.LocalPeer(), dst, addr)
s.Peerstore().AddAddr(dst, addr, peerstore.PermanentAddrTTL)
if _, err := s.DialPeer(ctx, dst); err != nil {
t.Error("error swarm dialing to peer", err)
}
}
log.Info("Connecting swarms simultaneously.")
wg.Add(2)
go connect(swarms[0], swarms[1].LocalPeer(), swarms[1].ListenAddresses()[0])
go connect(swarms[1], swarms[0].LocalPeer(), swarms[0].ListenAddresses()[0])
wg.Wait()
}
for _, s := range swarms {
s.Close()
}
}
func TestSimultOpenMany(t *testing.T) {
// t.Skip("very very slow")
addrs := 20
rounds := 10
if ci.IsRunning() || runtime.GOOS == "darwin" {
// osx has a limit of 256 file descriptors
addrs = 10
rounds = 5
}
SubtestSwarm(t, addrs, rounds)
}
func TestSimultOpenFewStress(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// t.Skip("skipping for another test")
t.Parallel()
msgs := 40
swarms := 2
rounds := 10
// rounds := 100
for i := 0; i < rounds; i++ {
SubtestSwarm(t, swarms, msgs)
<-time.After(10 * time.Millisecond)
}
}
package swarm
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/transport"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
ma "github.com/multiformats/go-multiaddr"
)
// DialTimeoutLocal is the maximum duration a Dial to a local network address
// is allowed to take.
// This includes the time spent dialing the raw network connection,
// protocol selection, as well as the handshake, if applicable.
var DialTimeoutLocal = 5 * time.Second
var log = logging.Logger("swarm2")
// ErrSwarmClosed is returned when one attempts to operate on a closed swarm.
var ErrSwarmClosed = errors.New("swarm closed")
// ErrAddrFiltered is returned when trying to register a connection to a
// filtered address. You shouldn't see this error unless some underlying
// transport is misbehaving.
var ErrAddrFiltered = errors.New("address filtered")
// ErrDialTimeout is returned when a dial times out due to the global timeout
var ErrDialTimeout = errors.New("dial timed out")
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
type Swarm struct {
nextConnID uint64 // guarded by atomic
nextStreamID uint64 // guarded by atomic
// Close refcount. This allows us to fully wait for the swarm to be torn
// down before continuing.
refs sync.WaitGroup
local peer.ID
peers peerstore.Peerstore
conns struct {
sync.RWMutex
m map[peer.ID][]*Conn
}
listeners struct {
sync.RWMutex
ifaceListenAddres []ma.Multiaddr
cacheEOL time.Time
m map[transport.Listener]struct{}
}
notifs struct {
sync.RWMutex
m map[network.Notifiee]struct{}
}
transports struct {
sync.RWMutex
m map[int]transport.Transport
}
// new connection and stream handlers
connh atomic.Value
streamh atomic.Value
// dialing helpers
dsync *DialSync
backf DialBackoff
limiter *dialLimiter
gater connmgr.ConnectionGater
proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
}
// NewSwarm constructs a Swarm.
//
// NOTE: go-libp2p will be moving to dependency injection soon. The variadic
// `extra` interface{} parameter facilitates the future migration. Supported
// elements are:
// - connmgr.ConnectionGater
func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc metrics.Reporter, extra ...interface{}) *Swarm {
s := &Swarm{
local: local,
peers: peers,
bwc: bwc,
}
s.conns.m = make(map[peer.ID][]*Conn)
s.listeners.m = make(map[transport.Listener]struct{})
s.transports.m = make(map[int]transport.Transport)
s.notifs.m = make(map[network.Notifiee]struct{})
for _, i := range extra {
switch v := i.(type) {
case connmgr.ConnectionGater:
s.gater = v
}
}
s.dsync = newDialSync(s.startDialWorker)
s.limiter = newDialLimiter(s.dialAddr, isFdConsumingAddr)
s.proc = goprocessctx.WithContext(ctx)
s.ctx = goprocessctx.OnClosingContext(s.proc)
s.backf.init(s.ctx)
// Set teardown after setting the context/process so we don't start the
// teardown process early.
s.proc.SetTeardown(s.teardown)
return s
}
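// A minimal usage sketch (not from the original source): wiring a connection
// gater through the variadic `extra` parameter. `localID`, `ps` and `myGater`
// are assumed to exist in the caller's code; myGater stands in for any
// connmgr.ConnectionGater implementation. Elements of `extra` with an
// unrecognized type are silently ignored.
//
//	var gater connmgr.ConnectionGater = &myGater{}
//	sw := NewSwarm(ctx, localID, ps, metrics.NewBandwidthCounter(), gater)
//	defer sw.Close()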
func (s *Swarm) teardown() error {
// Wait for the context to be canceled.
// This allows other parts of the swarm to detect that we're shutting
// down.
<-s.ctx.Done()
// Prevents new connections and/or listeners from being added to the swarm.
s.listeners.Lock()
listeners := s.listeners.m
s.listeners.m = nil
s.listeners.Unlock()
s.conns.Lock()
conns := s.conns.m
s.conns.m = nil
s.conns.Unlock()
// Lots of goroutines but we might as well do this in parallel. We want to shut down as fast as
// possible.
for l := range listeners {
go func(l transport.Listener) {
if err := l.Close(); err != nil {
log.Errorf("error when shutting down listener: %s", err)
}
}(l)
}
for _, cs := range conns {
for _, c := range cs {
go func(c *Conn) {
if err := c.Close(); err != nil {
log.Errorf("error when shutting down connection: %s", err)
}
}(c)
}
}
// Wait for everything to finish.
s.refs.Wait()
// Now close out any transports (if necessary). Do this after closing
// all connections/listeners.
s.transports.Lock()
transports := s.transports.m
s.transports.m = nil
s.transports.Unlock()
var wg sync.WaitGroup
for _, t := range transports {
if closer, ok := t.(io.Closer); ok {
wg.Add(1)
go func(c io.Closer) {
defer wg.Done()
if err := closer.Close(); err != nil {
log.Errorf("error when closing down transport %T: %s", c, err)
}
}(closer)
}
}
wg.Wait()
return nil
}
// Process returns the Process of the swarm
func (s *Swarm) Process() goprocess.Process {
return s.proc
}
func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
var (
p = tc.RemotePeer()
addr = tc.RemoteMultiaddr()
)
// create the Stat object, initializing with the underlying connection Stat if available
var stat network.Stat
if cs, ok := tc.(network.ConnStat); ok {
stat = cs.Stat()
}
stat.Direction = dir
stat.Opened = time.Now()
// Wrap and register the connection.
c := &Conn{
conn: tc,
swarm: s,
stat: stat,
id: atomic.AddUint64(&s.nextConnID, 1),
}
// We ONLY check upgraded connections here so we can send the remote peer a Disconnect message.
// If we performed this check in the Upgrader, we would not be able to do that.
if s.gater != nil {
if allow, _ := s.gater.InterceptUpgraded(c); !allow {
// TODO Send disconnect with reason here
err := tc.Close()
if err != nil {
log.Warnf("failed to close connection with peer %s and addr %s; err: %s", p.Pretty(), addr, err)
}
return nil, ErrGaterDisallowedConnection
}
}
// Add the public key.
if pk := tc.RemotePublicKey(); pk != nil {
s.peers.AddPubKey(p, pk)
}
// Clear any backoffs
s.backf.Clear(p)
// Finally, add the peer.
s.conns.Lock()
// Check if we're still online
if s.conns.m == nil {
s.conns.Unlock()
tc.Close()
return nil, ErrSwarmClosed
}
c.streams.m = make(map[*Stream]struct{})
s.conns.m[p] = append(s.conns.m[p], c)
// Add two swarm refs:
// * One will be decremented after the close notifications fire in Conn.doClose
// * The other will be decremented when Conn.start exits.
s.refs.Add(2)
// Take the notification lock before releasing the conns lock to block
// Disconnect notifications until after the Connect notifications are done.
c.notifyLk.Lock()
s.conns.Unlock()
s.notifyAll(func(f network.Notifiee) {
f.Connected(s, c)
})
c.notifyLk.Unlock()
c.start()
// TODO: Get rid of this. We use it for identify, but that happens much
// earlier (really, inside the transport and, if not there, during the
// notifications).
if h := s.ConnHandler(); h != nil {
go h(c)
}
return c, nil
}
// Peerstore returns this swarm's internal Peerstore.
func (s *Swarm) Peerstore() peerstore.Peerstore {
return s.peers
}
// Context returns the context of the swarm
func (s *Swarm) Context() context.Context {
return s.ctx
}
// Close stops the Swarm.
func (s *Swarm) Close() error {
return s.proc.Close()
}
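// A caller-side sketch (assuming a swarm `sw` in scope): Close shuts the
// swarm down via its goprocess, whose teardown closes listeners, connections
// and transports. Callers can additionally wait on Process().Closed(), as the
// tests in this repository do.
//
//	sw.Close()
//	<-sw.Process().Closed()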
// TODO: We probably don't need the conn handlers.
// SetConnHandler assigns the handler for new connections.
// You will rarely use this. See SetStreamHandler instead.
func (s *Swarm) SetConnHandler(handler network.ConnHandler) {
s.connh.Store(handler)
}
// ConnHandler gets the handler for new connections.
func (s *Swarm) ConnHandler() network.ConnHandler {
handler, _ := s.connh.Load().(network.ConnHandler)
return handler
}
// SetStreamHandler assigns the handler for new streams.
func (s *Swarm) SetStreamHandler(handler network.StreamHandler) {
s.streamh.Store(handler)
}
// StreamHandler gets the handler for new streams.
func (s *Swarm) StreamHandler() network.StreamHandler {
handler, _ := s.streamh.Load().(network.StreamHandler)
return handler
}
// NewStream creates a new stream on any available connection to peer, dialing
// if necessary.
func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) {
log.Debugf("[%s] opening stream to peer [%s]", s.local, p)
// Algorithm:
// 1. Find the best connection, otherwise, dial.
// 2. Try opening a stream.
// 3. If the underlying connection is, in fact, closed, close the outer
// connection and try again. We do this in case we have a closed
// connection but don't notice it until we actually try to open a
// stream.
//
// Note: We only dial once.
//
// TODO: Try all connections even if we get an error opening a stream on
// a non-closed connection.
dials := 0
for {
// will prefer direct connections over relayed connections for opening streams
c := s.bestConnToPeer(p)
if c == nil {
if nodial, _ := network.GetNoDial(ctx); nodial {
return nil, network.ErrNoConn
}
if dials >= DialAttempts {
return nil, errors.New("max dial attempts exceeded")
}
dials++
var err error
c, err = s.dialPeer(ctx, p)
if err != nil {
return nil, err
}
}
s, err := c.NewStream(ctx)
if err != nil {
if c.conn.IsClosed() {
continue
}
return nil, err
}
return s, nil
}
}
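// A minimal caller-side sketch, assuming a swarm `sw` and a peer ID `p` are in
// scope. network.WithNoDial suppresses dialing, so the call fails fast with
// network.ErrNoConn when there is no usable connection yet.
//
//	ctx := network.WithNoDial(context.Background(), "avoid dialing")
//	st, err := sw.NewStream(ctx, p)
//	if err != nil {
//		// e.g. network.ErrNoConn if we are not connected to p
//	}
//	defer st.Close()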
// ConnsToPeer returns all the live connections to peer.
func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn {
// TODO: Consider sorting the connection list best to worst. Currently,
// it's sorted oldest to newest.
s.conns.RLock()
defer s.conns.RUnlock()
conns := s.conns.m[p]
output := make([]network.Conn, len(conns))
for i, c := range conns {
output[i] = c
}
return output
}
func isBetterConn(a, b *Conn) bool {
// If one is transient and not the other, prefer the non-transient connection.
aTransient := a.Stat().Transient
bTransient := b.Stat().Transient
if aTransient != bTransient {
return !aTransient
}
// If one is direct and not the other, prefer the direct connection.
aDirect := isDirectConn(a)
bDirect := isDirectConn(b)
if aDirect != bDirect {
return aDirect
}
// Otherwise, prefer the connection with more open streams.
a.streams.Lock()
aLen := len(a.streams.m)
a.streams.Unlock()
b.streams.Lock()
bLen := len(b.streams.m)
b.streams.Unlock()
if aLen != bLen {
return aLen > bLen
}
// finally, pick the last connection.
return true
}
// bestConnToPeer returns the best connection to peer.
func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
// TODO: Prefer some transports over others.
// For now, prefers direct connections over Relayed connections.
// For tie-breaking, select the newest non-closed connection with the most streams.
s.conns.RLock()
defer s.conns.RUnlock()
var best *Conn
for _, c := range s.conns.m[p] {
if c.conn.IsClosed() {
// We *will* garbage collect this soon anyways.
continue
}
if best == nil || isBetterConn(c, best) {
best = c
}
}
return best
}
func (s *Swarm) bestAcceptableConnToPeer(ctx context.Context, p peer.ID) *Conn {
conn := s.bestConnToPeer(p)
if conn != nil {
forceDirect, _ := network.GetForceDirectDial(ctx)
if !forceDirect || isDirectConn(conn) {
return conn
}
}
return nil
}
func isDirectConn(c *Conn) bool {
return c != nil && !c.conn.Transport().Proxy()
}
// Connectedness returns our "connectedness" state with the given peer.
//
// To check if we have an open connection, use `s.Connectedness(p) ==
// network.Connected`.
func (s *Swarm) Connectedness(p peer.ID) network.Connectedness {
if s.bestConnToPeer(p) != nil {
return network.Connected
}
return network.NotConnected
}
// Conns returns a slice of all connections.
func (s *Swarm) Conns() []network.Conn {
s.conns.RLock()
defer s.conns.RUnlock()
conns := make([]network.Conn, 0, len(s.conns.m))
for _, cs := range s.conns.m {
for _, c := range cs {
conns = append(conns, c)
}
}
return conns
}
// ClosePeer closes all connections to the given peer.
func (s *Swarm) ClosePeer(p peer.ID) error {
conns := s.ConnsToPeer(p)
switch len(conns) {
case 0:
return nil
case 1:
return conns[0].Close()
default:
errCh := make(chan error)
for _, c := range conns {
go func(c network.Conn) {
errCh <- c.Close()
}(c)
}
var errs []string
for range conns {
err := <-errCh
if err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
return fmt.Errorf("when disconnecting from peer %s: %s", p, strings.Join(errs, ", "))
}
return nil
}
}
// Peers returns a copy of the set of peers this swarm is connected to.
func (s *Swarm) Peers() []peer.ID {
s.conns.RLock()
defer s.conns.RUnlock()
peers := make([]peer.ID, 0, len(s.conns.m))
for p := range s.conns.m {
peers = append(peers, p)
}
return peers
}
// LocalPeer returns the local peer this swarm is associated with.
func (s *Swarm) LocalPeer() peer.ID {
return s.local
}
// Backoff returns the DialBackoff object for this swarm.
func (s *Swarm) Backoff() *DialBackoff {
return &s.backf
}
// notifyAll sends a signal to all Notifiees
func (s *Swarm) notifyAll(notify func(network.Notifiee)) {
var wg sync.WaitGroup
s.notifs.RLock()
wg.Add(len(s.notifs.m))
for f := range s.notifs.m {
go func(f network.Notifiee) {
defer wg.Done()
notify(f)
}(f)
}
wg.Wait()
s.notifs.RUnlock()
}
// Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(f network.Notifiee) {
s.notifs.Lock()
s.notifs.m[f] = struct{}{}
s.notifs.Unlock()
}
// StopNotify unregisters Notifiee from receiving signals.
func (s *Swarm) StopNotify(f network.Notifiee) {
s.notifs.Lock()
delete(s.notifs.m, f)
s.notifs.Unlock()
}
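// A usage sketch: network.NotifyBundle (from go-libp2p-core/network)
// implements network.Notifiee with optional callbacks, which makes it a
// convenient argument for Notify. `sw` is an assumed *Swarm in scope.
//
//	notifee := &network.NotifyBundle{
//		ConnectedF: func(_ network.Network, c network.Conn) {
//			// react to the new connection, e.g. record c.RemotePeer()
//		},
//	}
//	sw.Notify(notifee)
//	defer sw.StopNotify(notifee)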
func (s *Swarm) removeConn(c *Conn) {
p := c.RemotePeer()
s.conns.Lock()
defer s.conns.Unlock()
cs := s.conns.m[p]
for i, ci := range cs {
if ci == c {
if len(cs) == 1 {
delete(s.conns.m, p)
} else {
// NOTE: We're intentionally preserving order.
// This way, connections to a peer are always
// sorted oldest to newest.
copy(cs[i:], cs[i+1:])
cs[len(cs)-1] = nil
s.conns.m[p] = cs[:len(cs)-1]
}
return
}
}
}
// String returns a string representation of the Network.
func (s *Swarm) String() string {
return fmt.Sprintf("<Swarm %s>", s.LocalPeer())
}
// Swarm is a Network.
var _ network.Network = (*Swarm)(nil)
var _ transport.TransportNetwork = (*Swarm)(nil)
package swarm
import (
"time"
addrutil "github.com/libp2p/go-addr-util"
ma "github.com/multiformats/go-multiaddr"
)
// ListenAddresses returns a list of addresses at which this swarm listens.
func (s *Swarm) ListenAddresses() []ma.Multiaddr {
s.listeners.RLock()
defer s.listeners.RUnlock()
return s.listenAddressesNoLock()
}
func (s *Swarm) listenAddressesNoLock() []ma.Multiaddr {
addrs := make([]ma.Multiaddr, 0, len(s.listeners.m))
for l := range s.listeners.m {
addrs = append(addrs, l.Multiaddr())
}
return addrs
}
const ifaceAddrsCacheDuration = 1 * time.Minute
// InterfaceListenAddresses returns a list of addresses at which this swarm
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
s.listeners.RLock() // RLock start
ifaceListenAddres := s.listeners.ifaceListenAddres
isEOL := time.Now().After(s.listeners.cacheEOL)
s.listeners.RUnlock() // RLock end
if !isEOL {
// Cache is valid, clone the slice
return append(ifaceListenAddres[:0:0], ifaceListenAddres...), nil
}
// Cache is not valid.
// Perform double-checked locking.
s.listeners.Lock() // Lock start
ifaceListenAddres = s.listeners.ifaceListenAddres
isEOL = time.Now().After(s.listeners.cacheEOL)
if isEOL {
// Cache is still invalid
listenAddres := s.listenAddressesNoLock()
if len(listenAddres) > 0 {
// We're actually listening on addresses.
var err error
ifaceListenAddres, err = addrutil.ResolveUnspecifiedAddresses(listenAddres, nil)
if err != nil {
s.listeners.Unlock() // Lock early exit
return nil, err
}
} else {
ifaceListenAddres = nil
}
s.listeners.ifaceListenAddres = ifaceListenAddres
s.listeners.cacheEOL = time.Now().Add(ifaceAddrsCacheDuration)
}
s.listeners.Unlock() // Lock end
return append(ifaceListenAddres[:0:0], ifaceListenAddres...), nil
}
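// A usage sketch, assuming a swarm `sw` with the TCP transport registered:
// after listening on an unspecified address, the swarm reports the concrete
// per-interface addresses. The exact entries depend on the host's interfaces.
//
//	_ = sw.Listen(ma.StringCast("/ip4/0.0.0.0/tcp/0"))
//	addrs, _ := sw.InterfaceListenAddresses()
//	// addrs now holds entries like /ip4/127.0.0.1/tcp/<port> for each interface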
package swarm_test
import (
"context"
"testing"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/test"
ma "github.com/multiformats/go-multiaddr"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
)
func TestDialBadAddrs(t *testing.T) {
m := func(s string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return maddr
}
ctx := context.Background()
s := makeSwarms(ctx, t, 1)[0]
test := func(a ma.Multiaddr) {
p := test.RandPeerIDFatal(t)
s.Peerstore().AddAddr(p, a, peerstore.PermanentAddrTTL)
if _, err := s.DialPeer(ctx, p); err == nil {
t.Errorf("swarm should not dial: %s", p)
}
}
test(m("/ip6/fe80::1")) // link local
test(m("/ip6/fe80::100")) // link local
test(m("/ip4/127.0.0.1/udp/1234/utp")) // utp
}
func TestAddrRace(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := makeSwarms(ctx, t, 1)[0]
defer s.Close()
a1, err := s.InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
a2, err := s.InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
if len(a1) > 0 && len(a2) > 0 && &a1[0] == &a2[0] {
t.Fatal("got the exact same address set twice; this could lead to data races")
}
}
func TestAddressesWithoutListening(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := swarmt.GenSwarm(t, ctx, swarmt.OptDialOnly)
a1, err := s.InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
if len(a1) != 0 {
t.Fatalf("expected to be listening on no addresses, was listening on %d", len(a1))
}
}
package swarm
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
)
// TODO: Put this elsewhere.
// ErrConnClosed is returned when operating on a closed connection.
var ErrConnClosed = errors.New("connection closed")
// Conn is the connection type used by swarm. In general, you won't use this
// type directly.
type Conn struct {
id uint64
conn transport.CapableConn
swarm *Swarm
closeOnce sync.Once
err error
notifyLk sync.Mutex
streams struct {
sync.Mutex
m map[*Stream]struct{}
}
stat network.Stat
}
func (c *Conn) ID() string {
// format: <first 10 chars of peer id>-<global conn ordinal>
return fmt.Sprintf("%s-%d", c.RemotePeer().Pretty()[0:10], c.id)
}
// Close closes this connection.
//
// Note: This method won't wait for the close notifications to finish as that
// would create a deadlock when called from an open notification (because all
// open notifications must finish before we can fire off the close
// notifications).
func (c *Conn) Close() error {
c.closeOnce.Do(c.doClose)
return c.err
}
func (c *Conn) doClose() {
c.swarm.removeConn(c)
// Prevent new streams from opening.
c.streams.Lock()
streams := c.streams.m
c.streams.m = nil
c.streams.Unlock()
c.err = c.conn.Close()
// This is just for cleaning up state. The connection has already been closed.
// We *could* optimize this but it really isn't worth it.
for s := range streams {
s.Reset()
}
// do this in a goroutine to avoid deadlocking if we call close in an open notification.
go func() {
// prevents us from issuing close notifications before finishing the open notifications
c.notifyLk.Lock()
defer c.notifyLk.Unlock()
c.swarm.notifyAll(func(f network.Notifiee) {
f.Disconnected(c.swarm, c)
})
c.swarm.refs.Done() // taken in Swarm.addConn
}()
}
func (c *Conn) removeStream(s *Stream) {
c.streams.Lock()
delete(c.streams.m, s)
c.streams.Unlock()
}
// listens for new streams.
//
// The caller must take a swarm ref before calling. This function decrements the
// swarm ref count.
func (c *Conn) start() {
go func() {
defer c.swarm.refs.Done()
defer c.Close()
for {
ts, err := c.conn.AcceptStream()
if err != nil {
return
}
c.swarm.refs.Add(1)
go func() {
s, err := c.addStream(ts, network.DirInbound)
// Don't defer this. We don't want to block
// swarm shutdown on the connection handler.
c.swarm.refs.Done()
// We only get an error here when the swarm is closed or closing.
if err != nil {
return
}
if h := c.swarm.StreamHandler(); h != nil {
h(s)
}
}()
}
}()
}
func (c *Conn) String() string {
return fmt.Sprintf(
"<swarm.Conn[%T] %s (%s) <-> %s (%s)>",
c.conn.Transport(),
c.conn.LocalMultiaddr(),
c.conn.LocalPeer().Pretty(),
c.conn.RemoteMultiaddr(),
c.conn.RemotePeer().Pretty(),
)
}
// LocalMultiaddr is the Multiaddr on this side
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.conn.LocalMultiaddr()
}
// LocalPeer is the Peer on our side of the connection
func (c *Conn) LocalPeer() peer.ID {
return c.conn.LocalPeer()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
return c.conn.RemoteMultiaddr()
}
// RemotePeer is the Peer on the remote side
func (c *Conn) RemotePeer() peer.ID {
return c.conn.RemotePeer()
}
// LocalPrivateKey is the private key of the peer on this side
func (c *Conn) LocalPrivateKey() ic.PrivKey {
return c.conn.LocalPrivateKey()
}
// RemotePublicKey is the public key of the peer on the remote side
func (c *Conn) RemotePublicKey() ic.PubKey {
return c.conn.RemotePublicKey()
}
// Stat returns metadata pertaining to this connection
func (c *Conn) Stat() network.Stat {
return c.stat
}
// NewStream returns a new Stream from this connection
func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) {
if c.Stat().Transient {
if useTransient, _ := network.GetUseTransient(ctx); !useTransient {
return nil, network.ErrTransientConn
}
}
ts, err := c.conn.OpenStream(ctx)
if err != nil {
return nil, err
}
return c.addStream(ts, network.DirOutbound)
}
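// A caller-side sketch: streams over transient (e.g. relayed) connections must
// be explicitly allowed via the context, otherwise NewStream returns
// network.ErrTransientConn. `conn` is an assumed *Conn in scope.
//
//	ctx := network.WithUseTransient(context.Background(), "limited traffic is fine")
//	st, err := conn.NewStream(ctx)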
func (c *Conn) addStream(ts mux.MuxedStream, dir network.Direction) (*Stream, error) {
c.streams.Lock()
// Are we still online?
if c.streams.m == nil {
c.streams.Unlock()
ts.Reset()
return nil, ErrConnClosed
}
// Wrap and register the stream.
stat := network.Stat{
Direction: dir,
Opened: time.Now(),
}
s := &Stream{
stream: ts,
conn: c,
stat: stat,
id: atomic.AddUint64(&c.swarm.nextStreamID, 1),
}
c.streams.m[s] = struct{}{}
// Released once the stream disconnect notifications have finished
// firing (in Swarm.remove).
c.swarm.refs.Add(1)
// Take the notification lock before releasing the streams lock to block
// StreamClose notifications until after the StreamOpen notifications
// done.
s.notifyLk.Lock()
c.streams.Unlock()
c.swarm.notifyAll(func(f network.Notifiee) {
f.OpenedStream(c.swarm, s)
})
s.notifyLk.Unlock()
return s, nil
}
// GetStreams returns the streams associated with this connection.
func (c *Conn) GetStreams() []network.Stream {
c.streams.Lock()
defer c.streams.Unlock()
streams := make([]network.Stream, 0, len(c.streams.m))
for s := range c.streams.m {
streams = append(streams, s)
}
return streams
}
package swarm
import (
"fmt"
"time"
"github.com/libp2p/go-libp2p-core/network"
ma "github.com/multiformats/go-multiaddr"
)
// Listen sets up listeners for all of the given addresses.
// It succeeds as long as we successfully listen on at least *one* address.
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
errs := make([]error, len(addrs))
var succeeded int
for i, a := range addrs {
if err := s.AddListenAddr(a); err != nil {
errs[i] = err
} else {
succeeded++
}
}
for i, e := range errs {
if e != nil {
log.Warnw("listening failed", "on", addrs[i], "error", errs[i])
}
}
if succeeded == 0 && len(addrs) > 0 {
return fmt.Errorf("failed to listen on any addresses: %s", errs)
}
return nil
}
// AddListenAddr tells the swarm to listen on a single address. Unlike Listen,
// this method does not attempt to filter out bad addresses.
func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
tpt := s.TransportForListening(a)
if tpt == nil {
// TransportForListening will return nil if either:
// 1. No transport has been registered.
// 2. We're closed (so we've nulled out the transport map).
//
// Distinguish between these two cases to avoid confusing users.
select {
case <-s.proc.Closing():
return ErrSwarmClosed
default:
return ErrNoTransport
}
}
list, err := tpt.Listen(a)
if err != nil {
return err
}
s.listeners.Lock()
if s.listeners.m == nil {
s.listeners.Unlock()
list.Close()
return ErrSwarmClosed
}
s.refs.Add(1)
s.listeners.m[list] = struct{}{}
s.listeners.cacheEOL = time.Time{}
s.listeners.Unlock()
maddr := list.Multiaddr()
// signal to our notifiees on listen.
s.notifyAll(func(n network.Notifiee) {
n.Listen(s, maddr)
})
go func() {
defer func() {
list.Close()
s.listeners.Lock()
delete(s.listeners.m, list)
s.listeners.cacheEOL = time.Time{}
s.listeners.Unlock()
// signal to our notifiees on listen close.
s.notifyAll(func(n network.Notifiee) {
n.ListenClose(s, maddr)
})
s.refs.Done()
}()
for {
c, err := list.Accept()
if err != nil {
if s.ctx.Err() == nil {
// only log if the swarm is still running.
log.Errorf("swarm listener accept error: %s", err)
}
return
}
log.Debugf("swarm listener accepted connection: %s", c)
s.refs.Add(1)
go func() {
defer s.refs.Done()
_, err := s.addConn(c, network.DirInbound)
switch err {
case nil:
case ErrSwarmClosed:
// ignore.
return
default:
log.Warnw("adding connection failed", "to", a, "error", err)
return
}
}()
}
}()
return nil
}
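// A usage sketch, assuming a swarm `sw` with the relevant transports
// registered: Listen only returns an error when *every* address fails.
//
//	err := sw.Listen(
//		ma.StringCast("/ip4/0.0.0.0/tcp/0"),
//		ma.StringCast("/ip6/::/tcp/0"),
//	)
//	if err != nil {
//		// none of the addresses could be bound
//	}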
package swarm_test
import (
"context"
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/network"
. "github.com/libp2p/go-libp2p-swarm/testing"
)
// TestConnectednessCorrect starts a few networks, connects some of them,
// and tests that the Connectedness values are correct.
func TestConnectednessCorrect(t *testing.T) {
ctx := context.Background()
nets := make([]network.Network, 4)
for i := 0; i < 4; i++ {
nets[i] = GenSwarm(t, ctx)
}
// connect 0-1, 0-3, 1-2, 3-2
dial := func(a, b network.Network) {
DivulgeAddresses(b, a)
if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
t.Fatalf("Failed to dial: %s", err)
}
}
dial(nets[0], nets[1])
dial(nets[0], nets[3])
dial(nets[1], nets[2])
dial(nets[3], nets[2])
// The notifications for new connections get sent out asynchronously.
// There is the potential for a race condition here, so we sleep to ensure
// that they have been received.
time.Sleep(time.Millisecond * 100)
// test those connected show up correctly
// test connected
expectConnectedness(t, nets[0], nets[1], network.Connected)
expectConnectedness(t, nets[0], nets[3], network.Connected)
expectConnectedness(t, nets[1], nets[2], network.Connected)
expectConnectedness(t, nets[3], nets[2], network.Connected)
// test not connected
expectConnectedness(t, nets[0], nets[2], network.NotConnected)
expectConnectedness(t, nets[1], nets[3], network.NotConnected)
if len(nets[0].Peers()) != 2 {
t.Fatal("expected net 0 to have two peers")
}
if len(nets[2].Peers()) != 2 {
t.Fatal("expected net 2 to have two peers")
}
if len(nets[1].ConnsToPeer(nets[3].LocalPeer())) != 0 {
t.Fatal("net 1 should have no connections to net 3")
}
if err := nets[2].ClosePeer(nets[1].LocalPeer()); err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
expectConnectedness(t, nets[2], nets[1], network.NotConnected)
for _, n := range nets {
n.Close()
}
for _, n := range nets {
<-n.Process().Closed()
}
}
func expectConnectedness(t *testing.T, a, b network.Network, expected network.Connectedness) {
es := "%s is connected to %s, but Connectedness incorrect. %s %s %s"
atob := a.Connectedness(b.LocalPeer())
btoa := b.Connectedness(a.LocalPeer())
if atob != expected {
t.Errorf(es, a, b, printConns(a), printConns(b), atob)
}
// test symmetric case
if btoa != expected {
t.Errorf(es, b, a, printConns(b), printConns(a), btoa)
}
}
func printConns(n network.Network) string {
s := fmt.Sprintf("Connections in %s:\n", n)
for _, c := range n.Conns() {
s = s + fmt.Sprintf("- %s\n", c)
}
return s
}
func TestNetworkOpenStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testString := "hello ipfs"
nets := make([]network.Network, 4)
for i := 0; i < 4; i++ {
nets[i] = GenSwarm(t, ctx)
}
dial := func(a, b network.Network) {
DivulgeAddresses(b, a)
if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
t.Fatalf("Failed to dial: %s", err)
}
}
dial(nets[0], nets[1])
dial(nets[0], nets[3])
dial(nets[1], nets[2])
done := make(chan bool)
nets[1].SetStreamHandler(func(s network.Stream) {
defer close(done)
defer s.Close()
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Error(err)
return
}
if string(buf) != testString {
t.Error("got wrong message")
}
})
s, err := nets[0].NewStream(ctx, nets[1].LocalPeer())
if err != nil {
t.Fatal(err)
}
numStreams := 0
for _, conn := range nets[0].ConnsToPeer(nets[1].LocalPeer()) {
numStreams += len(conn.GetStreams())
}
if numStreams != 1 {
t.Fatal("should only have one stream there")
}
n, err := s.Write([]byte(testString))
if err != nil {
t.Fatal(err)
} else if n != len(testString) {
t.Errorf("expected to write %d bytes, wrote %d", len(testString), n)
}
err = s.Close()
if err != nil {
t.Fatal(err)
}
select {
case <-done:
case <-time.After(time.Millisecond * 100):
t.Fatal("timed out waiting on stream")
}
_, err = nets[1].NewStream(ctx, nets[3].LocalPeer())
if err == nil {
t.Fatal("expected stream open 1->3 to fail")
}
}