Unverified Commit 323a4110 authored by vyzo's avatar vyzo Committed by GitHub

Merge pull request #230 from aarshkshah1992/feat/configurable-outbound-msg-queue-size

Configurable outbound peer queue sizes
parents 534fe2f3 0dd2171a
......@@ -3,6 +3,7 @@ package pubsub
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
......@@ -45,6 +46,9 @@ type PubSub struct {
disc *discover
// size of the outbound message channel that we maintain for each peer
peerOutboundQueueSize int
// incoming messages from other peers
incoming chan *RPC
......@@ -169,38 +173,39 @@ type Option func(*PubSub) error
// NewPubSub returns a new PubSub management object.
func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option) (*PubSub, error) {
ps := &PubSub{
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
host: h,
ctx: ctx,
rt: rt,
val: newValidation(),
disc: &discover{},
peerOutboundQueueSize: 32,
signID: h.ID(),
signKey: h.Peerstore().PrivKey(h.ID()),
signStrict: true,
incoming: make(chan *RPC, 32),
publish: make(chan *Message),
newPeers: make(chan peer.ID),
newPeerStream: make(chan network.Stream),
newPeerError: make(chan peer.ID),
peerDead: make(chan peer.ID),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addTopic: make(chan *addTopicReq),
rmTopic: make(chan *rmTopicReq),
getTopics: make(chan *topicReq),
sendMsg: make(chan *Message, 32),
addVal: make(chan *addValReq),
rmVal: make(chan *rmValReq),
eval: make(chan func()),
myTopics: make(map[string]*Topic),
mySubs: make(map[string]map[*Subscription]struct{}),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
}
for _, opt := range opts {
......@@ -232,6 +237,18 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
return ps, nil
}
// WithPeerOutboundQueueSize is an option to set the buffer size for outbound messages to a peer
// We start dropping messages to a peer if the outbound queue if full
func WithPeerOutboundQueueSize(size int) Option {
return func(p *PubSub) error {
if size <= 0 {
return errors.New("outbound queue size must always be positive")
}
p.peerOutboundQueueSize = size
return nil
}
}
// WithMessageSigning enables or disables message signing (enabled by default).
func WithMessageSigning(enabled bool) Option {
return func(p *PubSub) error {
......@@ -327,7 +344,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
continue
}
messages := make(chan *RPC, 32)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages
......@@ -366,7 +383,7 @@ func (p *PubSub) processLoop(ctx context.Context) {
// still connected, must be a duplicate connection being closed.
// we respawn the writer as we need to ensure there is a stream active
log.Warning("peer declared dead but still connected; respawning writer: ", pid)
messages := make(chan *RPC, 32)
messages := make(chan *RPC, p.peerOutboundQueueSize)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
p.peers[pid] = messages
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment