Commit a1d2d47a authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub

Merge pull request #3447 from ipfs/feat/multiplex-experiment

add option to enable go-multiplex experiment
parents 0814a997 10ddd40f
......@@ -48,6 +48,7 @@ const (
unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment"
enableMultiplexKwd = "enable-mplex-experiment"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
)
......@@ -158,6 +159,7 @@ Headers.
cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmds.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction."),
// TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
......@@ -288,6 +290,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
mplex, _, _ := req.Option(enableMultiplexKwd).Bool()
// Start assembling node config
ncfg := &core.BuildCfg{
......@@ -296,6 +299,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
Online: !offline,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
"mplex": mplex,
},
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral
}
......
......@@ -197,7 +197,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub")); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil {
return err
}
} else {
......
......@@ -14,7 +14,10 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"strings"
"time"
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
......@@ -44,19 +47,24 @@ import (
mamask "gx/ipfs/QmSMZwvs3n4GBikZ7hKzT17c3bk65FmyZo2JqtJ16swqCv/multiaddr-filter"
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
b58 "gx/ipfs/QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf/go-base58"
mssmux "gx/ipfs/QmTfjLsou9ic6L4KqCcmbLSZcdiFu8q1v6njKp121pbbXx/go-smux-multistream"
ma "gx/ipfs/QmUAQaWbKxGCUTuoQVvvicbQNZ9APF5pDGWyAZSe93AtKH/go-multiaddr"
floodsub "gx/ipfs/QmV5jot2GfVXmgvetHExJCa2hprebf3AKjprZtuwaXSr1v/floodsub"
addrutil "gx/ipfs/QmVDnc2zvyQm8LhT72n22THcshvH7j3qPMnhvjerQER62T/go-addr-util"
spdy "gx/ipfs/QmWUNsat6Jb19nC5CiJCDXepTkxjdxi3eZqeoB6mrmmaGu/go-smux-spdystream"
swarm "gx/ipfs/QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4/go-libp2p-swarm"
mplex "gx/ipfs/QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on/go-smux-multiplex"
metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics"
u "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
routing "gx/ipfs/QmbkGVaN9W6RYJK4Ws5FvMKXKDqdRQ5snhtaa92qP6L8eU/go-libp2p-routing"
yamux "gx/ipfs/Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U/go-smux-yamux"
discovery "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/discovery"
p2pbhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/basic"
rhost "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/host/routed"
ping "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/protocol/ping"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore"
smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
ic "gx/ipfs/QmfWDLQjGjVe4fr5CoztYW2DYYjRysMJrFe1RCsXLPTf46/go-libp2p-crypto"
)
......@@ -129,7 +137,7 @@ type Mounts struct {
Ipns mount.Mount
}
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub bool) error {
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error {
if n.PeerHost != nil { // already online.
return errors.New("node already online")
......@@ -159,7 +167,9 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
n.Reporter = metrics.NewBandwidthCounter()
}
peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter)
tpt := makeSmuxTransport(mplex)
peerhost, err := hostOption(ctx, n.Identity, n.Peerstore, n.Reporter, addrfilter, tpt)
if err != nil {
return err
}
......@@ -207,6 +217,34 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return n.Bootstrap(DefaultBootstrapConfig)
}
func makeSmuxTransport(mplexExp bool) smux.Transport {
mstpt := mssmux.NewBlankTransport()
ymxtpt := &yamux.Transport{
AcceptBacklog: 8192,
ConnectionWriteTimeout: time.Second * 10,
KeepAliveInterval: time.Second * 30,
EnableKeepAlive: true,
MaxStreamWindowSize: uint32(1024 * 512),
LogOutput: ioutil.Discard,
}
mstpt.AddTransport("/yamux/1.0.0", ymxtpt)
mstpt.AddTransport("/spdy/3.1.0", spdy.Transport)
if mplexExp {
mstpt.AddTransport("/mplex/6.7.0", mplex.DefaultTransport)
}
// Allow muxer preference order overriding
if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
mstpt.OrderPreference = strings.Fields(prefs)
}
return mstpt
}
func setupDiscoveryOption(d config.Discovery) DiscoveryOption {
if d.MDNS.Enabled {
return func(ctx context.Context, h p2phost.Host) (discovery.Service, error) {
......@@ -616,19 +654,21 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) {
return listen, nil
}
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error)
type HostOption func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error)
var DefaultHostOption HostOption = constructPeerHost
// isolates the complex initialization steps
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (p2phost.Host, error) {
func constructPeerHost(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, tpt smux.Transport) (p2phost.Host, error) {
// no addresses to begin with. we'll start later.
network, err := swarm.NewNetwork(ctx, nil, id, ps, bwr)
swrm, err := swarm.NewSwarmWithProtector(ctx, nil, id, ps, nil, tpt, bwr)
if err != nil {
return nil, err
}
network := (*swarm.Network)(swrm)
for _, f := range fs {
network.Swarm().Filters.AddDialFilter(f)
}
......
package coremock
import (
"context"
"net"
context "context"
"gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
commands "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo"
config "github.com/ipfs/go-ipfs/repo/config"
ds2 "github.com/ipfs/go-ipfs/thirdparty/datastore2"
testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"
host "gx/ipfs/QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ/go-libp2p-host"
"gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
syncds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/sync"
metrics "gx/ipfs/QmY2otvyPM2sTaDsczo7Yuosg98sUMCJ9qx1gpPaAPTS9B/go-libp2p-metrics"
mocknet "gx/ipfs/QmbzCT1CwxVZ2ednptC9RavuJe7Bv8DDi2Ne89qUrA37XM/go-libp2p/p2p/net/mock"
pstore "gx/ipfs/QmeXj9VAjmYQZxpmVz7VzccbJrpmr8qkCDSjfVNsPTWTYU/go-libp2p-peerstore"
smux "gx/ipfs/QmeZBgYBHvxMukGK5ojg28BCNLB9SeXqT7XXg6o7r2GbJy/go-stream-muxer"
peer "gx/ipfs/QmfMmLGoKzCHDN7cGgk64PJr4iipzidDRME8HABSJqvmhC/go-libp2p-peer"
)
......@@ -32,7 +33,7 @@ func NewMockNode() (*core.IpfsNode, error) {
}
func MockHostOption(mn mocknet.Mocknet) core.HostOption {
return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (host.Host, error) {
return func(ctx context.Context, id peer.ID, ps pstore.Peerstore, bwr metrics.Reporter, fs []*net.IPNet, _ smux.Transport) (host.Host, error) {
return mn.AddPeerWithPeerstore(id, ps)
}
}
......
......@@ -287,6 +287,12 @@
"hash": "QmXuBJ7DR6k3rmUEKtvVMhwjmXDuJgXXPUt4LQXKBMsU93",
"name": "go-os-helper",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmXGevGDVTqeKdisBzaxEK4CJZqfxeXiVSWLaXaVWcG5on",
"name": "go-smux-multiplex",
"version": "1.1.4"
}
],
"gxVersion": "0.4.0",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment