Commit 189564ed authored by Dirk McCormick's avatar Dirk McCormick

fix: shutdown message queue when there's a send error

parent ba4b52e7
......@@ -359,6 +359,8 @@ func (mq *MessageQueue) runQueue() {
return
case <-mq.ctx.Done():
if mq.sender != nil {
// TODO: should I call sender.Close() here also to stop
// and in progress connection?
_ = mq.sender.Reset()
}
return
......@@ -415,6 +417,7 @@ func (mq *MessageQueue) sendMessage() {
// If we fail to initialize the sender, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not open message sender to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}
......@@ -439,6 +442,7 @@ func (mq *MessageQueue) sendMessage() {
// If the message couldn't be sent, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not send message to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}
......
......@@ -112,6 +112,13 @@ func (s *streamMessageSender) Connect(ctx context.Context) (stream network.Strea
return nil, err
}
// Check if the sender has been closed
select {
case <-s.done:
return nil, nil
default:
}
stream, err = s.bsnet.newStreamToPeer(ctx, s.to)
if err == nil {
s.stream = stream
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment