Commit 75432960 authored by tavit ohanian

Merge branch 'port-2021-07-04'

parents 76745931 39614e63
version: 2.1
orbs:
go: gotest/tools@0.0.13
executors:
golang:
docker:
- image: circleci/golang:1.15.5
resource_class: 2xlarge
ubuntu:
docker:
- image: ubuntu:20.04
commands:
prepare:
steps:
- checkout
jobs:
build-all:
executor: golang
steps:
- prepare
- run:
name: go build
command: |
go build ./...
test:
parameters:
executor:
type: executor
default: golang
go-test-flags:
type: string
default: "-timeout 30m"
description: Flags passed to go test.
gotestsum-format:
type: string
default: testname
description: gotestsum format. https://github.com/gotestyourself/gotestsum#format
coverage:
type: string
default: -coverprofile=coverage.txt -coverpkg=github.com/libp2p/go-libp2p-pubsub
description: Coverage flag. Set to the empty string to disable.
codecov-upload:
type: boolean
default: false
description: |
Upload coverage report to https://codecov.io/. Requires the codecov API token to be
set as an environment variable for private projects.
executor: << parameters.executor >>
steps:
- prepare
- go/install-gotestsum:
gobin: $HOME/.local/bin
version: 0.5.2
- run:
name: go test
command: |
ulimit -n 2048
mkdir -p /tmp/test-reports
mkdir -p /tmp/test-artifacts
gotestsum \
--format << parameters.gotestsum-format >> \
--junitfile /tmp/test-reports/junit.xml \
--jsonfile /tmp/test-artifacts/output.json \
-- \
<< parameters.coverage >> \
<< parameters.go-test-flags >> \
github.com/libp2p/go-libp2p-pubsub
no_output_timeout: 30m
- store_test_results:
path: /tmp/test-reports
- store_artifacts:
path: /tmp/test-artifacts/output.json
- when:
condition: << parameters.codecov-upload >>
steps:
- go/install: {package: bash}
- go/install: {package: curl}
- run:
shell: /bin/bash -eo pipefail
command: |
bash <(curl -s https://codecov.io/bash)
workflows:
version: 2
ci:
jobs:
- build-all
- test:
codecov-upload: true
github_checks:
annotations: false
## Color palette
# yellows: dba355 d8a038 d8bd36 edd17d
# greens: 92ef92 6bbf3b 1cef5c 75b72d 9fea8f c6e84e c1f45a b8d613 fcf0b5
# reds: dd362a
# blues: 5b91c6 2a7d93 0bb1ed
# pinks: bf0f73
# oranges: ba500e ce8048
# teals: 40c491
###
### Areas
#
- name: area/ux-cli
color: 0bb1ed
description: "Area: UX/CLI"
- name: area/core
color: 0bb1ed
description: "Area: Core"
- name: area/sdk-sync
color: 0bb1ed
description: "Area: SDK | Sync Service"
- name: area/sdk-runtime
color: 0bb1ed
description: "Area: SDK | Runtime"
- name: area/sdk-testplans
color: 0bb1ed
description: "Area: SDK | Test plan API"
- name: area/sidecar
color: 0bb1ed
description: "Area: Sidecar"
- name: area/observability
color: 0bb1ed
description: "Area: Observability"
- name: area/runners
color: 0bb1ed
description: "Area: Runners"
- name: area/builders
color: 0bb1ed
description: "Area: Builders"
- name: area/infra
color: 0bb1ed
description: "Area: Infrastructure"
- name: area/specs
color: 0bb1ed
description: "Area: Specs"
- name: area/docs
color: 0bb1ed
description: "Area: Documentation"
- name: area/testing
color: 0bb1ed
description: "Area: Testing"
- name: area/test-plan
color: 0bb1ed
description: "Area: Test Plans"
###
### Kinds
#
- name: kind/bug
color: c92712
description: "Kind: Bug"
- name: kind/problem
color: c92712
description: "Kind: Problem"
- name: kind/feature
color: fcf0b5
description: "Kind: Feature"
- name: kind/improvement
color: fcf0b5
description: "Kind: Improvement"
- name: kind/test
color: fcf0b5
description: "Kind: Test"
- name: kind/tracking-issue
color: fcf0b5
description: "Kind: Tracking Issue"
- name: kind/question
color: fcf0b5
description: "Kind: Question"
- name: kind/enhancement
color: fcf0b5
description: "Kind: Enhancement"
- name: kind/discussion
color: fcf0b5
description: "Kind: Discussion"
- name: kind/spike
color: fcf0b5
description: "Kind: Spike"
###
### Difficulties
#
- name: dif/trivial
color: b2b7ff
description: "Can be confidently tackled by newcomers, who are widely unfamiliar with testground."
- name: dif/easy
color: 7886d7
description: "A prexisting testground user should be able to pick this up."
- name: dif/medium
color: 6574cd
description: "Prior experience in having developed testground modules is likely helpful."
- name: dif/hard
color: 5661b3
description: "Suggests that having worked on the specific component affected by this issue is important."
- name: dif/expert
color: 2f365f
description: "Requires extensive knowledge of the history, implications, ramifications of the issue."
###
### Efforts
#
- name: effort/minutes
color: e8fffe
description: "Effort: Minutes"
- name: effort/hours
color: a0f0ed
description: "Effort: One or Multiple Hours."
- name: effort/day
color: 64d5ca
description: "Effort: One Day."
- name: effort/days
color: 4dc0b5
description: "Effort: Multiple Days."
- name: effort/week
color: 38a89d
description: "Effort: One Week."
- name: effort/weeks
color: 20504f
description: "Effort: Multiple Weeks."
###
### Impacts
#
- name: impact/regression
color: f1f5f8
description: "Impact: Regression"
- name: impact/api-breakage
color: f1f5f8
description: "Impact: API Breakage"
- name: impact/quality
color: f1f5f8
description: "Impact: Quality"
- name: impact/dx
color: f1f5f8
description: "Impact: Developer Experience"
- name: impact/test-flakiness
color: f1f5f8
description: "Impact: Test Flakiness"
###
### Topics
#
- name: topic/interoperability
color: bf0f73
description: "Topic: Interoperability"
- name: topic/multi-language
color: bf0f73
description: "Topic: Multi-language"
- name: topic/specs
color: bf0f73
description: "Topic: Specs"
- name: topic/docs
color: bf0f73
description: "Topic: Documentation"
- name: topic/architecture
color: bf0f73
description: "Topic: Architecture"
###
### Priorities
###
- name: P0
color: dd362a
description: "P0: Critical. This is a blocker. Drop everything else."
- name: P1
color: ce8048
description: "P1: Must be fixed."
- name: P2
color: dbd81a
description: "P2: Should be fixed."
- name: P3
color: 9fea8f
description: "P3: Might get fixed."
###
### Hints
#
- name: hint/good-first-issue
color: 0623cc
description: "Hint: Good First Issue"
- name: hint/needs-contributor
color: 0623cc
description: "Hint: Needs Contributor"
- name: hint/needs-participation
color: 0623cc
description: "Hint: Needs Participation"
- name: hint/needs-decision
color: 0623cc
description: "Hint: Needs Decision"
- name: hint/needs-triage
color: 0623cc
description: "Hint: Needs Triage"
- name: hint/needs-analysis
color: 0623cc
description: "Hint: Needs Analysis"
- name: hint/needs-author-input
color: 0623cc
description: "Hint: Needs Author Input"
- name: hint/needs-team-input
color: 0623cc
description: "Hint: Needs Team Input"
- name: hint/needs-community-input
color: 0623cc
description: "Hint: Needs Community Input"
- name: hint/needs-review
color: 0623cc
description: "Hint: Needs Review"
- name: hint/needs-help
color: 0623cc
description: "Hint: Needs Help"
- name: hint/desc-outdated
color: 0623cc
description: "Hint: Description outdated"
###
### Statuses
#
- name: status/done
color: edb3a6
description: "Status: Done"
- name: status/deferred
color: edb3a6
description: "Status: Deferred"
- name: status/in-progress
color: edb3a6
description: "Status: In Progress"
- name: status/blocked
color: edb3a6
description: "Status: Blocked"
- name: status/inactive
color: edb3a6
description: "Status: Inactive"
- name: status/waiting
color: edb3a6
description: "Status: Waiting"
- name: status/rotten
color: edb3a6
description: "Status: Rotten"
- name: status/discarded
color: a0aec0
description: "Status: Discarded / Won't fix"
name: Sync Labels
on:
push:
branches:
- master
paths:
- '.github/labels.yml'
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: micnncim/action-label-syncer@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
manifest: '.github/labels.yml'
cover.out
prof.out
go-floodsub.test
stages:
- build
- test
variables:
BUILD_DIR: "/tmp/$CI_CONCURRENT_PROJECT_ID"
before_script:
- mkdir -p $BUILD_DIR/src
- cd $BUILD_DIR/src
- if [ -d $CI_PROJECT_DIR ]
- then
- echo "soft link $CI_PROJECT_DIR exists"
- else
- echo "creating soft link $CI_PROJECT_DIR"
- ln -s $CI_PROJECT_DIR
- fi
- cd $CI_PROJECT_DIR
build:
stage: build
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go build
test:
stage: test
tags:
- testing
script:
- echo $CI_JOB_STAGE
- go test -cover
coverage: '/coverage: \d+.\d+% of statements/'
This project is transitioning from an MIT-only license to a dual MIT/Apache-2.0 license.
Unless otherwise noted, all code contributed prior to 2019-05-06 and not contributed by
a user listed in [this signoff issue](https://github.com/ipfs/go-ipfs/issues/6302) is
licensed under MIT-only. All new contributions (and past contributions since 2019-05-06)
are licensed under a dual MIT/Apache-2.0 license.
MIT: https://www.opensource.org/licenses/mit
Apache-2.0: https://www.apache.org/licenses/license-2.0
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
The MIT License (MIT)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# go-p2p-pubsub
# go-libp2p-pubsub
dms3 p2p go-libp2p-pubsub
<p align="left">
<a href="http://protocol.ai"><img src="https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square" /></a>
<a href="http://libp2p.io/"><img src="https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square" /></a>
<a href="http://webchat.freenode.net/?channels=%23libp2p"><img src="https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square" /></a>
<a href="https://discuss.libp2p.io"><img src="https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square"/></a>
</p>
<p align="left">
<a href="https://codecov.io/gh/libp2p/go-libp2p-pubsub"><img src="https://codecov.io/gh/libp2p/go-libp2p-pubsub/branch/master/graph/badge.svg"></a>
<a href="https://goreportcard.com/report/github.com/libp2p/go-libp2p-pubsub"><img src="https://goreportcard.com/badge/github.com/libp2p/go-libp2p-pubsub" /></a>
<a href="https://github.com/RichardLitt/standard-readme"><img src="https://img.shields.io/badge/readme%20style-standard-brightgreen.svg?style=flat-square" /></a>
<a href="https://godoc.org/github.com/libp2p/go-libp2p-pubsub"><img src="http://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square" /></a>
<a href=""><img src="https://img.shields.io/badge/golang-%3E%3D1.14.0-orange.svg?style=flat-square" /></a>
<br>
</p>
This repo contains the canonical pubsub implementation for libp2p. We currently provide three message router options:
- Floodsub, which is the baseline flooding protocol.
- Randomsub, which is a simple probabilistic router that propagates to random subsets of peers.
- Gossipsub, which is a more advanced router with mesh formation and gossip propagation. See [spec](https://github.com/libp2p/specs/tree/master/pubsub/gossipsub) and [implementation](https://github.com/libp2p/go-libp2p-pubsub/blob/master/gossipsub.go) for more details.
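All three routers sit behind the same `PubSub` API; only the constructor differs. Below is a minimal construction sketch (not taken from this repo's examples), assuming a connected go-libp2p host `h` and a `context.Context` `ctx`:

```go
// Pick one router; each constructor returns a *pubsub.PubSub with the same API.
ps, err := pubsub.NewGossipSub(ctx, h) // mesh formation + gossip propagation
// ps, err := pubsub.NewFloodSub(ctx, h) // baseline flooding
if err != nil {
	panic(err)
}
```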
**PSA: The Hardening Extensions for Gossipsub (Gossipsub v1.1) are under development at https://github.com/libp2p/go-libp2p-pubsub/pull/263**
## Repo Lead Maintainer
[@vyzo](https://github.com/vyzo/)
> This repo follows the [Repo Lead Maintainer Protocol](https://github.com/ipfs/team-mgmt/blob/master/LEAD_MAINTAINER_PROTOCOL.md)
## Table of Contents
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Install](#install)
- [Usage](#usage)
- [Implementations](#implementations)
- [Documentation](#documentation)
- [Tracing](#tracing)
- [Contribute](#contribute)
- [License](#license)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
## Install
```
go get github.com/libp2p/go-libp2p-pubsub
```
## Usage
To be used for messaging in p2p infrastructure (as part of libp2p), such as IPFS, Ethereum, and other blockchains.
### Example
https://github.com/libp2p/go-libp2p/tree/master/examples/pubsub
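For orientation, the flow in the linked example reduces to joining a topic, subscribing, and publishing. A minimal sketch, assuming a pubsub instance `ps` (constructed as above) and a `context.Context` `ctx`; the topic name and payload are placeholders:

```go
topic, err := ps.Join("example-topic")
if err != nil {
	panic(err)
}
sub, err := topic.Subscribe()
if err != nil {
	panic(err)
}
// Publishing does not require being subscribed to the topic.
if err := topic.Publish(ctx, []byte("hello pubsub")); err != nil {
	panic(err)
}
// Pump messages from the subscription.
msg, err := sub.Next(ctx)
if err != nil {
	panic(err)
}
fmt.Println(string(msg.Data))
```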
## Documentation
See the [libp2p specs](https://github.com/libp2p/specs/tree/master/pubsub) for high level documentation and [godoc](https://godoc.org/github.com/libp2p/go-libp2p-pubsub) for API documentation.
### In this repo, you will find
```
.
├── LICENSE
├── README.md
# Regular Golang repo set up
├── codecov.yml
├── pb
├── go.mod
├── go.sum
├── doc.go
# PubSub base
├── pubsub.go
├── blacklist.go
├── notify.go
├── comm.go
├── discovery.go
├── sign.go
├── subscription.go
├── topic.go
├── trace.go
├── tracer.go
├── validation.go
# Floodsub router
├── floodsub.go
# Randomsub router
├── randomsub.go
# Gossipsub router
├── gossipsub.go
├── score.go
├── score_params.go
└── mcache.go
```
### Tracing
The pubsub system supports _tracing_, which collects all events pertaining to the internals of the system. This allows you to recreate the complete message flow and state of the system for analysis purposes.
To enable tracing, instantiate the pubsub system using the `WithEventTracer` option; the option accepts a tracer, and three implementations are available in-package (tracing to a JSON file, to a protobuf file, or to a remote peer).
If you want to trace using a remote peer, you can do so using the `traced` daemon from [go-libp2p-pubsub-tracer](https://github.com/libp2p/go-libp2p-pubsub-tracer). The package also includes a utility program, `tracestat`, for analyzing the traces collected by the daemon.
For instance, to capture the trace as a JSON file, you can use the following option:
```go
tracer, err := pubsub.NewJSONTracer("/path/to/trace.json")
if err != nil {
	panic(err)
}
ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))
```
To capture the trace as a protobuf file, you can use the following option:
```go
tracer, err := pubsub.NewPBTracer("/path/to/trace.pb")
if err != nil {
	panic(err)
}
ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))
```
Finally, to use the remote tracer, you can use the following incantations:
```go
// assuming that your tracer runs at x.x.x.x and has a peer ID of QmTracer
pi, err := peer.AddrInfoFromP2pAddr(ma.StringCast("/ip4/x.x.x.x/tcp/4001/p2p/QmTracer"))
if err != nil {
panic(err)
}
tracer, err := pubsub.NewRemoteTracer(ctx, host, pi)
if err != nil {
panic(err)
}
ps, err := pubsub.NewGossipSub(..., pubsub.WithEventTracer(tracer))
```
## Contribute
Contributions welcome. Please check out [the issues](https://github.com/libp2p/go-libp2p-pubsub/issues).
Check out our [contributing document](https://github.com/libp2p/community/blob/master/contributing.md) for more information on how we work, and about contributing in general. Please be aware that all interactions related to libp2p are subject to the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md).
Small note: If editing the README, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification.
## License
The go-libp2p-pubsub project is dual-licensed under Apache 2.0 and MIT terms:
- Apache License, Version 2.0, ([LICENSE-APACHE](./LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license ([LICENSE-MIT](./LICENSE-MIT) or http://opensource.org/licenses/MIT)
package pubsub
import (
"sync"
"time"
"github.com/whyrusleeping/timecache"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
)
// Blacklist is an interface for peer blacklisting.
type Blacklist interface {
Add(peer.ID) bool
Contains(peer.ID) bool
}
// MapBlacklist is a blacklist implementation using a perfect map
type MapBlacklist map[peer.ID]struct{}
// NewMapBlacklist creates a new MapBlacklist
func NewMapBlacklist() Blacklist {
return MapBlacklist(make(map[peer.ID]struct{}))
}
func (b MapBlacklist) Add(p peer.ID) bool {
b[p] = struct{}{}
return true
}
func (b MapBlacklist) Contains(p peer.ID) bool {
_, ok := b[p]
return ok
}
// TimeCachedBlacklist is a blacklist implementation using a time cache
type TimeCachedBlacklist struct {
sync.RWMutex
tc *timecache.TimeCache
}
// NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration
func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error) {
b := &TimeCachedBlacklist{tc: timecache.NewTimeCache(expiry)}
return b, nil
}
// Add adds the peer to the blacklist; it returns true if the peer was newly added
// and false if it was already present.
func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
b.Lock()
defer b.Unlock()
s := p.String()
if b.tc.Has(s) {
return false
}
b.tc.Add(s)
return true
}
func (b *TimeCachedBlacklist) Contains(p peer.ID) bool {
b.RLock()
defer b.RUnlock()
return b.tc.Has(p.String())
}
package pubsub
import (
"context"
"testing"
"time"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
)
func TestMapBlacklist(t *testing.T) {
b := NewMapBlacklist()
p := peer.ID("test")
b.Add(p)
if !b.Contains(p) {
t.Fatal("peer not in the blacklist")
}
}
func TestTimeCachedBlacklist(t *testing.T) {
b, err := NewTimeCachedBlacklist(10 * time.Minute)
if err != nil {
t.Fatal(err)
}
p := peer.ID("test")
b.Add(p)
if !b.Contains(p) {
t.Fatal("peer not in the blacklist")
}
}
func TestBlacklist(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
sub, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)
psubs[0].Publish("test", []byte("message"))
wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub.Next(wctx)
if err == nil {
t.Fatal("got message from blacklisted peer")
}
}
func TestBlacklist2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
_, err := psubs[0].Subscribe("test")
if err != nil {
t.Fatal(err)
}
sub1, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)
psubs[0].Publish("test", []byte("message"))
wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub1.Next(wctx)
if err == nil {
t.Fatal("got message from blacklisted peer")
}
}
func TestBlacklist3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)
connect(t, hosts[0], hosts[1])
sub, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 100)
psubs[0].Publish("test", []byte("message"))
wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub.Next(wctx)
if err == nil {
t.Fatal("got message from blacklisted peer")
}
}
coverage:
range: "50...100"
comment: off
package pubsub
import (
"bufio"
"context"
"io"
"gitlab.dms3.io/p2p/go-p2p-core/network"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
pb "gitlab.dms3.io/p2p/go-p2p-pubsub/pb"
"gitlab.dms3.io/p2p/go-msgio/protoio"
"github.com/gogo/protobuf/proto"
ms "github.com/multiformats/go-multistream"
)
// get the initial RPC containing all of our subscriptions to send to new peers
func (p *PubSub) getHelloPacket() *RPC {
var rpc RPC
subscriptions := make(map[string]bool)
for t := range p.mySubs {
subscriptions[t] = true
}
for t := range p.myRelays {
subscriptions[t] = true
}
for t := range subscriptions {
as := &pb.RPC_SubOpts{
Topicid: proto.String(t),
Subscribe: proto.Bool(true),
}
rpc.Subscriptions = append(rpc.Subscriptions, as)
}
return &rpc
}
func (p *PubSub) handleNewStream(s network.Stream) {
peer := s.Conn().RemotePeer()
p.inboundStreamsMx.Lock()
other, dup := p.inboundStreams[peer]
if dup {
log.Debugf("duplicate inbound stream from %s; resetting other stream", peer)
other.Reset()
}
p.inboundStreams[peer] = s
p.inboundStreamsMx.Unlock()
defer func() {
p.inboundStreamsMx.Lock()
if p.inboundStreams[peer] == s {
delete(p.inboundStreams, peer)
}
p.inboundStreamsMx.Unlock()
}()
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
for {
rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC)
if err != nil {
if err != io.EOF {
s.Reset()
log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
} else {
// Just be nice. They probably won't read this
// but it doesn't hurt to send it.
s.Close()
}
return
}
rpc.from = peer
select {
case p.incoming <- rpc:
case <-p.ctx.Done():
// Close is useless because the other side isn't reading.
s.Reset()
return
}
}
}
func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) {
s, err := p.host.NewStream(p.ctx, pid, p.rt.Protocols()...)
if err != nil {
log.Debug("opening new stream to peer: ", err, pid)
var ch chan peer.ID
if err == ms.ErrNotSupported {
ch = p.newPeerError
} else {
ch = p.peerDead
}
select {
case ch <- pid:
case <-ctx.Done():
}
return
}
go p.handleSendingMessages(ctx, s, outgoing)
go p.handlePeerEOF(ctx, s)
select {
case p.newPeerStream <- s:
case <-ctx.Done():
}
}
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
if err != nil {
select {
case p.peerDead <- s.Conn().RemotePeer():
case <-ctx.Done():
}
return
}
log.Debugf("unexpected message from %s", s.Conn().RemotePeer())
}
}
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
bufw := bufio.NewWriter(s)
wc := protoio.NewDelimitedWriter(bufw)
writeMsg := func(msg proto.Message) error {
err := wc.WriteMsg(msg)
if err != nil {
return err
}
return bufw.Flush()
}
defer s.Close()
for {
select {
case rpc, ok := <-outgoing:
if !ok {
return
}
err := writeMsg(&rpc.RPC)
if err != nil {
s.Reset()
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
return
}
case <-ctx.Done():
return
}
}
}
func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
return &RPC{
RPC: pb.RPC{
Subscriptions: subs,
},
}
}
func rpcWithMessages(msgs ...*pb.Message) *RPC {
return &RPC{RPC: pb.RPC{Publish: msgs}}
}
func rpcWithControl(msgs []*pb.Message,
ihave []*pb.ControlIHave,
iwant []*pb.ControlIWant,
graft []*pb.ControlGraft,
prune []*pb.ControlPrune) *RPC {
return &RPC{
RPC: pb.RPC{
Publish: msgs,
Control: &pb.ControlMessage{
Ihave: ihave,
Iwant: iwant,
Graft: graft,
Prune: prune,
},
},
}
}
func copyRPC(rpc *RPC) *RPC {
res := new(RPC)
*res = *rpc
if rpc.Control != nil {
res.Control = new(pb.ControlMessage)
*res.Control = *rpc.Control
}
return res
}
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: compat.proto
package compat_pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Message struct {
From []byte `protobuf:"bytes,1,opt,name=from" json:"from,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
Seqno []byte `protobuf:"bytes,3,opt,name=seqno" json:"seqno,omitempty"`
TopicIDs []string `protobuf:"bytes,4,rep,name=topicIDs" json:"topicIDs,omitempty"`
Signature []byte `protobuf:"bytes,5,opt,name=signature" json:"signature,omitempty"`
Key []byte `protobuf:"bytes,6,opt,name=key" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) {
return fileDescriptor_bced3ff93dcaa7f8, []int{0}
}
func (m *Message) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Message.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Message) XXX_Merge(src proto.Message) {
xxx_messageInfo_Message.Merge(m, src)
}
func (m *Message) XXX_Size() int {
return m.Size()
}
func (m *Message) XXX_DiscardUnknown() {
xxx_messageInfo_Message.DiscardUnknown(m)
}
var xxx_messageInfo_Message proto.InternalMessageInfo
func (m *Message) GetFrom() []byte {
if m != nil {
return m.From
}
return nil
}
func (m *Message) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *Message) GetSeqno() []byte {
if m != nil {
return m.Seqno
}
return nil
}
func (m *Message) GetTopicIDs() []string {
if m != nil {
return m.TopicIDs
}
return nil
}
func (m *Message) GetSignature() []byte {
if m != nil {
return m.Signature
}
return nil
}
func (m *Message) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func init() {
proto.RegisterType((*Message)(nil), "compat.pb.Message")
}
func init() { proto.RegisterFile("compat.proto", fileDescriptor_bced3ff93dcaa7f8) }
var fileDescriptor_bced3ff93dcaa7f8 = []byte{
// 165 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0xce, 0xcf, 0x2d,
0x48, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x84, 0xf1, 0x92, 0x94, 0x26, 0x33,
0x72, 0xb1, 0xfb, 0xa6, 0x16, 0x17, 0x27, 0xa6, 0xa7, 0x0a, 0x09, 0x71, 0xb1, 0xa4, 0x15, 0xe5,
0xe7, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x20, 0xb1, 0x94, 0xc4, 0x92, 0x44,
0x09, 0x26, 0x88, 0x18, 0x88, 0x2d, 0x24, 0xc2, 0xc5, 0x5a, 0x9c, 0x5a, 0x98, 0x97, 0x2f, 0xc1,
0x0c, 0x16, 0x84, 0x70, 0x84, 0xa4, 0xb8, 0x38, 0x4a, 0xf2, 0x0b, 0x32, 0x93, 0x3d, 0x5d, 0x8a,
0x25, 0x58, 0x14, 0x98, 0x35, 0x38, 0x83, 0xe0, 0x7c, 0x21, 0x19, 0x2e, 0xce, 0xe2, 0xcc, 0xf4,
0xbc, 0xc4, 0x92, 0xd2, 0xa2, 0x54, 0x09, 0x56, 0xb0, 0x2e, 0x84, 0x80, 0x90, 0x00, 0x17, 0x73,
0x76, 0x6a, 0xa5, 0x04, 0x1b, 0x58, 0x1c, 0xc4, 0x74, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2,
0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x51, 0x81, 0xcf,
0x0e, 0xbd, 0x00, 0x00, 0x00,
}
func (m *Message) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Message) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if m.Key != nil {
i -= len(m.Key)
copy(dAtA[i:], m.Key)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Key)))
i--
dAtA[i] = 0x32
}
if m.Signature != nil {
i -= len(m.Signature)
copy(dAtA[i:], m.Signature)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Signature)))
i--
dAtA[i] = 0x2a
}
if len(m.TopicIDs) > 0 {
for iNdEx := len(m.TopicIDs) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.TopicIDs[iNdEx])
copy(dAtA[i:], m.TopicIDs[iNdEx])
i = encodeVarintCompat(dAtA, i, uint64(len(m.TopicIDs[iNdEx])))
i--
dAtA[i] = 0x22
}
}
if m.Seqno != nil {
i -= len(m.Seqno)
copy(dAtA[i:], m.Seqno)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Seqno)))
i--
dAtA[i] = 0x1a
}
if m.Data != nil {
i -= len(m.Data)
copy(dAtA[i:], m.Data)
i = encodeVarintCompat(dAtA, i, uint64(len(m.Data)))
i--
dAtA[i] = 0x12
}
if m.From != nil {
i -= len(m.From)
copy(dAtA[i:], m.From)
i = encodeVarintCompat(dAtA, i, uint64(len(m.From)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintCompat(dAtA []byte, offset int, v uint64) int {
offset -= sovCompat(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Message) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.From != nil {
l = len(m.From)
n += 1 + l + sovCompat(uint64(l))
}
if m.Data != nil {
l = len(m.Data)
n += 1 + l + sovCompat(uint64(l))
}
if m.Seqno != nil {
l = len(m.Seqno)
n += 1 + l + sovCompat(uint64(l))
}
if len(m.TopicIDs) > 0 {
for _, s := range m.TopicIDs {
l = len(s)
n += 1 + l + sovCompat(uint64(l))
}
}
if m.Signature != nil {
l = len(m.Signature)
n += 1 + l + sovCompat(uint64(l))
}
if m.Key != nil {
l = len(m.Key)
n += 1 + l + sovCompat(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovCompat(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozCompat(x uint64) (n int) {
return sovCompat(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Message) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Message: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Message: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field From", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.From = append(m.From[:0], dAtA[iNdEx:postIndex]...)
if m.From == nil {
m.From = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
if m.Data == nil {
m.Data = []byte{}
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Seqno", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Seqno = append(m.Seqno[:0], dAtA[iNdEx:postIndex]...)
if m.Seqno == nil {
m.Seqno = []byte{}
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field TopicIDs", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.TopicIDs = append(m.TopicIDs, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Signature", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Signature = append(m.Signature[:0], dAtA[iNdEx:postIndex]...)
if m.Signature == nil {
m.Signature = []byte{}
}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCompat
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCompat
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCompat
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCompat(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCompat
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthCompat
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipCompat(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCompat
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCompat
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowCompat
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthCompat
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupCompat
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthCompat
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthCompat = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowCompat = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupCompat = fmt.Errorf("proto: unexpected end of group")
)
syntax = "proto2";
package compat.pb;
message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
package pubsub
import (
"testing"
compat_pb "gitlab.dms3.io/p2p/go-p2p-pubsub/compat"
pb "gitlab.dms3.io/p2p/go-p2p-pubsub/pb"
)
func TestMultitopicMessageCompatibility(t *testing.T) {
topic1 := "topic1"
topic2 := "topic2"
newMessage1 := &pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
Topic: &topic1,
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
oldMessage1 := &compat_pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
TopicIDs: []string{topic1},
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
oldMessage2 := &compat_pb.Message{
From: []byte("A"),
Data: []byte("blah"),
Seqno: []byte("123"),
TopicIDs: []string{topic1, topic2},
Signature: []byte("a-signature"),
Key: []byte("a-key"),
}
newMessage1b, err := newMessage1.Marshal()
if err != nil {
t.Fatal(err)
}
oldMessage1b, err := oldMessage1.Marshal()
if err != nil {
t.Fatal(err)
}
oldMessage2b, err := oldMessage2.Marshal()
if err != nil {
t.Fatal(err)
}
newMessage := new(pb.Message)
oldMessage := new(compat_pb.Message)
err = newMessage.Unmarshal(oldMessage1b)
if err != nil {
t.Fatal(err)
}
if newMessage.GetTopic() != topic1 {
t.Fatalf("bad topic: expected %s, got %s", topic1, newMessage.GetTopic())
}
newMessage.Reset()
err = newMessage.Unmarshal(oldMessage2b)
if err != nil {
t.Fatal(err)
}
if newMessage.GetTopic() != topic2 {
t.Fatalf("bad topic: expected %s, got %s", topic2, newMessage.GetTopic())
}
err = oldMessage.Unmarshal(newMessage1b)
if err != nil {
t.Fatal(err)
}
topics := oldMessage.GetTopicIDs()
if len(topics) != 1 {
t.Fatalf("expected 1 topic, got %d", len(topics))
}
if topics[0] != topic1 {
t.Fatalf("bad topic: expected %s, got %s", topic1, topics[0])
}
}
package pubsub
import (
"context"
"math/rand"
"time"
"gitlab.dms3.io/p2p/go-p2p-core/discovery"
"gitlab.dms3.io/p2p/go-p2p-core/host"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
discimpl "gitlab.dms3.io/p2p/go-p2p-discovery"
)
var (
// poll interval
// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling
DiscoveryPollInitialDelay = 0 * time.Millisecond
// DiscoveryPollInterval is approximately how long the discovery system waits in between checks for whether
// more peers are needed for any topic
DiscoveryPollInterval = 1 * time.Second
)
// interval at which to retry advertisements when they fail.
const discoveryAdvertiseRetryInterval = 2 * time.Minute
type DiscoverOpt func(*discoverOptions) error
type discoverOptions struct {
connFactory BackoffConnectorFactory
opts []discovery.Option
}
func defaultDiscoverOptions() *discoverOptions {
rngSrc := rand.NewSource(rand.Int63())
minBackoff, maxBackoff := time.Second*10, time.Hour
cacheSize := 100
dialTimeout := time.Minute * 2
discoverOpts := &discoverOptions{
connFactory: func(host host.Host) (*discimpl.BackoffConnector, error) {
backoff := discimpl.NewExponentialBackoff(minBackoff, maxBackoff, discimpl.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
return discimpl.NewBackoffConnector(host, cacheSize, dialTimeout, backoff)
},
}
return discoverOpts
}
// discover represents the discovery pipeline.
// The discovery pipeline handles advertising and discovery of peers
type discover struct {
p *PubSub
// discovery assists in discovering and advertising peers for a topic
discovery discovery.Discovery
// advertising tracks which topics are being advertised
advertising map[string]context.CancelFunc
// discoverQ handles continuing peer discovery
discoverQ chan *discoverReq
// ongoing tracks ongoing discovery requests
ongoing map[string]struct{}
// done handles completion of a discovery request
done chan string
// connector handles connecting to new peers found via discovery
connector *discimpl.BackoffConnector
// options are the set of options to be used to complete struct construction in Start
options *discoverOptions
}
// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
// The router ultimately decides whether it is ready or not; the given size is just a suggestion.
func MinTopicSize(size int) RouterReady {
return func(rt PubSubRouter, topic string) (bool, error) {
return rt.EnoughPeers(topic, size), nil
}
}
// Start attaches the discovery pipeline to a pubsub instance, initializes discovery and starts event loop
func (d *discover) Start(p *PubSub, opts ...DiscoverOpt) error {
if d.discovery == nil || p == nil {
return nil
}
d.p = p
d.advertising = make(map[string]context.CancelFunc)
d.discoverQ = make(chan *discoverReq, 32)
d.ongoing = make(map[string]struct{})
d.done = make(chan string)
conn, err := d.options.connFactory(p.host)
if err != nil {
return err
}
d.connector = conn
go d.discoverLoop()
go d.pollTimer()
return nil
}
func (d *discover) pollTimer() {
select {
case <-time.After(DiscoveryPollInitialDelay):
case <-d.p.ctx.Done():
return
}
select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}
ticker := time.NewTicker(DiscoveryPollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case d.p.eval <- d.requestDiscovery:
case <-d.p.ctx.Done():
return
}
case <-d.p.ctx.Done():
return
}
}
}
func (d *discover) requestDiscovery() {
for t := range d.p.myTopics {
if !d.p.rt.EnoughPeers(t, 0) {
d.discoverQ <- &discoverReq{topic: t, done: make(chan struct{}, 1)}
}
}
}
func (d *discover) discoverLoop() {
for {
select {
case discover := <-d.discoverQ:
topic := discover.topic
if _, ok := d.ongoing[topic]; ok {
discover.done <- struct{}{}
continue
}
d.ongoing[topic] = struct{}{}
go func() {
d.handleDiscovery(d.p.ctx, topic, discover.opts)
select {
case d.done <- topic:
case <-d.p.ctx.Done():
}
discover.done <- struct{}{}
}()
case topic := <-d.done:
delete(d.ongoing, topic)
case <-d.p.ctx.Done():
return
}
}
}
// Advertise advertises this node's interest in a topic to a discovery service. Advertise is not thread-safe.
func (d *discover) Advertise(topic string) {
if d.discovery == nil {
return
}
advertisingCtx, cancel := context.WithCancel(d.p.ctx)
if _, ok := d.advertising[topic]; ok {
cancel()
return
}
d.advertising[topic] = cancel
go func() {
next, err := d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warnf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
if next == 0 {
next = discoveryAdvertiseRetryInterval
}
}
t := time.NewTimer(next)
defer t.Stop()
for advertisingCtx.Err() == nil {
select {
case <-t.C:
next, err = d.discovery.Advertise(advertisingCtx, topic)
if err != nil {
log.Warnf("bootstrap: error providing rendezvous for %s: %s", topic, err.Error())
if next == 0 {
next = discoveryAdvertiseRetryInterval
}
}
t.Reset(next)
case <-advertisingCtx.Done():
return
}
}
}()
}
// StopAdvertise stops advertising this node's interest in a topic. StopAdvertise is not thread-safe.
func (d *discover) StopAdvertise(topic string) {
if d.discovery == nil {
return
}
if advertiseCancel, ok := d.advertising[topic]; ok {
advertiseCancel()
delete(d.advertising, topic)
}
}
// Discover searches for additional peers interested in a given topic
func (d *discover) Discover(topic string, opts ...discovery.Option) {
if d.discovery == nil {
return
}
d.discoverQ <- &discoverReq{topic, opts, make(chan struct{}, 1)}
}
// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise.
func (d *discover) Bootstrap(ctx context.Context, topic string, ready RouterReady, opts ...discovery.Option) bool {
if d.discovery == nil {
return true
}
t := time.NewTimer(time.Hour)
if !t.Stop() {
<-t.C
}
defer t.Stop()
for {
// Check if ready for publishing
bootstrapped := make(chan bool, 1)
select {
case d.p.eval <- func() {
done, _ := ready(d.p.rt, topic)
bootstrapped <- done
}:
if <-bootstrapped {
return true
}
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
// If not ready discover more peers
disc := &discoverReq{topic, opts, make(chan struct{}, 1)}
select {
case d.discoverQ <- disc:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
select {
case <-disc.done:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
t.Reset(time.Millisecond * 100)
select {
case <-t.C:
case <-d.p.ctx.Done():
return false
case <-ctx.Done():
return false
}
}
}
func (d *discover) handleDiscovery(ctx context.Context, topic string, opts []discovery.Option) {
discoverCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
peerCh, err := d.discovery.FindPeers(discoverCtx, topic, opts...)
if err != nil {
log.Debugf("error finding peers for topic %s: %v", topic, err)
return
}
d.connector.Connect(ctx, peerCh)
}
type discoverReq struct {
topic string
opts []discovery.Option
done chan struct{}
}
type pubSubDiscovery struct {
discovery.Discovery
opts []discovery.Option
}
func (d *pubSubDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
return d.Discovery.Advertise(ctx, "floodsub:"+ns, append(opts, d.opts...)...)
}
func (d *pubSubDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
return d.Discovery.FindPeers(ctx, "floodsub:"+ns, append(opts, d.opts...)...)
}
// WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem
func WithDiscoveryOpts(opts ...discovery.Option) DiscoverOpt {
return func(d *discoverOptions) error {
d.opts = opts
return nil
}
}
// BackoffConnectorFactory creates a BackoffConnector that is attached to a given host
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)
// WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers
func WithDiscoverConnector(connFactory BackoffConnectorFactory) DiscoverOpt {
return func(d *discoverOptions) error {
d.connFactory = connFactory
return nil
}
}
package pubsub
import (
"bytes"
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"gitlab.dms3.io/p2p/go-p2p-core/discovery"
"gitlab.dms3.io/p2p/go-p2p-core/host"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
)
type mockDiscoveryServer struct {
mx sync.Mutex
db map[string]map[peer.ID]*discoveryRegistration
}
type discoveryRegistration struct {
info peer.AddrInfo
ttl time.Duration
}
func newDiscoveryServer() *mockDiscoveryServer {
return &mockDiscoveryServer{
db: make(map[string]map[peer.ID]*discoveryRegistration),
}
}
func (s *mockDiscoveryServer) Advertise(ns string, info peer.AddrInfo, ttl time.Duration) (time.Duration, error) {
s.mx.Lock()
defer s.mx.Unlock()
peers, ok := s.db[ns]
if !ok {
peers = make(map[peer.ID]*discoveryRegistration)
s.db[ns] = peers
}
peers[info.ID] = &discoveryRegistration{info, ttl}
return ttl, nil
}
func (s *mockDiscoveryServer) FindPeers(ns string, limit int) (<-chan peer.AddrInfo, error) {
s.mx.Lock()
defer s.mx.Unlock()
peers, ok := s.db[ns]
if !ok || len(peers) == 0 {
emptyCh := make(chan peer.AddrInfo)
close(emptyCh)
return emptyCh, nil
}
count := len(peers)
if count > limit {
count = limit
}
ch := make(chan peer.AddrInfo, count)
numSent := 0
for _, reg := range peers {
if numSent == count {
break
}
numSent++
ch <- reg.info
}
close(ch)
return ch, nil
}
func (s *mockDiscoveryServer) hasPeerRecord(ns string, pid peer.ID) bool {
s.mx.Lock()
defer s.mx.Unlock()
if peers, ok := s.db[ns]; ok {
_, ok := peers[pid]
return ok
}
return false
}
type mockDiscoveryClient struct {
host host.Host
server *mockDiscoveryServer
}
func (d *mockDiscoveryClient) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return 0, err
}
return d.server.Advertise(ns, *host.InfoFromHost(d.host), options.Ttl)
}
func (d *mockDiscoveryClient) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
var options discovery.Options
err := options.Apply(opts...)
if err != nil {
return nil, err
}
return d.server.FindPeers(ns, options.Limit)
}
type dummyDiscovery struct{}
func (d *dummyDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
return time.Hour, nil
}
func (d *dummyDiscovery) FindPeers(ctx context.Context, ns string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) {
retCh := make(chan peer.AddrInfo)
go func() {
time.Sleep(time.Second)
close(retCh)
}()
return retCh, nil
}
func TestSimpleDiscovery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup Discovery server and pubsub clients
const numHosts = 20
const topic = "foobar"
server := newDiscoveryServer()
discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(1 * time.Minute)}
hosts := getNetHosts(t, ctx, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)
for i, h := range hosts {
disc := &mockDiscoveryClient{h, server}
ps := getPubsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...)))
psubs[i] = ps
topicHandlers[i], _ = ps.Join(topic)
}
// Subscribe with all but one pubsub instance
msgs := make([]*Subscription, numHosts)
for i, th := range topicHandlers[1:] {
subch, err := th.Subscribe()
if err != nil {
t.Fatal(err)
}
msgs[i+1] = subch
}
// Wait for the advertisements to go through then check that they did
for {
server.mx.Lock()
numPeers := len(server.db["floodsub:foobar"])
server.mx.Unlock()
if numPeers == numHosts-1 {
break
} else {
time.Sleep(time.Millisecond * 100)
}
}
for i, h := range hosts[1:] {
if !server.hasPeerRecord("floodsub:"+topic, h.ID()) {
t.Fatalf("Server did not register host %d with ID: %s", i+1, h.ID().Pretty())
}
}
// Try subscribing followed by publishing a single message
subch, err := topicHandlers[0].Subscribe()
if err != nil {
t.Fatal(err)
}
msgs[0] = subch
msg := []byte("first message")
if err := topicHandlers[0].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil {
t.Fatal(err)
}
for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
// Try random peers sending messages and make sure they are received
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d the flooooooood %d", i, i))
owner := rand.Intn(len(psubs))
if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(1))); err != nil {
t.Fatal(err)
}
for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
}
func TestGossipSubDiscoveryAfterBootstrap(t *testing.T) {
t.Skip("flaky test disabled")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Setup Discovery server and pubsub clients
partitionSize := GossipSubDlo - 1
numHosts := partitionSize * 2
const ttl = 1 * time.Minute
const topic = "foobar"
server1, server2 := newDiscoveryServer(), newDiscoveryServer()
discOpts := []discovery.Option{discovery.Limit(numHosts), discovery.TTL(ttl)}
// Put the pubsub clients into two partitions
hosts := getNetHosts(t, ctx, numHosts)
psubs := make([]*PubSub, numHosts)
topicHandlers := make([]*Topic, numHosts)
for i, h := range hosts {
s := server1
if i >= partitionSize {
s = server2
}
disc := &mockDiscoveryClient{h, s}
ps := getGossipsub(ctx, h, WithDiscovery(disc, WithDiscoveryOpts(discOpts...)))
psubs[i] = ps
topicHandlers[i], _ = ps.Join(topic)
}
msgs := make([]*Subscription, numHosts)
for i, th := range topicHandlers {
subch, err := th.Subscribe()
if err != nil {
t.Fatal(err)
}
msgs[i] = subch
}
// Wait for network to finish forming then join the partitions via discovery
for _, ps := range psubs {
waitUntilGossipsubMeshCount(ps, topic, partitionSize-1)
}
for i := 0; i < partitionSize; i++ {
if _, err := server1.Advertise("floodsub:"+topic, *host.InfoFromHost(hosts[i+partitionSize]), ttl); err != nil {
t.Fatal(err)
}
}
// test the mesh
for i := 0; i < 100; i++ {
msg := []byte(fmt.Sprintf("%d it's not a floooooood %d", i, i))
owner := rand.Intn(numHosts)
if err := topicHandlers[owner].Publish(ctx, msg, WithReadiness(MinTopicSize(numHosts-1))); err != nil {
t.Fatal(err)
}
for _, sub := range msgs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
}
func waitUntilGossipsubMeshCount(ps *PubSub, topic string, count int) {
done := false
doneCh := make(chan bool, 1)
rt := ps.rt.(*GossipSubRouter)
for !done {
ps.eval <- func() {
doneCh <- len(rt.mesh[topic]) == count
}
done = <-doneCh
if !done {
time.Sleep(100 * time.Millisecond)
}
}
}
package pubsub
//
// running tests in this package will likely require increasing the maximum
// number of open files. For example, on linux:
// ulimit -n 614400
// go test
// PASS
// ok gitlab.dms3.io/p2p/go-p2p-pubsub 248.481s
//
// The pubsub package provides facilities for the Publish/Subscribe pattern of message
// propagation, also known as overlay multicast.
// The implementation provides topic-based pubsub, with pluggable routing algorithms.
//
// The main interface to the library is the PubSub object.
// You can construct this object with the following constructors:
//
// - NewFloodSub creates an instance that uses the floodsub routing algorithm.
//
// - NewGossipSub creates an instance that uses the gossipsub routing algorithm.
//
// - NewRandomSub creates an instance that uses the randomsub routing algorithm.
//
// In addition, there is a generic constructor that creates a pubsub instance with
// a custom PubSubRouter interface. This procedure is currently reserved for internal
// use within the package.
//
// Once you have constructed a PubSub instance, you need to establish some connections
// to your peers; the implementation relies on ambient peer discovery, leaving bootstrap
// and active peer discovery up to the client.
//
// To publish a message to some topic, use Publish; you don't need to be subscribed
// to the topic in order to publish.
//
// To subscribe to a topic, use Subscribe; this will give you a subscription interface
// from which new messages can be pumped.
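//
// As a minimal sketch (assuming a connected host h; error handling elided):
//
//	ps, _ := NewGossipSub(ctx, h)
//	sub, _ := ps.Subscribe("example")
//	msg, _ := sub.Next(ctx)
//	fmt.Println(string(msg.Data))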
//
package pubsub
import (
"context"
"gitlab.dms3.io/p2p/go-p2p-core/host"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
"gitlab.dms3.io/p2p/go-p2p-core/protocol"
)
const (
FloodSubID = protocol.ID("/floodsub/1.0.0")
FloodSubTopicSearchSize = 5
)
// NewFloodsubWithProtocols returns a new floodsub-enabled PubSub object using the protocols specified in ps.
func NewFloodsubWithProtocols(ctx context.Context, h host.Host, ps []protocol.ID, opts ...Option) (*PubSub, error) {
rt := &FloodSubRouter{
protocols: ps,
}
return NewPubSub(ctx, h, rt, opts...)
}
// NewFloodSub returns a new PubSub object using the FloodSubRouter.
func NewFloodSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
return NewFloodsubWithProtocols(ctx, h, []protocol.ID{FloodSubID}, opts...)
}
type FloodSubRouter struct {
p *PubSub
protocols []protocol.ID
tracer *pubsubTracer
}
func (fs *FloodSubRouter) Protocols() []protocol.ID {
return fs.protocols
}
func (fs *FloodSubRouter) Attach(p *PubSub) {
fs.p = p
fs.tracer = p.tracer
}
func (fs *FloodSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
fs.tracer.AddPeer(p, proto)
}
func (fs *FloodSubRouter) RemovePeer(p peer.ID) {
fs.tracer.RemovePeer(p)
}
func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
// check all peers in the topic
tmap, ok := fs.p.topics[topic]
if !ok {
return false
}
if suggested == 0 {
suggested = FloodSubTopicSearchSize
}
if len(tmap) >= suggested {
return true
}
return false
}
func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
return AcceptAll
}
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
func (fs *FloodSubRouter) Publish(msg *Message) {
from := msg.ReceivedFrom
topic := msg.GetTopic()
out := rpcWithMessages(msg.Message)
for pid := range fs.p.topics[topic] {
if pid == from || pid == peer.ID(msg.GetFrom()) {
continue
}
mch, ok := fs.p.peers[pid]
if !ok {
continue
}
select {
case mch <- out:
fs.tracer.SendRPC(out, pid)
default:
log.Infof("dropping message to peer %s: queue full", pid)
fs.tracer.DropRPC(out, pid)
// Drop it. The peer is too slow.
}
}
}
func (fs *FloodSubRouter) Join(topic string) {
fs.tracer.Join(topic)
}
func (fs *FloodSubRouter) Leave(topic string) {
fs.tracer.Leave(topic)
}
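As a usage sketch of the constructors above: the helper name, the application protocol ID, and the surrounding setup are assumptions for illustration, not part of the package.

package example

import (
	"context"

	"gitlab.dms3.io/p2p/go-p2p-core/host"
	"gitlab.dms3.io/p2p/go-p2p-core/protocol"

	pubsub "gitlab.dms3.io/p2p/go-p2p-pubsub"
)

// newCustomFloodSub is a hypothetical helper that builds a floodsub instance
// speaking the default protocol plus a custom application-specific ID.
func newCustomFloodSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
	protos := []protocol.ID{
		pubsub.FloodSubID,                    // default floodsub protocol ID
		protocol.ID("/myapp/floodsub/1.0.0"), // illustrative custom ID
	}
	return pubsub.NewFloodsubWithProtocols(ctx, h, protos)
}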
package pubsub
import (
"math/rand"
"sync"
"time"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
"gitlab.dms3.io/p2p/go-p2p-core/protocol"
)
// gossipTracer is an internal tracer that tracks IWANT requests in order to penalize
// peers who don't follow up on IWANT requests after an IHAVE advertisement.
// The tracking of promises is probabilistic to avoid using too much memory.
type gossipTracer struct {
sync.Mutex
msgID MsgIdFunction
followUpTime time.Duration
// promises for messages by message ID; for each message tracked, we track the promise
// expiration time for each peer.
promises map[string]map[peer.ID]time.Time
// promises for each peer; for each peer, we track the promised message IDs.
// this index allows us to quickly void promises when a peer is throttled.
peerPromises map[peer.ID]map[string]struct{}
}
func newGossipTracer() *gossipTracer {
return &gossipTracer{
msgID: DefaultMsgIdFn,
promises: make(map[string]map[peer.ID]time.Time),
peerPromises: make(map[peer.ID]map[string]struct{}),
}
}
func (gt *gossipTracer) Start(gs *GossipSubRouter) {
if gt == nil {
return
}
gt.msgID = gs.p.msgID
gt.followUpTime = gs.params.IWantFollowupTime
}
// AddPromise tracks a promise to deliver a message from a list of msgIDs we
// are requesting. Only one message ID, chosen at random, is tracked per call;
// this sampling is what keeps the tracker's memory usage bounded.
func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) {
	if gt == nil {
		return
	}

	// sample a single message ID from the request
	idx := rand.Intn(len(msgIDs))
	mid := msgIDs[idx]
gt.Lock()
defer gt.Unlock()
promises, ok := gt.promises[mid]
if !ok {
promises = make(map[peer.ID]time.Time)
gt.promises[mid] = promises
}
_, ok = promises[p]
if !ok {
promises[p] = time.Now().Add(gt.followUpTime)
peerPromises, ok := gt.peerPromises[p]
if !ok {
peerPromises = make(map[string]struct{})
gt.peerPromises[p] = peerPromises
}
peerPromises[mid] = struct{}{}
}
}
// GetBrokenPromises returns the number of broken promises for each peer that
// didn't follow up on an IWANT request.
func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
if gt == nil {
return nil
}
gt.Lock()
defer gt.Unlock()
var res map[peer.ID]int
now := time.Now()
// find broken promises from peers
for mid, promises := range gt.promises {
for p, expire := range promises {
if expire.Before(now) {
if res == nil {
res = make(map[peer.ID]int)
}
res[p]++
delete(promises, p)
peerPromises := gt.peerPromises[p]
delete(peerPromises, mid)
if len(peerPromises) == 0 {
delete(gt.peerPromises, p)
}
}
}
if len(promises) == 0 {
delete(gt.promises, mid)
}
}
return res
}
var _ RawTracer = (*gossipTracer)(nil)
// fulfillPromise removes all tracked promises for the given message, including
// the per-peer index entries, so that fulfilled promises don't linger there.
func (gt *gossipTracer) fulfillPromise(msg *Message) {
	mid := gt.msgID(msg.Message)
	gt.Lock()
	defer gt.Unlock()
	for p := range gt.promises[mid] {
		delete(gt.peerPromises[p], mid) // no-op if the index entry is gone
		if len(gt.peerPromises[p]) == 0 {
			delete(gt.peerPromises, p)
		}
	}
	delete(gt.promises, mid)
}
func (gt *gossipTracer) DeliverMessage(msg *Message) {
// someone delivered a message, fulfill promises for it
gt.fulfillPromise(msg)
}
func (gt *gossipTracer) RejectMessage(msg *Message, reason string) {
	// A message got rejected, so we normally fulfill its promises and let the
	// score penalty for invalid message delivery apply.
	// The exception is an obviously invalid message (missing or invalid
	// signature): there we leave the promise pending, so that the
	// broken-promise penalty applies on top.
	switch reason {
	case RejectMissingSignature:
		return
	case RejectInvalidSignature:
		return
	}
	gt.fulfillPromise(msg)
}
func (gt *gossipTracer) ValidateMessage(msg *Message) {
	// we consider the promise fulfilled as soon as the message begins validation;
	// if it had a signature issue, it would have been rejected immediately
	// without triggering the Validate trace
	gt.fulfillPromise(msg)
}
func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (gt *gossipTracer) RemovePeer(p peer.ID) {}
func (gt *gossipTracer) Join(topic string) {}
func (gt *gossipTracer) Leave(topic string) {}
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
gt.Lock()
defer gt.Unlock()
peerPromises, ok := gt.peerPromises[p]
if !ok {
return
}
for mid := range peerPromises {
promises := gt.promises[mid]
delete(promises, p)
if len(promises) == 0 {
delete(gt.promises, mid)
}
}
delete(gt.peerPromises, p)
}
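To make the intended consumption of these counts concrete, here is a hypothetical in-package sketch; within the library, the broken-promise counts feed the peer score's behaviour penalty, so the interval and callback below are assumptions for illustration only.

// pollBrokenPromises periodically collects the peers that failed to follow up
// on IWANT requests and hands the counts to a caller-supplied penalty
// callback. This is a sketch, not how the package wires it up internally.
func pollBrokenPromises(gt *gossipTracer, interval time.Duration, penalize func(p peer.ID, count int)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		for p, count := range gt.GetBrokenPromises() {
			penalize(p, count)
		}
	}
}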
package pubsub
import (
"testing"
"time"
pb "gitlab.dms3.io/p2p/go-p2p-pubsub/pb"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
)
func TestBrokenPromises(t *testing.T) {
// tests that unfulfilled promises are tracked correctly
gt := newGossipTracer()
gt.followUpTime = 100 * time.Millisecond
peerA := peer.ID("A")
peerB := peer.ID("B")
peerC := peer.ID("C")
var msgs []*pb.Message
var mids []string
for i := 0; i < 100; i++ {
m := makeTestMessage(i)
m.From = []byte(peerA)
msgs = append(msgs, m)
mid := DefaultMsgIdFn(m)
mids = append(mids, mid)
}
gt.AddPromise(peerA, mids)
gt.AddPromise(peerB, mids)
gt.AddPromise(peerC, mids)
// no broken promises yet
brokenPromises := gt.GetBrokenPromises()
if brokenPromises != nil {
t.Fatal("expected no broken promises")
}
// throttle one of the peers; this voids its promises, so it should not be penalized
gt.ThrottlePeer(peerC)
// wait for the promises to expire
time.Sleep(gt.followUpTime + 10*time.Millisecond)
brokenPromises = gt.GetBrokenPromises()
if len(brokenPromises) != 2 {
t.Fatalf("expected 2 broken prmises, got %d", len(brokenPromises))
}
brokenPromisesA := brokenPromises[peerA]
if brokenPromisesA != 1 {
t.Fatalf("expected 1 broken promise from A, got %d", brokenPromisesA)
}
brokenPromisesB := brokenPromises[peerB]
if brokenPromisesB != 1 {
t.Fatalf("expected 1 broken promise from A, got %d", brokenPromisesB)
}
}
func TestNoBrokenPromises(t *testing.T) {
// like above, but this time we deliver messages to fulfill the promises
originalGossipSubIWantFollowupTime := GossipSubIWantFollowupTime
GossipSubIWantFollowupTime = 100 * time.Millisecond
defer func() {
GossipSubIWantFollowupTime = originalGossipSubIWantFollowupTime
}()
gt := newGossipTracer()
peerA := peer.ID("A")
peerB := peer.ID("B")
var msgs []*pb.Message
var mids []string
for i := 0; i < 100; i++ {
m := makeTestMessage(i)
m.From = []byte(peerA)
msgs = append(msgs, m)
mid := DefaultMsgIdFn(m)
mids = append(mids, mid)
}
gt.AddPromise(peerA, mids)
gt.AddPromise(peerB, mids)
for _, m := range msgs {
gt.DeliverMessage(&Message{Message: m})
}
time.Sleep(GossipSubIWantFollowupTime + 10*time.Millisecond)
// there should be no broken promises
brokenPromises := gt.GetBrokenPromises()
if brokenPromises != nil {
t.Fatal("expected no broken promises")
}
}
package pubsub
import (
"fmt"
"gitlab.dms3.io/p2p/go-p2p-core/protocol"
)
// GossipSubFeatureTest is a feature test function; it takes a feature and a protocol ID and
// should return true if the feature is supported by the protocol
type GossipSubFeatureTest = func(GossipSubFeature, protocol.ID) bool
// GossipSubFeature is a feature discriminant enum
type GossipSubFeature int
const (
// Protocol supports basic GossipSub Mesh -- gossipsub-v1.0 compatible
GossipSubFeatureMesh = iota
// Protocol supports Peer eXchange on prune -- gossipsub-v1.1 compatible
GossipSubFeaturePX
)
// GossipSubDefaultProtocols is the default gossipsub router protocol list
var GossipSubDefaultProtocols = []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
// GossipSubDefaultFeatures is the feature test function for the default gossipsub protocols
func GossipSubDefaultFeatures(feat GossipSubFeature, proto protocol.ID) bool {
switch feat {
case GossipSubFeatureMesh:
return proto == GossipSubID_v11 || proto == GossipSubID_v10
case GossipSubFeaturePX:
return proto == GossipSubID_v11
default:
return false
}
}
// WithGossipSubProtocols is a gossipsub router option that configures a custom protocol list
// and feature test function
func WithGossipSubProtocols(protos []protocol.ID, feature GossipSubFeatureTest) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
gs.protos = protos
gs.feature = feature
return nil
}
}
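A hedged sketch of using this option: the helper below restricts a router to gossipsub v1.1 peers only, with a feature test that reports mesh and PX support for v1.1 alone. The helper name and surrounding setup are assumptions for illustration.

package example

import (
	"context"

	"gitlab.dms3.io/p2p/go-p2p-core/host"
	"gitlab.dms3.io/p2p/go-p2p-core/protocol"

	pubsub "gitlab.dms3.io/p2p/go-p2p-pubsub"
)

// newV11OnlyGossipSub constructs a gossipsub instance that only negotiates
// the v1.1 protocol, dropping v1.0 and floodsub from the default list.
func newV11OnlyGossipSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
	features := func(feat pubsub.GossipSubFeature, proto protocol.ID) bool {
		switch feat {
		case pubsub.GossipSubFeatureMesh, pubsub.GossipSubFeaturePX:
			return proto == pubsub.GossipSubID_v11
		default:
			return false
		}
	}
	return pubsub.NewGossipSub(ctx, h,
		pubsub.WithGossipSubProtocols([]protocol.ID{pubsub.GossipSubID_v11}, features))
}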
{
"repoLeadMaintainer": {
"name": "Dimitris Vyzovitis",
"email": "vyzo@protocol.ai",
"username": "@vyzo"
},
"workingGroup": {
"name": "p2p",
"entryPoint": "https://gitlab.dms3.io/p2p/p2p"
}
}
package pubsub
import (
ma "github.com/multiformats/go-multiaddr"
"gitlab.dms3.io/p2p/go-p2p-core/network"
"gitlab.dms3.io/p2p/go-p2p-core/peer"
)
var _ network.Notifiee = (*PubSubNotif)(nil)
// PubSubNotif implements network.Notifiee, feeding newly connected peers to
// the PubSub instance.
type PubSubNotif PubSub
func (p *PubSubNotif) OpenedStream(n network.Network, s network.Stream) {
}
func (p *PubSubNotif) ClosedStream(n network.Network, s network.Stream) {
}
func (p *PubSubNotif) Connected(n network.Network, c network.Conn) {
// ignore transient connections
if c.Stat().Transient {
return
}
go func() {
select {
case p.newPeers <- c.RemotePeer():
case <-p.ctx.Done():
}
}()
}
func (p *PubSubNotif) Disconnected(n network.Network, c network.Conn) {
}
func (p *PubSubNotif) Listen(n network.Network, _ ma.Multiaddr) {
}
func (p *PubSubNotif) ListenClose(n network.Network, _ ma.Multiaddr) {
}
// Initialize enqueues all peers we are already connected to, skipping peers
// that only have transient connections.
func (p *PubSubNotif) Initialize() {
isTransient := func(pid peer.ID) bool {
for _, c := range p.host.Network().ConnsToPeer(pid) {
if !c.Stat().Transient {
return false
}
}
return true
}
for _, pid := range p.host.Network().Peers() {
if isTransient(pid) {
continue
}
select {
case p.newPeers <- pid:
case <-p.ctx.Done():
}
}
}
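For context, a hedged sketch of how this notifiee gets wired up; whether NewPubSub performs exactly these steps is an assumption here, not something shown in this diff.

// in-package sketch; ps is assumed to be a freshly constructed *PubSub
notif := (*PubSubNotif)(ps)
ps.host.Network().Notify(notif) // receive Connected events from now on
notif.Initialize()              // enqueue peers we are already connected to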
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)

all: $(GO)

%.pb.go: %.proto
	protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<

clean:
	rm -f *.pb.go
	rm -f *.go