Commit 703d46a6 authored by hannahhoward's avatar hannahhoward

feat(peermanager): remove leaky sendmessage

Breakup Startup function so that wantlists are not sent with each call to SendMessage
parent 26b8a09f
......@@ -52,6 +52,11 @@ func New(p peer.ID, network MessageNetwork) *MessageQueue {
}
}
// RefCount returns the number of open connections for this queue.
func (mq *MessageQueue) RefCount() int {
return mq.refcnt
}
// RefIncrement increments the refcount for a message queue.
func (mq *MessageQueue) RefIncrement() {
mq.refcnt++
......@@ -75,32 +80,31 @@ func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
}
}
// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, ses uint64) {
// new peer, we will want to give them our full wantlist
// AddWantlist adds a complete session tracked want list to a message queue
func (mq *MessageQueue) AddWantlist(initialEntries []*wantlist.Entry) {
if len(initialEntries) > 0 {
fullwantlist := bsmsg.New(true)
if mq.out == nil {
mq.out = bsmsg.New(false)
}
for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
mq.out.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
}
if len(initialEntries) > 0 || mq.addEntries(entries, ses) {
select {
case <-ctx.Done():
return
case mq.work <- struct{}{}:
default:
}
}
}
// Startup starts the processing of messages, and creates an initial message
// based on the given initial wantlist.
func (mq *MessageQueue) Startup(ctx context.Context) {
go mq.runQueue(ctx)
}
// Shutdown stops the processing of messages for a message queue.
......
......@@ -81,8 +81,8 @@ func TestStartupAndShutdown(t *testing.T) {
ses := testutil.GenerateSessionID()
wl := testutil.GenerateWantlist(10, ses)
messageQueue.Startup(ctx, wl.Entries(), nil, 0)
messageQueue.Startup(ctx)
messageQueue.AddWantlist(wl.Entries())
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
if len(messages) != 1 {
t.Fatal("wrong number of messages were sent for initial wants")
......@@ -123,8 +123,9 @@ func TestSendingMessagesDeduped(t *testing.T) {
ses1 := testutil.GenerateSessionID()
ses2 := testutil.GenerateSessionID()
entries := testutil.GenerateMessageEntries(10, false)
messageQueue.Startup(ctx, nil, entries, ses1)
messageQueue.Startup(ctx)
messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(entries, ses2)
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
......@@ -147,8 +148,9 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
entries := testutil.GenerateMessageEntries(10, false)
moreEntries := testutil.GenerateMessageEntries(5, false)
secondEntries := append(entries[5:], moreEntries...)
messageQueue.Startup(ctx, nil, entries, ses1)
messageQueue.Startup(ctx)
messageQueue.AddMessage(entries, ses1)
messageQueue.AddMessage(secondEntries, ses2)
messages := collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
......
......@@ -21,8 +21,10 @@ var (
type PeerQueue interface {
RefIncrement()
RefDecrement() bool
RefCount() int
AddMessage(entries []*bsmsg.Entry, ses uint64)
Startup(ctx context.Context, initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, ses uint64)
Startup(ctx context.Context)
AddWantlist(initialEntries []*wantlist.Entry)
Shutdown()
}
......@@ -71,13 +73,17 @@ func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
mq, ok := pm.peerQueues[p]
if ok {
if mq.RefCount() == 0 {
mq.AddWantlist(initialEntries)
}
mq.RefIncrement()
return
}
mq = pm.createPeerQueue(p)
pm.peerQueues[p] = mq
mq.Startup(pm.ctx, initialEntries, nil, 0)
mq.Startup(pm.ctx)
mq.AddWantlist(initialEntries)
}
// Disconnected is called to remove a peer from the pool.
......@@ -101,7 +107,7 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
// SendMessage is called to send a message to all or some peers in the pool;
// if targets is nil, it sends to all.
func (pm *PeerManager) SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
pm.lk.Lock()
defer pm.lk.Unlock()
......@@ -115,13 +121,12 @@ func (pm *PeerManager) SendMessage(initialEntries []*wantlist.Entry, entries []*
if !ok {
p = pm.createPeerQueue(t)
pm.peerQueues[t] = p
p.Startup(pm.ctx, initialEntries, entries, from)
p.Startup(pm.ctx)
// this is a "0 reference" queue because we haven't actually connected to it
// sending the first message will cause it to connect
p.RefDecrement()
} else {
p.AddMessage(entries, from)
}
p.AddMessage(entries, from)
}
}
}
......@@ -25,13 +25,10 @@ type fakePeer struct {
messagesSent chan messageSent
}
func (fp *fakePeer) Startup(ctx context.Context, initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, ses uint64) {
if entries != nil {
fp.AddMessage(entries, ses)
}
}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) RefIncrement() { fp.refcnt++ }
func (fp *fakePeer) Startup(ctx context.Context) {}
func (fp *fakePeer) Shutdown() {}
func (fp *fakePeer) RefCount() int { return fp.refcnt }
func (fp *fakePeer) RefIncrement() { fp.refcnt++ }
func (fp *fakePeer) RefDecrement() bool {
fp.refcnt--
return fp.refcnt > 0
......@@ -39,7 +36,7 @@ func (fp *fakePeer) RefDecrement() bool {
func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) {
fp.messagesSent <- messageSent{fp.p, entries, ses}
}
func (fp *fakePeer) AddWantlist(initialEntries []*wantlist.Entry) {}
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
return func(p peer.ID) PeerQueue {
return &fakePeer{
......@@ -136,7 +133,7 @@ func TestSendingMessagesToPeers(t *testing.T) {
entries := testutil.GenerateMessageEntries(5, false)
ses := testutil.GenerateSessionID()
peerManager.SendMessage(nil, entries, nil, ses)
peerManager.SendMessage(entries, nil, ses)
peersReceived := collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)
......@@ -157,7 +154,7 @@ func TestSendingMessagesToPeers(t *testing.T) {
var peersToSendTo []peer.ID
peersToSendTo = append(peersToSendTo, peer1, peer3, peer4)
peerManager.SendMessage(nil, entries, peersToSendTo, ses)
peerManager.SendMessage(entries, peersToSendTo, ses)
peersReceived = collectAndCheckMessages(
ctx, t, messagesSent, entries, ses, 10*time.Millisecond)
......
......@@ -25,7 +25,7 @@ const (
type PeerHandler interface {
Disconnected(p peer.ID)
Connected(p peer.ID, initialEntries []*wantlist.Entry)
SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64)
SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
}
type wantMessage interface {
......@@ -232,7 +232,7 @@ func (ws *wantSet) handle(wm *WantManager) {
}
// broadcast those wantlist changes
wm.peerHandler.SendMessage(wm.bcwl.Entries(), ws.entries, ws.targets, ws.from)
wm.peerHandler.SendMessage(ws.entries, ws.targets, ws.from)
}
type isWantedMessage struct {
......
......@@ -15,15 +15,13 @@ import (
)
type fakePeerHandler struct {
lk sync.RWMutex
lastWantSet wantSet
initialEntries []*wantlist.Entry
lk sync.RWMutex
lastWantSet wantSet
}
func (fph *fakePeerHandler) SendMessage(initialEntries []*wantlist.Entry, entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
func (fph *fakePeerHandler) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64) {
fph.lk.Lock()
fph.lastWantSet = wantSet{entries, targets, from}
fph.initialEntries = initialEntries
fph.lk.Unlock()
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment