diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index fd55fbee36cd085f12a42216df6a98e4d89da53c..a3e21790d56ca6d595d628e258c59081653c65d6 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -41,6 +41,9 @@ const ( // when we debounce for more than sendMessageMaxDelay, we'll send the // message immediately. sendMessageMaxDelay = 20 * time.Millisecond + // The maximum amount of time in which to accept a response as being valid + // for latency calculation (as opposed to discarding it as an outlier) + maxValidLatency = 30 * time.Second ) // MessageNetwork is any network that can connect peers and generate a message @@ -55,14 +58,24 @@ type MessageNetwork interface { // MessageQueue implements queue of want messages to send to peers. type MessageQueue struct { - ctx context.Context - shutdown func() - p peer.ID - network MessageNetwork - dhTimeoutMgr DontHaveTimeoutManager - maxMessageSize int + ctx context.Context + shutdown func() + p peer.ID + network MessageNetwork + dhTimeoutMgr DontHaveTimeoutManager + + // The maximum size of a message in bytes. Any overflow is put into the + // next message + maxMessageSize int + + // The amount of time to wait when there's an error sending to a peer + // before retrying sendErrorBackoff time.Duration + // The maximum amount of time in which to accept a response as being valid + // for latency calculation + maxValidLatency time.Duration + // Signals that there are outgoing wants / cancels ready to be processed outgoingWork chan time.Time @@ -198,12 +211,18 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo onDontHaveTimeout(p, ks) } dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout) - return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr) + return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr) } // This constructor is used by the tests -func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, - maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { +func newMessageQueue( + ctx context.Context, + p peer.ID, + network MessageNetwork, + maxMsgSize int, + sendErrorBackoff time.Duration, + maxValidLatency time.Duration, + dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { ctx, cancel := context.WithCancel(ctx) return &MessageQueue{ @@ -220,6 +239,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, responses: make(chan []cid.Cid, 8), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, + maxValidLatency: maxValidLatency, priority: maxPriority, // For performance reasons we just clear out the fields of the message // after using it, instead of creating a new one every time. @@ -553,17 +573,24 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { // Check if the keys in the response correspond to any request that was // sent to the peer. - // Find the earliest request so as to calculate the longest latency as - // we want to be conservative when setting the timeout. + // + // - Find the earliest request so as to calculate the longest latency as + // we want to be conservative when setting the timeout + // - Ignore latencies that are very long, as these are likely to be outliers + // caused when + // - we send a want to peer A + // - peer A does not have the block + // - peer A later receives the block from peer B + // - peer A sends us HAVE / block for _, c := range ks { if at, ok := mq.bcstWants.sentAt[c]; ok { - if earliest.IsZero() || at.Before(earliest) { + if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { earliest = at } mq.bcstWants.ClearSentAt(c) } if at, ok := mq.peerWants.sentAt[c]; ok { - if earliest.IsZero() || at.Before(earliest) { + if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency { earliest = at } // Clear out the sent time for the CID because we only want to diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go index f0f32e0a7345b575a44d9b2049820921a01a9232..4af3000ad70af04d62116a9288c5fe7fd631170f 100644 --- a/internal/messagequeue/messagequeue_test.go +++ b/internal/messagequeue/messagequeue_test.go @@ -498,7 +498,7 @@ func TestSendingLargeMessages(t *testing.T) { wantBlocks := testutil.GenerateCids(10) entrySize := 44 maxMsgSize := entrySize * 3 // 3 wants - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() messageQueue.AddWants(wantBlocks, []cid.Cid{}) @@ -578,7 +578,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { peerID := testutil.GeneratePeers(1)[0] dhtm := &fakeDontHaveTimeoutMgr{} - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() wbs := testutil.GenerateCids(10) @@ -609,7 +609,7 @@ func TestResponseReceived(t *testing.T) { peerID := testutil.GeneratePeers(1)[0] dhtm := &fakeDontHaveTimeoutMgr{} - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() cids := testutil.GenerateCids(10) @@ -649,7 +649,7 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { peerID := testutil.GeneratePeers(1)[0] dhtm := &fakeDontHaveTimeoutMgr{} - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() cids := testutil.GenerateCids(2) @@ -684,6 +684,48 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { } } +func TestResponseReceivedDiscardsOutliers(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan []bsmsg.Entry) + resetChan := make(chan struct{}, 1) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + + maxValLatency := 30 * time.Millisecond + dhtm := &fakeDontHaveTimeoutMgr{} + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValLatency, dhtm) + messageQueue.Startup() + + cids := testutil.GenerateCids(4) + + // Add some wants and wait 20ms + messageQueue.AddWants(cids[:2], nil) + collectMessages(ctx, t, messagesSent, 20*time.Millisecond) + + // Add some more wants and wait long enough that the first wants will be + // outside the maximum valid latency, but the second wants will be inside + messageQueue.AddWants(cids[2:], nil) + collectMessages(ctx, t, messagesSent, maxValLatency-10*time.Millisecond) + + // Receive a response for the wants + messageQueue.ResponseReceived(cids) + + // Wait for the response to be processed by the message queue + time.Sleep(10 * time.Millisecond) + + // Check that the latency calculation excludes the first wants + // (because they're older than max valid latency) + upds := dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } + // Elapsed time should not include outliers + if upds[0] > maxValLatency { + t.Fatal("expected latency calculation to discard outliers") + } +} + func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) { var wbs []cid.Cid var whs []cid.Cid @@ -712,7 +754,7 @@ func BenchmarkMessageQueue(b *testing.B) { dhtm := &fakeDontHaveTimeoutMgr{} peerID := testutil.GeneratePeers(1)[0] - messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm) messageQueue.Startup() go func() {