diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 07c18a77e2854cef2f7f2486ccf893de44c15418..fd55fbee36cd085f12a42216df6a98e4d89da53c 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -147,6 +147,13 @@ func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) { } } +// ClearSentAt clears out the record of the time a want was sent. +// We clear the sent at time when we receive a response for a key so that +// subsequent responses for the key don't appear to be even further delayed. +func (r *recallWantlist) ClearSentAt(c cid.Cid) { + delete(r.sentAt, c) +} + type peerConn struct { p peer.ID network MessageNetwork @@ -549,11 +556,20 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { // Find the earliest request so as to calculate the longest latency as // we want to be conservative when setting the timeout. for _, c := range ks { - if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { - earliest = at + if at, ok := mq.bcstWants.sentAt[c]; ok { + if earliest.IsZero() || at.Before(earliest) { + earliest = at + } + mq.bcstWants.ClearSentAt(c) } - if at, ok := mq.peerWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { - earliest = at + if at, ok := mq.peerWants.sentAt[c]; ok { + if earliest.IsZero() || at.Before(earliest) { + earliest = at + } + // Clear out the sent time for the CID because we only want to + // record the latency between the request and the first response + // for that CID (not subsequent responses) + mq.peerWants.ClearSentAt(c) } } diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go index 1ef0d2a5f3061039ca90100c78a9162067ed686d..f0f32e0a7345b575a44d9b2049820921a01a9232 100644 --- a/internal/messagequeue/messagequeue_test.go +++ b/internal/messagequeue/messagequeue_test.go @@ -640,6 +640,50 @@ func TestResponseReceived(t *testing.T) { } } +func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan []bsmsg.Entry) + resetChan := make(chan struct{}, 1) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + + dhtm := &fakeDontHaveTimeoutMgr{} + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue.Startup() + + cids := testutil.GenerateCids(2) + + // Add some wants and wait 10ms + messageQueue.AddWants(cids, nil) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Receive a response for the wants + messageQueue.ResponseReceived(cids) + + // Wait another 10ms + time.Sleep(10 * time.Millisecond) + + // Message queue should inform DHTM of first response + upds := dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } + + // Receive a second response for the same wants + messageQueue.ResponseReceived(cids) + + // Wait for the response to be processed by the message queue + time.Sleep(10 * time.Millisecond) + + // Message queue should not inform DHTM of second response because the + // CIDs are a subset of the first response + upds = dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } +} + func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) { var wbs []cid.Cid var whs []cid.Cid