Commit e1c40dfa authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #3202 from ipfs/feat/floodsub

Add a very basic many to many pubsub
parents dcb21bd2 91db6f31
......@@ -21,10 +21,9 @@ import (
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
"gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
"gx/ipfs/QmR3KwhXCRLTNZB59vELb2HhEWrGy9nuychepxFtj3wWYa/client_golang/prometheus"
mprome "gx/ipfs/QmXWro6iddJRbGWUoZDpTu6tjo5EXX4xJHHR9VczeoGZbw/go-metrics-prometheus"
"gx/ipfs/QmY83KqqnQ286ZWbV2x7ixpeemH3cBpk8R54egS619WYff/go-multiaddr-net"
pstore "gx/ipfs/QmYkwVGkwoPbMVQEbf6LonZg4SsCxGP3H7PBEtdNCNRyxD/go-libp2p-peerstore"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
util "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
......@@ -47,6 +46,7 @@ const (
unencryptTransportKwd = "disable-transport-encryption"
unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
)
......@@ -145,6 +145,7 @@ Headers.
cmds.BoolOption(adjustFDLimitKwd, "Check and raise file descriptor limits if needed").Default(true),
cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
// TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
......@@ -266,14 +267,19 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
return
}
offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
// Start assembling node config
ncfg := &core.BuildCfg{
Repo: repo,
Permament: true, // It is temporary way to signify that node is permament
//TODO(Kubuxu): refactor Online vs Offline by adding Permement vs Epthemeral
Online: !offline,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
},
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral
}
offline, _, _ := req.Option(offlineKwd).Bool()
ncfg.Online = !offline
routingOption, _, err := req.Option(routingOptionKwd).String()
if err != nil {
......
......@@ -17,7 +17,7 @@ import (
"syscall"
"time"
manet "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
manet "gx/ipfs/QmY83KqqnQ286ZWbV2x7ixpeemH3cBpk8R54egS619WYff/go-multiaddr-net"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
......
......@@ -32,6 +32,9 @@ type BuildCfg struct {
// If online is set, the node will have networking enabled
Online bool
// ExtraOpts is a map of extra options used to configure the ipfs nodes creation
ExtraOpts map[string]bool
// If permament then node should run more expensive processes
// that will improve performance in long run
Permament bool
......@@ -44,6 +47,14 @@ type BuildCfg struct {
Repo repo.Repo
}
func (cfg *BuildCfg) getOpt(key string) bool {
if cfg.ExtraOpts == nil {
return false
}
return cfg.ExtraOpts[key]
}
func (cfg *BuildCfg) fillDefaults() error {
if cfg.Repo != nil && cfg.NilRepo {
return errors.New("cannot set a repo and specify nilrepo at the same time")
......@@ -184,7 +195,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub")); err != nil {
return err
}
} else {
......
package commands
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"sync"
"time"
blocks "github.com/ipfs/go-ipfs/blocks"
cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
pstore "gx/ipfs/QmYkwVGkwoPbMVQEbf6LonZg4SsCxGP3H7PBEtdNCNRyxD/go-libp2p-peerstore"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
floodsub "gx/ipfs/QmaUewj1HPiCX5mjNHmevQiNWr4eeAn7HBfHcGVbRyafdo/floodsub"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
)
var PubsubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "An experimental publish-subscribe system on ipfs.",
ShortDescription: `
ipfs pubsub allows you to publish messages to a given topic, and also to
subscribe to new messages on a given topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Subcommands: map[string]*cmds.Command{
"pub": PubsubPubCmd,
"sub": PubsubSubCmd,
"ls": PubsubLsCmd,
"peers": PubsubPeersCmd,
},
}
var PubsubSubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Subscribe to messages on a given topic.",
ShortDescription: `
ipfs pubsub sub subscribes to messages on a given topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
},
Options: []cmds.Option{
cmds.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
return
}
topic := req.Arguments()[0]
msgs, err := n.Floodsub.Subscribe(req.Context(), topic)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))
ctx := req.Context()
go func() {
defer close(out)
for {
select {
case msg, ok := <-msgs:
if !ok {
return
}
out <- msg
case <-ctx.Done():
n.Floodsub.Unsub(topic)
}
}
}()
discover, _, _ := req.Option("discover").Bool()
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
cid, err := n.Blocks.AddObject(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}
connectToPubSubPeers(req.Context(), n, cid)
}()
}
},
Marshalers: cmds.MarshalerMap{
cmds.Text: getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
return bytes.NewReader(m.Data), nil
}),
"ndpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
m.Data = append(m.Data, '\n')
return bytes.NewReader(m.Data), nil
}),
"lenpayload": getPsMsgMarshaler(func(m *floodsub.Message) (io.Reader, error) {
buf := make([]byte, 8)
n := binary.PutUvarint(buf, uint64(len(m.Data)))
return io.MultiReader(bytes.NewReader(buf[:n]), bytes.NewReader(m.Data)), nil
}),
},
Type: floodsub.Message{},
}
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid *cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
provs := n.Routing.FindProvidersAsync(ctx, key.Key(cid.Hash()), 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}
wg.Wait()
}
func getPsMsgMarshaler(f func(m *floodsub.Message) (io.Reader, error)) func(cmds.Response) (io.Reader, error) {
return func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}
marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*floodsub.Message)
if !ok {
return nil, u.ErrCast()
}
return f(obj)
}
return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
}
}
var PubsubPubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Publish a message to a given pubsub topic.",
ShortDescription: `
ipfs pubsub pub publishes a message to a specified topic.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("topic", true, false, "Topic to publish to."),
cmds.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
return
}
topic := req.Arguments()[0]
for _, data := range req.Arguments()[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
}
},
}
var PubsubLsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List subscribed topics by name.",
ShortDescription: `
ipfs pubsub ls lists out the names of topics you are currently subscribed to.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
return
}
res.SetOutput(&stringList{n.Floodsub.GetTopics()})
},
Type: stringList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: stringListMarshaler,
},
}
var PubsubPeersCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List all peers we are currently pubsubbing with.",
ShortDescription: `
ipfs pubsub peers lists out the pubsub peers you are currently connected to.
This is an experimental feature. It is not intended in its current state
to be used in a production environment.
To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
// Must be online!
if !n.OnlineMode() {
res.SetError(errNotOnline, cmds.ErrClient)
return
}
if n.Floodsub == nil {
res.SetError(fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use."), cmds.ErrNormal)
return
}
var out []string
for _, p := range n.Floodsub.ListPeers() {
out = append(out, p.Pretty())
}
res.SetOutput(&stringList{out})
},
Type: stringList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: stringListMarshaler,
},
}
......@@ -99,6 +99,7 @@ var rootSubcommands = map[string]*cmds.Command{
"object": ocmd.ObjectCmd,
"pin": PinCmd,
"ping": PingCmd,
"pubsub": PubsubCmd,
"refs": RefsCmd,
"repo": RepoCmd,
"resolve": ResolveCmd,
......
......@@ -8,7 +8,7 @@ import (
cmds "github.com/ipfs/go-ipfs/commands"
config "github.com/ipfs/go-ipfs/repo/config"
manet "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
manet "gx/ipfs/QmY83KqqnQ286ZWbV2x7ixpeemH3cBpk8R54egS619WYff/go-multiaddr-net"
sysi "gx/ipfs/QmZRjKbHa6DenStpQJFiaPcEwkZqrx7TH6xTf342LDU3qM/go-sysinfo"
)
......
......@@ -17,6 +17,7 @@ import (
"time"
diag "github.com/ipfs/go-ipfs/diagnostics"
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
......@@ -26,6 +27,7 @@ import (
pstore "gx/ipfs/QmYkwVGkwoPbMVQEbf6LonZg4SsCxGP3H7PBEtdNCNRyxD/go-libp2p-peerstore"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
floodsub "gx/ipfs/QmaUewj1HPiCX5mjNHmevQiNWr4eeAn7HBfHcGVbRyafdo/floodsub"
discovery "gx/ipfs/QmbiRCGZqhfcSjnm9icGz3oNQQdPLAnLWnKHXixaEWXVCN/go-libp2p/p2p/discovery"
p2phost "gx/ipfs/QmbiRCGZqhfcSjnm9icGz3oNQQdPLAnLWnKHXixaEWXVCN/go-libp2p/p2p/host"
p2pbhost "gx/ipfs/QmbiRCGZqhfcSjnm9icGz3oNQQdPLAnLWnKHXixaEWXVCN/go-libp2p/p2p/host/basic"
......@@ -112,6 +114,8 @@ type IpfsNode struct {
Reprovider *rp.Reprovider // the value reprovider system
IpnsRepub *ipnsrp.Republisher
Floodsub *floodsub.PubSub
proc goprocess.Process
ctx context.Context
......@@ -126,7 +130,7 @@ type Mounts struct {
Ipns mount.Mount
}
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption) error {
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub bool) error {
if n.PeerHost != nil { // already online.
return errors.New("node already online")
......@@ -184,6 +188,10 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
go n.Reprovider.ProvideEvery(ctx, interval)
}
if pubsub {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}
// setup local discovery
if do != nil {
service, err := do(ctx, n.PeerHost)
......
......@@ -11,9 +11,9 @@ import (
"time"
core "github.com/ipfs/go-ipfs/core"
manet "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
"gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
manet "gx/ipfs/QmY83KqqnQ286ZWbV2x7ixpeemH3cBpk8R54egS619WYff/go-multiaddr-net"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
)
......
......@@ -263,6 +263,12 @@
"hash": "QmdCL8M8DXJdSRnwhpDhukX5r8ydjxnzPJpaKrFudDA8yn",
"name": "hang-fds",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmaUewj1HPiCX5mjNHmevQiNWr4eeAn7HBfHcGVbRyafdo",
"name": "floodsub",
"version": "0.6.2"
}
],
"gxVersion": "0.4.0",
......
......@@ -10,8 +10,8 @@ import (
"os"
"time"
manet "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
manet "gx/ipfs/QmY83KqqnQ286ZWbV2x7ixpeemH3cBpk8R54egS619WYff/go-multiaddr-net"
ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr"
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment