Unverified Commit 1f147c24 authored by Raúl Kripalani's avatar Raúl Kripalani Committed by GitHub

make wire protocol message size configurable. (#261)

parent a1999db8
......@@ -30,7 +30,7 @@ func (p *PubSub) getHelloPacket() *RPC {
}
func (p *PubSub) handleNewStream(s network.Stream) {
r := ggio.NewDelimitedReader(s, 1<<20)
r := ggio.NewDelimitedReader(s, p.maxMessageSize)
for {
rpc := new(RPC)
err := r.ReadMsg(&rpc.RPC)
......@@ -85,7 +85,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
}
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
r := ggio.NewDelimitedReader(s, 1<<20)
r := ggio.NewDelimitedReader(s, p.maxMessageSize)
rpc := new(RPC)
for {
err := r.ReadMsg(&rpc.RPC)
......
......@@ -1119,3 +1119,46 @@ func TestMessageSender(t *testing.T) {
}
}
}
func TestConfigurableMaxMessageSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hosts := getNetHosts(t, ctx, 10)
// use a 4mb limit; default is 1mb; we'll test with a 2mb payload.
psubs := getPubsubs(ctx, hosts, WithMaxMessageSize(1<<22))
sparseConnect(t, hosts)
time.Sleep(time.Millisecond * 100)
const topic = "foobar"
var subs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
subs = append(subs, subch)
}
// 2mb payload.
msg := make([]byte, 1<<21)
rand.Read(msg)
err := psubs[0].Publish(topic, msg)
if err != nil {
t.Fatal(err)
}
// make sure that all peers received the message.
for _, sub := range subs {
got, err := sub.Next(ctx)
if err != nil {
t.Fatal(sub.err)
}
if !bytes.Equal(msg, got.Data) {
t.Fatal("got wrong message!")
}
}
}
......@@ -2,7 +2,7 @@ module github.com/libp2p/go-libp2p-pubsub
require (
github.com/gogo/protobuf v1.3.1
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/ipfs/go-log v0.0.1
github.com/libp2p/go-libp2p-blankhost v0.1.4
github.com/libp2p/go-libp2p-core v0.3.0
......
This diff is collapsed.
......@@ -23,6 +23,9 @@ import (
timecache "github.com/whyrusleeping/timecache"
)
// DefaultMaximumMessageSize is 1mb.
const DefaultMaxMessageSize = 1 << 20
var (
TimeCacheDuration = 120 * time.Second
)
......@@ -48,6 +51,10 @@ type PubSub struct {
tracer *pubsubTracer
// maxMessageSize is the maximum message size; it applies globally to all
// topics.
maxMessageSize int
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
......@@ -184,6 +191,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
rt: rt,
val: newValidation(),
disc: &discover{},
maxMessageSize: DefaultMaxMessageSize,
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
......@@ -353,6 +361,34 @@ func WithEventTracer(tracer EventTracer) Option {
}
}
// WithMaxMessageSize sets the global maximum message size for pubsub wire
// messages. The default value is 1MiB (DefaultMaxMessageSize).
//
// Observe the following warnings when setting this option.
//
// WARNING #1: Make sure to change the default protocol prefixes for floodsub
// (FloodSubID) and gossipsub (GossipSubID). This avoids accidentally joining
// the public default network, which uses the default max message size, and
// therefore will cause messages to be dropped.
//
// WARNING #2: Reducing the default max message limit is fine, if you are
// certain that your application messages will not exceed the new limit.
// However, be wary of increasing the limit, as pubsub networks are naturally
// write-amplifying, i.e. for every message we receive, we send D copies of the
// message to our peers. If those messages are large, the bandwidth requirements
// will grow linearly. Note that propagation is sent on the uplink, which
// traditionally is more constrained than the downlink. Instead, consider
// out-of-band retrieval for large messages, by sending a CID (Content-ID) or
// another type of locator, such that messages can be fetched on-demand, rather
// than being pushed proactively. Under this design, you'd use the pubsub layer
// as a signalling system, rather than a data delivery system.
func WithMaxMessageSize(maxMessageSize int) Option {
return func(ps *PubSub) error {
ps.maxMessageSize = maxMessageSize
return nil
}
}
// processLoop handles all inputs arriving on the channels
func (p *PubSub) processLoop(ctx context.Context) {
defer func() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment