Unverified Commit d92aaa85 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #38 from ipfs/bugs/fix-messagequeue-shutdown

fix(messagequeue): no retry after queue shutdown
parents 90ce01c2 9c5652aa
Pipeline #141 failed with stages
in 0 seconds
......@@ -5,7 +5,7 @@ import (
"sync"
"time"
"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"
gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
......@@ -189,6 +189,8 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage) b
mq.sender = nil
select {
case <-mq.done:
return true
case <-mq.ctx.Done():
return true
case <-time.After(time.Millisecond * 100):
......
......@@ -2,6 +2,7 @@ package messagequeue
import (
"context"
"fmt"
"math/rand"
"reflect"
"sync"
......@@ -89,6 +90,72 @@ func TestStartupAndShutdown(t *testing.T) {
}
}
func TestShutdownDuringMessageSend(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
peer := testutil.GeneratePeers(1)[0]
messagesSent := make(chan gsmsg.GraphSyncMessage)
resetChan := make(chan struct{}, 1)
fullClosedChan := make(chan struct{}, 1)
messageSender := &fakeMessageSender{
fmt.Errorf("Something went wrong"),
fullClosedChan,
resetChan,
messagesSent}
var waitGroup sync.WaitGroup
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
messageQueue := New(ctx, peer, messageNetwork)
messageQueue.Startup()
id := gsmsg.GraphSyncRequestID(rand.Int31())
priority := gsmsg.GraphSyncPriority(rand.Int31())
selector := testutil.RandomBytes(100)
root := testutil.GenerateCids(1)[0]
// setup a message and advance as far as beginning to send it
waitGroup.Add(1)
messageQueue.AddRequest(gsmsg.NewRequest(id, root, selector, priority))
waitGroup.Wait()
// now shut down
messageQueue.Shutdown()
// let the message send attempt complete and fail (as it would if
// the connection were closed)
select {
case <-ctx.Done():
t.Fatal("message send not attempted")
case <-messagesSent:
}
// verify the connection is reset after a failed send attempt
select {
case <-resetChan:
case <-fullClosedChan:
t.Fatal("message sender should have been reset but was closed")
case <-ctx.Done():
t.Fatal("message sender should have been closed but wasn't")
}
// now verify after it's reset, no further retries, connection
// resets, or attempts to close the connection, cause the queue
// should realize it's shut down and stop processing
// FIXME: this relies on time passing -- 100 ms to be exact
// and we should instead mock out time as a dependency
waitGroup.Add(1)
select {
case <-messagesSent:
t.Fatal("should not have attempted to send second message")
case <-resetChan:
t.Fatal("message sender should not have been reset again")
case <-fullClosedChan:
t.Fatal("message sender should not have been closed closed")
case <-ctx.Done():
}
}
func TestProcessingNotification(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment