Commit af8cba85 authored by Dirk McCormick's avatar Dirk McCormick

fix: only record latency for first response per want

parent 5c215f41
......@@ -147,6 +147,13 @@ func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) {
}
}
// ClearSentAt clears out the record of the time a want was sent.
// We clear the sent at time when we receive a response for a key so that
// subsequent responses for the key don't appear to be even further delayed.
func (r *recallWantlist) ClearSentAt(c cid.Cid) {
delete(r.sentAt, c)
}
type peerConn struct {
p peer.ID
network MessageNetwork
......@@ -549,11 +556,20 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
// Find the earliest request so as to calculate the longest latency as
// we want to be conservative when setting the timeout.
for _, c := range ks {
if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) {
earliest = at
if at, ok := mq.bcstWants.sentAt[c]; ok {
if earliest.IsZero() || at.Before(earliest) {
earliest = at
}
mq.bcstWants.ClearSentAt(c)
}
if at, ok := mq.peerWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) {
earliest = at
if at, ok := mq.peerWants.sentAt[c]; ok {
if earliest.IsZero() || at.Before(earliest) {
earliest = at
}
// Clear out the sent time for the CID because we only want to
// record the latency between the request and the first response
// for that CID (not subsequent responses)
mq.peerWants.ClearSentAt(c)
}
}
......
......@@ -640,6 +640,50 @@ func TestResponseReceived(t *testing.T) {
}
}
func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue.Startup()
cids := testutil.GenerateCids(2)
// Add some wants and wait 10ms
messageQueue.AddWants(cids, nil)
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Receive a response for the wants
messageQueue.ResponseReceived(cids)
// Wait another 10ms
time.Sleep(10 * time.Millisecond)
// Message queue should inform DHTM of first response
upds := dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
}
// Receive a second response for the same wants
messageQueue.ResponseReceived(cids)
// Wait for the response to be processed by the message queue
time.Sleep(10 * time.Millisecond)
// Message queue should not inform DHTM of second response because the
// CIDs are a subset of the first response
upds = dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
}
}
func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) {
var wbs []cid.Cid
var whs []cid.Cid
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment