Commit a7c7865a authored by Dirk McCormick

fix: discard outliers in latency calculation

parent af8cba85
......@@ -41,6 +41,9 @@ const (
// when we debounce for more than sendMessageMaxDelay, we'll send the
// message immediately.
sendMessageMaxDelay = 20 * time.Millisecond
// The maximum amount of time in which to accept a response as being valid
// for latency calculation (as opposed to discarding it as an outlier)
maxValidLatency = 30 * time.Second
// MessageNetwork is any network that can connect peers and generate a message
......@@ -55,14 +58,24 @@ type MessageNetwork interface {
// MessageQueue implements queue of want messages to send to peers.
type MessageQueue struct {
ctx context.Context
shutdown func()
p peer.ID
network MessageNetwork
dhTimeoutMgr DontHaveTimeoutManager
maxMessageSize int
ctx context.Context
shutdown func()
p peer.ID
network MessageNetwork
dhTimeoutMgr DontHaveTimeoutManager
// The maximum size of a message in bytes. Any overflow is put into the
// next message
maxMessageSize int
// The amount of time to wait when there's an error sending to a peer
// before retrying
sendErrorBackoff time.Duration
// The maximum amount of time in which to accept a response as being valid
// for latency calculation
maxValidLatency time.Duration
// Signals that there are outgoing wants / cancels ready to be processed
outgoingWork chan time.Time
......@@ -198,12 +211,18 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
onDontHaveTimeout(p, ks)
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr)
// This constructor is used by the tests
func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
func newMessageQueue(
ctx context.Context,
p peer.ID,
network MessageNetwork,
maxMsgSize int,
sendErrorBackoff time.Duration,
maxValidLatency time.Duration,
dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
ctx, cancel := context.WithCancel(ctx)
return &MessageQueue{
......@@ -220,6 +239,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
responses: make(chan []cid.Cid, 8),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
maxValidLatency: maxValidLatency,
priority: maxPriority,
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
......@@ -553,17 +573,24 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
// Check if the keys in the response correspond to any request that was
// sent to the peer.
// Find the earliest request so as to calculate the longest latency as
// we want to be conservative when setting the timeout.
// - Find the earliest request so as to calculate the longest latency as
// we want to be conservative when setting the timeout
// - Ignore latencies that are very long, as these are likely to be outliers
// caused when
// - we send a want to peer A
// - peer A does not have the block
// - peer A later receives the block from peer B
// - peer A sends us HAVE / block
for _, c := range ks {
if at, ok := mq.bcstWants.sentAt[c]; ok {
if earliest.IsZero() || at.Before(earliest) {
if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
earliest = at
if at, ok := mq.peerWants.sentAt[c]; ok {
if earliest.IsZero() || at.Before(earliest) {
if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
earliest = at
// Clear out the sent time for the CID because we only want to
......@@ -498,7 +498,7 @@ func TestSendingLargeMessages(t *testing.T) {
wantBlocks := testutil.GenerateCids(10)
entrySize := 44
maxMsgSize := entrySize * 3 // 3 wants
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, maxValidLatency, dhtm)
messageQueue.AddWants(wantBlocks, []cid.Cid{})
......@@ -578,7 +578,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
wbs := testutil.GenerateCids(10)
......@@ -609,7 +609,7 @@ func TestResponseReceived(t *testing.T) {
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
cids := testutil.GenerateCids(10)
......@@ -649,7 +649,7 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
cids := testutil.GenerateCids(2)
......@@ -684,6 +684,48 @@ func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {
func TestResponseReceivedDiscardsOutliers(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
maxValLatency := 30 * time.Millisecond
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValLatency, dhtm)
cids := testutil.GenerateCids(4)
// Add some wants and wait 20ms
messageQueue.AddWants(cids[:2], nil)
collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
// Add some more wants and wait long enough that the first wants will be
// outside the maximum valid latency, but the second wants will be inside
messageQueue.AddWants(cids[2:], nil)
collectMessages(ctx, t, messagesSent, maxValLatency-10*time.Millisecond)
// Receive a response for the wants
// Wait for the response to be processed by the message queue
time.Sleep(10 * time.Millisecond)
// Check that the latency calculation excludes the first wants
// (because they're older than max valid latency)
upds := dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
// Elapsed time should not include outliers
if upds[0] > maxValLatency {
t.Fatal("expected latency calculation to discard outliers")
func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) {
var wbs []cid.Cid
var whs []cid.Cid
......@@ -712,7 +754,7 @@ func BenchmarkMessageQueue(b *testing.B) {
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
go func() {
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment