Unverified commit 165b1549 authored by Steven Allen, committed by GitHub

Merge pull request #386 from ipfs/feat/msg-latency

calculate message latency
parents dbfbeeef 373033e7
......@@ -353,6 +353,16 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
allKs = append(allKs, b.Cid())
}
// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)
}
// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
......
......@@ -9,10 +9,12 @@ import (
bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/internal/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
delay "github.com/ipfs/go-ipfs-delay"
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
tu "github.com/libp2p/go-libp2p-testing/etc"
)
......@@ -71,7 +73,7 @@ func TestSessionBetweenPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
vnet := getVirtualNetwork()
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
......@@ -112,6 +114,10 @@ func TestSessionBetweenPeers(t *testing.T) {
t.Fatal(err)
}
}
// Uninvolved nodes should receive
// - initial broadcast want-have of root block
// - CANCEL (when Peer A receives the root block from Peer B)
for _, is := range inst[2:] {
stat, err := is.Exchange.Stat()
if err != nil {
......
......@@ -21,10 +21,20 @@ const (
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second
// latencyMultiplier is multiplied by the average ping time to
// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime
// pingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
latencyMultiplier = 3
pingLatencyMultiplier = 3
// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5
// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
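To make these constants concrete, here is a small standalone sketch of the two timeout formulas this change introduces (the dontHaveTimeout value is assumed for illustration; the clamp to maxTimeout is the new cap):

package main

import (
	"fmt"
	"time"
)

const (
	dontHaveTimeout            = 5 * time.Second // assumed default, for illustration only
	maxExpectedWantProcessTime = 2 * time.Second
	maxTimeout                 = dontHaveTimeout + maxExpectedWantProcessTime
	pingLatencyMultiplier      = 3
	messageLatencyMultiplier   = 2
)

// clamp applies the new maxTimeout cap.
func clamp(t time.Duration) time.Duration {
	if t > maxTimeout {
		return maxTimeout
	}
	return t
}

func main() {
	pingLatency := 100 * time.Millisecond
	msgLatency := 250 * time.Millisecond

	// Ping-based timeout: expected want-processing time + multiplier * ping latency.
	fromPing := clamp(maxExpectedWantProcessTime + pingLatencyMultiplier*pingLatency)
	// Message-based timeout: multiplier * observed message latency.
	fromMsg := clamp(messageLatencyMultiplier * msgLatency)

	fmt.Println(fromPing, fromMsg) // 2.3s 500ms
}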
// PeerConnection is a connection to a peer that can be pinged, and the
......@@ -44,16 +54,20 @@ type pendingWant struct {
sent time.Time
}
// dontHaveTimeoutMgr pings the peer to measure latency. It uses the latency to
// set a reasonable timeout for simulating a DONT_HAVE message for peers that
// don't support DONT_HAVE or that take to long to respond.
// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long
// to respond to a message.
// The timeout is based on latency - we start with a default latency, while
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
latencyMultiplier int
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration
// All variables below here must be protected by the lock
......@@ -66,6 +80,8 @@ type dontHaveTimeoutMgr struct {
wantQueue []*pendingWant
// time to wait for a response (depends on latency)
timeout time.Duration
// ewma of message latency (time from message sent to response received)
messageLatency *latencyEwma
// timer used to wait until want at front of queue expires
checkForTimeoutsTimer *time.Timer
}
......@@ -73,13 +89,18 @@ type dontHaveTimeoutMgr struct {
// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout,
latencyMultiplier, maxExpectedWantProcessTime)
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime)
}
// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration, latencyMultiplier int,
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr {
ctx, shutdown := context.WithCancel(context.Background())
......@@ -89,8 +110,11 @@ func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
latencyMultiplier: latencyMultiplier,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
}
......@@ -126,16 +150,36 @@ func (dhtm *dontHaveTimeoutMgr) Start() {
// calculate a reasonable timeout
latency := dhtm.peerConn.Latency()
if latency.Nanoseconds() > 0 {
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
return
}
// Otherwise measure latency by pinging the peer
go dhtm.measureLatency()
go dhtm.measurePingLatency()
}
// UpdateMessageLatency is called when we receive a response from the peer.
// It is the time between sending a request and receiving the corresponding
// response.
func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()
// Update the message latency and the timeout
dhtm.messageLatency.update(elapsed)
oldTimeout := dhtm.timeout
dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency()
// If the timeout has decreased
if dhtm.timeout < oldTimeout {
// Check if after changing the timeout there are any pending wants that
// are now over the timeout
dhtm.checkForTimeouts()
}
}
// measureLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measureLatency() {
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
defer cancel()
......@@ -154,8 +198,13 @@ func (dhtm *dontHaveTimeoutMgr) measureLatency() {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()
// A message has arrived so we already set the timeout based on message latency
if dhtm.messageLatency.samples > 0 {
return
}
// Calculate a reasonable timeout based on latency
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
// Check if after changing the timeout there are any pending wants that are
// now over the timeout
......@@ -284,10 +333,43 @@ func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
dhtm.onDontHaveTimeout(pending)
}
// calculateTimeoutFromLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromLatency(latency time.Duration) time.Duration {
// calculateTimeoutFromPingLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration {
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
return dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.latencyMultiplier)*latency
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}
// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}
// latencyEwma is an EWMA of message latency
type latencyEwma struct {
alpha float64
samples uint64
latency time.Duration
}
// update the EWMA with the given sample
func (le *latencyEwma) update(elapsed time.Duration) {
le.samples++
// Initially set alpha to be 1.0 / <the number of samples>
alpha := 1.0 / float64(le.samples)
if alpha < le.alpha {
// Once we have enough samples, clamp alpha
alpha = le.alpha
}
le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
}
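The update logic above means the first sample is taken as-is (alpha is 1), and from the second sample on alpha is clamped to messageLatencyAlpha (0.5). A minimal standalone copy of the type for checking the arithmetic; the TestDontHaveTimeoutMgrMessageLatency case further down expects exactly this 15ms result:

package main

import (
	"fmt"
	"time"
)

const messageLatencyAlpha = 0.5

// latencyEwma mirrors the type added in this diff.
type latencyEwma struct {
	alpha   float64
	samples uint64
	latency time.Duration
}

func (le *latencyEwma) update(elapsed time.Duration) {
	le.samples++
	// Weight early samples heavily (alpha = 1/samples), then clamp to the configured alpha.
	alpha := 1.0 / float64(le.samples)
	if alpha < le.alpha {
		alpha = le.alpha
	}
	le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
}

func main() {
	le := &latencyEwma{alpha: messageLatencyAlpha}
	le.update(20 * time.Millisecond) // first sample: latency = 20ms
	le.update(10 * time.Millisecond) // second sample: 0.5*10ms + 0.5*20ms = 15ms
	fmt.Println(le.latency)          // 15ms
}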
......@@ -79,7 +79,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -102,7 +102,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
// At this stage first set of keys should have timed out
if tr.timedOutCount() != len(firstks) {
t.Fatal("expected timeout")
t.Fatal("expected timeout", tr.timedOutCount(), len(firstks))
}
// Clear the recorded timed out keys
......@@ -129,7 +129,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -160,7 +160,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -204,7 +204,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -222,6 +222,78 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
}
}
func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 40
latMultiplier := 1
expProcessTime := time.Duration(0)
msgLatencyMultiplier := 1
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
// Add keys
dhtm.AddPending(ks)
// expectedTimeout
// = expProcessTime + latency*time.Duration(latMultiplier)
// = 0 + 40ms * 1
// = 40ms
// Wait for less than the expected timeout
time.Sleep(25 * time.Millisecond)
// Receive two message latency updates
dhtm.UpdateMessageLatency(time.Millisecond * 20)
dhtm.UpdateMessageLatency(time.Millisecond * 10)
// alpha is 0.5 so timeout should be
// = (20ms * alpha) + (10ms * (1 - alpha))
// = (20ms * 0.5) + (10ms * 0.5)
// = 15ms
// We've already slept for 25ms so with the new 15ms timeout
// the keys should have timed out
// Give the queue some time to process the updates
time.Sleep(5 * time.Millisecond)
if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}
func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) {
ks := testutil.GenerateCids(2)
pc := &mockPeerConn{latency: time.Second} // ignored
tr := timeoutRecorder{}
msgLatencyMultiplier := 1
testMaxTimeout := time.Millisecond * 10
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
// Add keys
dhtm.AddPending(ks)
// Receive a message latency update that would make the timeout greater
// than the maximum timeout
dhtm.UpdateMessageLatency(testMaxTimeout * 4)
// Sleep until just after the maximum timeout
time.Sleep(testMaxTimeout + 5*time.Millisecond)
// Keys should have timed out
if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}
func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 1
......@@ -233,7 +305,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -267,7 +339,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
pc := &mockPeerConn{latency: latency}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -300,7 +372,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
pc := &mockPeerConn{latency: latency}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......
......@@ -41,6 +41,9 @@ const (
// when we debounce for more than sendMessageMaxDelay, we'll send the
// message immediately.
sendMessageMaxDelay = 20 * time.Millisecond
// The maximum amount of time in which to accept a response as being valid
// for latency calculation (as opposed to discarding it as an outlier)
maxValidLatency = 30 * time.Second
)
// MessageNetwork is any network that can connect peers and generate a message
......@@ -55,16 +58,30 @@ type MessageNetwork interface {
// MessageQueue implements queue of want messages to send to peers.
type MessageQueue struct {
ctx context.Context
shutdown func()
p peer.ID
network MessageNetwork
dhTimeoutMgr DontHaveTimeoutManager
maxMessageSize int
ctx context.Context
shutdown func()
p peer.ID
network MessageNetwork
dhTimeoutMgr DontHaveTimeoutManager
// The maximum size of a message in bytes. Any overflow is put into the
// next message
maxMessageSize int
// The amount of time to wait when there's an error sending to a peer
// before retrying
sendErrorBackoff time.Duration
// The maximum amount of time in which to accept a response as being valid
// for latency calculation
maxValidLatency time.Duration
// Signals that there are outgoing wants / cancels ready to be processed
outgoingWork chan time.Time
// Channel of CIDs of blocks / HAVEs / DONT_HAVEs received from the peer
responses chan []cid.Cid
// Take lock whenever any of these variables are modified
wllock sync.Mutex
bcstWants recallWantlist
......@@ -88,12 +105,15 @@ type recallWantlist struct {
pending *bswl.Wantlist
// The list of wants that have been sent
sent *bswl.Wantlist
// The time at which each want was sent
sentAt map[cid.Cid]time.Time
}
func newRecallWantList() recallWantlist {
return recallWantlist{
pending: bswl.New(),
sent: bswl.New(),
sentAt: make(map[cid.Cid]time.Time),
}
}
......@@ -104,14 +124,18 @@ func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlis
// Remove wants from both the pending list and the list of sent wants
func (r *recallWantlist) Remove(c cid.Cid) {
r.sent.Remove(c)
r.pending.Remove(c)
r.sent.Remove(c)
delete(r.sentAt, c)
}
// Remove wants by type from both the pending list and the list of sent wants
func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) {
r.sent.RemoveType(c, wtype)
r.pending.RemoveType(c, wtype)
r.sent.RemoveType(c, wtype)
if _, ok := r.sent.Contains(c); !ok {
delete(r.sentAt, c)
}
}
// MarkSent moves the want from the pending to the sent list
......@@ -126,6 +150,23 @@ func (r *recallWantlist) MarkSent(e wantlist.Entry) bool {
return true
}
// SentAt records the time at which a want was sent
func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) {
// The want may have been cancelled in the interim
if _, ok := r.sent.Contains(c); ok {
if _, ok := r.sentAt[c]; !ok {
r.sentAt[c] = at
}
}
}
// ClearSentAt clears out the record of the time a want was sent.
// We clear the sent at time when we receive a response for a key as we
// only need the first response for latency measurement.
func (r *recallWantlist) ClearSentAt(c cid.Cid) {
delete(r.sentAt, c)
}
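The sentAt map added above follows a record-once, clear-on-first-response discipline: the send time for a key is written at most once, and the first response for that key consumes it. The sketch below models just that bookkeeping with plain strings and maps (illustrative only; the real recallWantlist wraps bswl.Wantlist and cid.Cid):

package main

import (
	"fmt"
	"time"
)

type sentTimes struct {
	sentAt map[string]time.Time
}

// sent records the send time for a key, keeping the earliest time if called again.
func (s *sentTimes) sent(key string, at time.Time) {
	if _, ok := s.sentAt[key]; !ok {
		s.sentAt[key] = at
	}
}

// responded returns the latency sample for a key and clears it, so only the
// first response for that key is ever measured.
func (s *sentTimes) responded(key string, now time.Time) (time.Duration, bool) {
	at, ok := s.sentAt[key]
	if !ok {
		return 0, false // already measured, or never sent
	}
	delete(s.sentAt, key)
	return now.Sub(at), true
}

func main() {
	s := &sentTimes{sentAt: make(map[string]time.Time)}
	start := time.Now()
	s.sent("cid-1", start)
	s.sent("cid-1", start.Add(time.Second)) // re-send: the earliest time is kept

	if d, ok := s.responded("cid-1", start.Add(40*time.Millisecond)); ok {
		fmt.Println("latency sample:", d) // 40ms
	}
	if _, ok := s.responded("cid-1", start.Add(time.Second)); !ok {
		fmt.Println("second response ignored")
	}
}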
type peerConn struct {
p peer.ID
network MessageNetwork
......@@ -160,6 +201,8 @@ type DontHaveTimeoutManager interface {
AddPending([]cid.Cid)
// CancelPending removes the wants
CancelPending([]cid.Cid)
// UpdateMessageLatency informs the manager of a new latency measurement
UpdateMessageLatency(time.Duration)
}
// New creates a new MessageQueue.
......@@ -169,15 +212,21 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
onDontHaveTimeout(p, ks)
}
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, dhTimeoutMgr)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr)
}
// This constructor is used by the tests
func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
func newMessageQueue(
ctx context.Context,
p peer.ID,
network MessageNetwork,
maxMsgSize int,
sendErrorBackoff time.Duration,
maxValidLatency time.Duration,
dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
ctx, cancel := context.WithCancel(ctx)
mq := &MessageQueue{
return &MessageQueue{
ctx: ctx,
shutdown: cancel,
p: p,
......@@ -188,15 +237,15 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan time.Time, 1),
responses: make(chan []cid.Cid, 8),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
maxValidLatency: maxValidLatency,
priority: maxPriority,
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
}
return mq
}
// Add want-haves that are part of a broadcast to all connected peers
......@@ -291,6 +340,22 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
}
}
// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (mq *MessageQueue) ResponseReceived(ks []cid.Cid) {
if len(ks) == 0 {
return
}
// These messages are just used to approximate latency, so if we get so
// many responses that they get backed up, just ignore the overflow.
select {
case mq.responses <- ks:
default:
}
}
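The select with a default case is the usual Go idiom for a best-effort, non-blocking send: if the buffered responses channel is full, the sample is dropped instead of stalling the network receive path, which is acceptable because these samples only feed a latency estimate. A tiny illustration of the pattern (toy capacity of 2 so the overflow is easy to see):

package main

import "fmt"

func main() {
	// Buffered channel standing in for mq.responses.
	responses := make(chan []string, 2)

	trySend := func(ks []string) bool {
		select {
		case responses <- ks:
			return true
		default:
			return false // channel full: drop the sample rather than block
		}
	}

	fmt.Println(trySend([]string{"a"})) // true
	fmt.Println(trySend([]string{"b"})) // true
	fmt.Println(trySend([]string{"c"})) // false: buffer full, sample dropped
}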
// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist
func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
mq.rebroadcastIntervalLk.Lock()
......@@ -340,6 +405,7 @@ func (mq *MessageQueue) runQueue() {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case when := <-mq.outgoingWork:
// If we have work scheduled, cancel the timer. If we
// don't, record when the work was scheduled.
......@@ -362,11 +428,17 @@ func (mq *MessageQueue) runQueue() {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
}
case <-scheduleWork.C:
// We have work scheduled and haven't seen any updates
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
case res := <-mq.responses:
// We received a response from the peer, calculate latency
mq.handleResponse(res)
case <-mq.ctx.Done():
return
}
......@@ -431,7 +503,7 @@ func (mq *MessageQueue) sendMessage() {
mq.dhTimeoutMgr.Start()
// Convert want lists to a Bitswap Message
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)
......@@ -451,6 +523,9 @@ func (mq *MessageQueue) sendMessage() {
return
}
// Record sent time so as to calculate message latency
onSent()
// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)
......@@ -489,6 +564,51 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
mq.dhTimeoutMgr.AddPending(wants)
}
// handleResponse is called when a response is received from the peer,
// with the CIDs of received blocks / HAVEs / DONT_HAVEs
func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
now := time.Now()
earliest := time.Time{}
mq.wllock.Lock()
// Check if the keys in the response correspond to any request that was
// sent to the peer.
//
// - Find the earliest request so as to calculate the longest latency as
// we want to be conservative when setting the timeout
// - Ignore latencies that are very long, as these are likely to be outliers
// caused when
// - we send a want to peer A
// - peer A does not have the block
// - peer A later receives the block from peer B
// - peer A sends us HAVE / block
for _, c := range ks {
if at, ok := mq.bcstWants.sentAt[c]; ok {
if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
earliest = at
}
mq.bcstWants.ClearSentAt(c)
}
if at, ok := mq.peerWants.sentAt[c]; ok {
if (earliest.IsZero() || at.Before(earliest)) && now.Sub(at) < mq.maxValidLatency {
earliest = at
}
// Clear out the sent time for the CID because we only want to
// record the latency between the request and the first response
// for that CID (not subsequent responses)
mq.peerWants.ClearSentAt(c)
}
}
mq.wllock.Unlock()
if !earliest.IsZero() {
// Inform the timeout manager of the calculated latency
mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest))
}
}
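The loop above boils down to: among the responded-to keys, take the oldest outstanding send time, but skip anything at or beyond maxValidLatency as a likely outlier. A standalone sketch of that selection step (earliestValid is a hypothetical helper, not part of the diff):

package main

import (
	"fmt"
	"time"
)

// earliestValid picks the oldest send time among the responded-to keys,
// ignoring samples older than maxValid (likely outliers, e.g. a peer that
// fetched the block from elsewhere long after our request).
func earliestValid(sentAt map[string]time.Time, responded []string, now time.Time, maxValid time.Duration) (time.Time, bool) {
	var earliest time.Time
	for _, k := range responded {
		at, ok := sentAt[k]
		if !ok {
			continue
		}
		if now.Sub(at) >= maxValid {
			continue // too old to be a useful latency sample
		}
		if earliest.IsZero() || at.Before(earliest) {
			earliest = at
		}
	}
	return earliest, !earliest.IsZero()
}

func main() {
	now := time.Now()
	sentAt := map[string]time.Time{
		"old":    now.Add(-40 * time.Second), // outlier: beyond the 30s maxValidLatency
		"recent": now.Add(-200 * time.Millisecond),
	}
	if at, ok := earliestValid(sentAt, []string{"old", "recent"}, now, 30*time.Second); ok {
		fmt.Println("latency sample:", now.Sub(at)) // ~200ms; the outlier is discarded
	}
}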
func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
// Save some CPU cycles and allocations if log level is higher than debug
if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
......@@ -547,7 +667,7 @@ func (mq *MessageQueue) pendingWorkCount() int {
}
// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
// Get broadcast and regular wantlist entries.
mq.wllock.Lock()
peerEntries := mq.peerWants.pending.Entries()
......@@ -641,16 +761,18 @@ FINISH:
// Finally, re-take the lock, mark sent and remove any entries from our
// message that we've decided to cancel at the last minute.
mq.wllock.Lock()
for _, e := range peerEntries[:sentPeerEntries] {
for i, e := range peerEntries[:sentPeerEntries] {
if !mq.peerWants.MarkSent(e) {
// It changed.
mq.msg.Remove(e.Cid)
peerEntries[i].Cid = cid.Undef
}
}
for _, e := range bcstEntries[:sentBcstEntries] {
for i, e := range bcstEntries[:sentBcstEntries] {
if !mq.bcstWants.MarkSent(e) {
mq.msg.Remove(e.Cid)
bcstEntries[i].Cid = cid.Undef
}
}
......@@ -663,7 +785,28 @@ FINISH:
}
mq.wllock.Unlock()
return mq.msg
// When the message has been sent, record the time at which each want was
// sent so we can calculate message latency
onSent := func() {
now := time.Now()
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, e := range peerEntries[:sentPeerEntries] {
if e.Cid.Defined() { // Check if want was cancelled in the interim
mq.peerWants.SentAt(e.Cid, now)
}
}
for _, e := range bcstEntries[:sentBcstEntries] {
if e.Cid.Defined() { // Check if want was cancelled in the interim
mq.bcstWants.SentAt(e.Cid, now)
}
}
}
return mq.msg, onSent
}
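The onSent closure returned above splits the work in two: entry selection happens under the lock inside extractOutgoingMessage, while the send timestamps are recorded only after sendMessage has actually pushed the message onto the wire (entries cancelled in between have their Cid set to cid.Undef and are skipped). A rough sketch of that build-then-stamp shape with simplified types:

package main

import (
	"fmt"
	"time"
)

// buildMessage mimics the shape of extractOutgoingMessage: it decides which
// keys go into the message and returns a callback that stamps those same keys
// with a send time once the send has actually happened.
func buildMessage(keys []string) ([]string, func(record func(string, time.Time))) {
	sentKeys := append([]string(nil), keys...) // snapshot (the real code holds wllock here)
	onSent := func(record func(string, time.Time)) {
		now := time.Now()
		for _, k := range sentKeys {
			record(k, now)
		}
	}
	return sentKeys, onSent
}

func main() {
	sentAt := map[string]time.Time{}
	msg, onSent := buildMessage([]string{"cid-1", "cid-2"})

	// ... the network send of msg would happen here; only on success:
	onSent(func(k string, t time.Time) { sentAt[k] = t })

	fmt.Println(len(msg), len(sentAt)) // 2 2
}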
func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
......
......@@ -44,8 +44,9 @@ func (fms *fakeMessageNetwork) Ping(context.Context, peer.ID) ping.Result {
}
type fakeDontHaveTimeoutMgr struct {
lk sync.Mutex
ks []cid.Cid
lk sync.Mutex
ks []cid.Cid
latencyUpds []time.Duration
}
func (fp *fakeDontHaveTimeoutMgr) Start() {}
......@@ -73,6 +74,18 @@ func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
}
fp.ks = s.Keys()
}
func (fp *fakeDontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
fp.lk.Lock()
defer fp.lk.Unlock()
fp.latencyUpds = append(fp.latencyUpds, elapsed)
}
func (fp *fakeDontHaveTimeoutMgr) latencyUpdates() []time.Duration {
fp.lk.Lock()
defer fp.lk.Unlock()
return fp.latencyUpds
}
func (fp *fakeDontHaveTimeoutMgr) pendingCount() int {
fp.lk.Lock()
defer fp.lk.Unlock()
......@@ -485,7 +498,7 @@ func TestSendingLargeMessages(t *testing.T) {
wantBlocks := testutil.GenerateCids(10)
entrySize := 44
maxMsgSize := entrySize * 3 // 3 wants
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMsgSize, sendErrorBackoff, maxValidLatency, dhtm)
messageQueue.Startup()
messageQueue.AddWants(wantBlocks, []cid.Cid{})
......@@ -565,7 +578,7 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
messageQueue.Startup()
wbs := testutil.GenerateCids(10)
......@@ -587,6 +600,132 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
}
}
func TestResponseReceived(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
messageQueue.Startup()
cids := testutil.GenerateCids(10)
// Add some wants and wait 10ms
messageQueue.AddWants(cids[:5], nil)
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Add some wants and wait another 10ms
messageQueue.AddWants(cids[5:8], nil)
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Receive a response for some of the wants from both groups
messageQueue.ResponseReceived([]cid.Cid{cids[0], cids[6], cids[9]})
// Wait a short time for processing
time.Sleep(10 * time.Millisecond)
// Check that message queue informs DHTM of received responses
upds := dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
}
// Elapsed time should be between when the first want was sent and the
// response received (about 20ms)
if upds[0] < 15*time.Millisecond || upds[0] > 25*time.Millisecond {
t.Fatal("expected latency to be time since oldest message sent")
}
}
func TestResponseReceivedAppliesForFirstResponseOnly(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
messageQueue.Startup()
cids := testutil.GenerateCids(2)
// Add some wants and wait 10ms
messageQueue.AddWants(cids, nil)
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Receive a response for the wants
messageQueue.ResponseReceived(cids)
// Wait another 10ms
time.Sleep(10 * time.Millisecond)
// Message queue should inform DHTM of first response
upds := dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
}
// Receive a second response for the same wants
messageQueue.ResponseReceived(cids)
// Wait for the response to be processed by the message queue
time.Sleep(10 * time.Millisecond)
// Message queue should not inform DHTM of second response because the
// CIDs are a subset of the first response
upds = dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
}
}
func TestResponseReceivedDiscardsOutliers(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
maxValLatency := 30 * time.Millisecond
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValLatency, dhtm)
messageQueue.Startup()
cids := testutil.GenerateCids(4)
// Add some wants and wait 20ms
messageQueue.AddWants(cids[:2], nil)
collectMessages(ctx, t, messagesSent, 20*time.Millisecond)
// Add some more wants and wait long enough that the first wants will be
// outside the maximum valid latency, but the second wants will be inside
messageQueue.AddWants(cids[2:], nil)
collectMessages(ctx, t, messagesSent, maxValLatency-10*time.Millisecond)
// Receive a response for the wants
messageQueue.ResponseReceived(cids)
// Wait for the response to be processed by the message queue
time.Sleep(10 * time.Millisecond)
// Check that the latency calculation excludes the first wants
// (because they're older than max valid latency)
upds := dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
}
// Elapsed time should not include outliers
if upds[0] > maxValLatency {
t.Fatal("expected latency calculation to discard outliers")
}
}
func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) {
var wbs []cid.Cid
var whs []cid.Cid
......@@ -615,7 +754,7 @@ func BenchmarkMessageQueue(b *testing.B) {
dhtm := &fakeDontHaveTimeoutMgr{}
peerID := testutil.GeneratePeers(1)[0]
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, maxValidLatency, dhtm)
messageQueue.Startup()
go func() {
......
......@@ -18,6 +18,7 @@ type PeerQueue interface {
AddBroadcastWantHaves([]cid.Cid)
AddWants([]cid.Cid, []cid.Cid)
AddCancels([]cid.Cid)
ResponseReceived(ks []cid.Cid)
Startup()
Shutdown()
}
......@@ -116,6 +117,19 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
pm.pwm.removePeer(p)
}
// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (pm *PeerManager) ResponseReceived(p peer.ID, ks []cid.Cid) {
pm.pqLk.Lock()
pq, ok := pm.peerQueues[p]
pm.pqLk.Unlock()
if ok {
pq.ResponseReceived(ks)
}
}
// BroadcastWantHaves broadcasts want-haves to all peers (used by the session
// to discover seeds).
// For each peer it filters out want-haves that have previously been sent to
......
......@@ -35,6 +35,8 @@ func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {
func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
fp.msgs <- msg{fp.p, nil, nil, cs}
}
func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {
}
type peerWants struct {
wantHaves []cid.Cid
......