Commit 9c5652aa authored by hannahhoward's avatar hannahhoward

fix(messagequeue): no retry after queue shutdown

Previously, we allowed the last message send attempt to run to maxRetries if the queue was shutdown,
which also generated warning logs. Now, if a send attempt fails, check that the queue has shutdown
and immediately return if it has
parent 90ce01c2
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
gsmsg "github.com/ipfs/go-graphsync/message" gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network" gsnet "github.com/ipfs/go-graphsync/network"
...@@ -189,6 +189,8 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b ...@@ -189,6 +189,8 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b
mq.sender = nil mq.sender = nil
select { select {
case <-mq.done:
return true
case <-mq.ctx.Done(): case <-mq.ctx.Done():
return true return true
case <-time.After(time.Millisecond * 100): case <-time.After(time.Millisecond * 100):
......
...@@ -2,6 +2,7 @@ package messagequeue ...@@ -2,6 +2,7 @@ package messagequeue
import ( import (
"context" "context"
"fmt"
"math/rand" "math/rand"
"reflect" "reflect"
"sync" "sync"
...@@ -89,6 +90,72 @@ func TestStartupAndShutdown(t *testing.T) { ...@@ -89,6 +90,72 @@ func TestStartupAndShutdown(t *testing.T) {
} }
} }
func TestShutdownDuringMessageSend(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
peer := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{
fmt.Errorf("Something went wrong"),
fullClosedChan,
resetChan,
messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
messageQueue := New(ctx, peer, messageNetwork)
messageQueue.Startup()
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
root := testutil.GenerateCids(1)[0]
// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
waitGroup.Wait()
// now shut down
messageQueue.Shutdown()
// let the message send attempt complete and fail (as it would if
// the connection were closed)
select {
case <-ctx.Done():
t.Fatal("message send not attempted")
case <-messagesSent:
}
// verify the connection is reset after a failed send attempt
select {
case <-resetChan:
case <-fullClosedChan:
t.Fatal("message sender should have been reset but was closed")
case <-ctx.Done():
t.Fatal("message sender should have been closed but wasn't")
}
// now verify after it's reset, no further retries, connection
// resets, or attempts to close the connection, cause the queue
// should realize it's shut down and stop processing
// FIXME: this relies on time passing -- 100 ms to be exact
// and we should instead mock out time as a dependency
waitGroup.Add(1)
select {
case <-messagesSent:
t.Fatal("should not have attempted to send second message")
case <-resetChan:
t.Fatal("message sender should not have been reset again")
case <-fullClosedChan:
t.Fatal("message sender should not have been closed closed")
case <-ctx.Done():
}
}
func TestProcessingNotification(t *testing.T) { func TestProcessingNotification(t *testing.T) {
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second) ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment