Unverified Commit ea899f5d authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Shutdown notifications go routines (#109)

* test(benchmarks): add benchmark for go routine leak

* fix(notifications): shutdown notification queues

make sure all notification publishers are actually shut down, and don't start them automatically
parent 536970d8
...@@ -29,6 +29,7 @@ import ( ...@@ -29,6 +29,7 @@ import (
basicnode "github.com/ipld/go-ipld-prime/node/basic" basicnode "github.com/ipld/go-ipld-prime/node/basic"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/ipld/go-ipld-prime/traversal/selector/builder"
peer "github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -61,6 +62,74 @@ func BenchmarkRoundtripSuccess(b *testing.B) { ...@@ -61,6 +62,74 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) { b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) {
p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm) p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm)
}) })
b.Run("test-repeated-disconnects-20-10000", func(b *testing.B) {
benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
})
}
func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes int, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
mn := mocknet.New(ctx)
net := tn.StreamNet(ctx, mn)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
instances, err := ig.Instances(numnodes + 1)
require.NoError(b, err)
var allCids [][]cid.Cid
for i := 0; i < b.N; i++ {
thisCids := df(ctx, b, instances[1:])
allCids = append(allCids, thisCids)
}
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
runtime.GC()
b.ResetTimer()
b.ReportAllocs()
fetcher := instances[0]
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
require.NoError(b, err)
start := time.Now()
for j := 0; j < numnodes; j++ {
instance := instances[j+1]
_, errChan := fetcher.Exchange.Request(ctx, instance.Peer, cidlink.Link{Cid: allCids[i][j]}, allSelector)
wg.Add(1)
go func(other peer.ID) {
defer func() {
mn.DisconnectPeers(fetcher.Peer, other)
wg.Done()
}()
for {
select {
case <-ctx.Done():
return
case err, ok := <-errChan:
if !ok {
return
}
b.Fatalf("received error on request: %s", err.Error())
}
}
}(instance.Peer)
}
wg.Wait()
result := runStats{
Time: time.Since(start),
Name: b.Name(),
}
benchmarkLog = append(benchmarkLog, result)
cancel()
}
cancel()
time.Sleep(100 * time.Millisecond)
b.Logf("Number of running go-routines: %d", runtime.NumGoroutine())
testinstance.Close(instances)
ig.Close()
} }
func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) { func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
......
...@@ -111,6 +111,8 @@ func (mq *MessageQueue) Shutdown() { ...@@ -111,6 +111,8 @@ func (mq *MessageQueue) Shutdown() {
} }
func (mq *MessageQueue) runQueue() { func (mq *MessageQueue) runQueue() {
defer mq.eventPublisher.Shutdown()
mq.eventPublisher.Startup()
for { for {
select { select {
case <-mq.outgoingWork: case <-mq.outgoingWork:
......
...@@ -147,6 +147,7 @@ func TestProcessingNotification(t *testing.T) { ...@@ -147,6 +147,7 @@ func TestProcessingNotification(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
messageQueue := New(ctx, peer, messageNetwork) messageQueue := New(ctx, peer, messageNetwork)
messageQueue.Startup()
waitGroup.Add(1) waitGroup.Add(1)
blks := testutil.GenerateBlocksOfSize(3, 128) blks := testutil.GenerateBlocksOfSize(3, 128)
...@@ -164,7 +165,6 @@ func TestProcessingNotification(t *testing.T) { ...@@ -164,7 +165,6 @@ func TestProcessingNotification(t *testing.T) {
messageQueue.AddResponses(newMessage.Responses(), blks, notifee) messageQueue.AddResponses(newMessage.Responses(), blks, notifee)
// wait for send attempt // wait for send attempt
messageQueue.Startup()
waitGroup.Wait() waitGroup.Wait()
var message gsmsg.GraphSyncMessage var message gsmsg.GraphSyncMessage
......
...@@ -143,7 +143,9 @@ func TestSubscribeOn(t *testing.T) { ...@@ -143,7 +143,9 @@ func TestSubscribeOn(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second) ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel() defer cancel()
ps := notifications.NewPublisher() ps := notifications.NewPublisher()
ps.Startup()
testPublisher(ctx, t, ps) testPublisher(ctx, t, ps)
ps.Shutdown()
}) })
} }
......
...@@ -32,10 +32,13 @@ func NewPublisher() Publisher { ...@@ -32,10 +32,13 @@ func NewPublisher() Publisher {
cmdChan: make(chan cmd), cmdChan: make(chan cmd),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
go ps.start()
return ps return ps
} }
func (ps *publisher) Startup() {
go ps.start()
}
// Publish publishes an event for the given message id // Publish publishes an event for the given message id
func (ps *publisher) Publish(topic Topic, event Event) { func (ps *publisher) Publish(topic Topic, event Event) {
ps.lk.RLock() ps.lk.RLock()
......
...@@ -30,6 +30,7 @@ type Publisher interface { ...@@ -30,6 +30,7 @@ type Publisher interface {
Close(Topic) Close(Topic)
Publish(Topic, Event) Publish(Topic, Event)
Shutdown() Shutdown()
Startup()
Subscribable Subscribable
} }
......
...@@ -416,6 +416,8 @@ func (prs *peerResponseSender) signalWork() { ...@@ -416,6 +416,8 @@ func (prs *peerResponseSender) signalWork() {
} }
func (prs *peerResponseSender) run() { func (prs *peerResponseSender) run() {
defer prs.publisher.Shutdown()
prs.publisher.Startup()
for { for {
select { select {
case <-prs.ctx.Done(): case <-prs.ctx.Done():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment