Commit 6763be87 authored by Dirk McCormick's avatar Dirk McCormick

feat: calculate message latency

parent ac478dee
......@@ -303,14 +303,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
return bs.receiveBlocksFrom(context.Background(), time.Time{}, "", []blocks.Block{blk}, nil, nil)
}
// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
......@@ -348,6 +348,16 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
allKs = append(allKs, b.Cid())
}
// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, at, combined)
}
// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
......@@ -386,6 +396,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
// ReceiveMessage is called by the network interface when a new message is
// received.
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
now := time.Now()
bs.counterLk.Lock()
bs.counters.messagesRecvd++
bs.counterLk.Unlock()
......@@ -409,7 +421,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
err := bs.receiveBlocksFrom(ctx, now, p, iblocks, haves, dontHaves)
if err != nil {
log.Warnf("ReceiveMessage recvBlockFrom error: %s", err)
return
......
......@@ -21,10 +21,20 @@ const (
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second
// latencyMultiplier is multiplied by the average ping time to
// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime
// pingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
latencyMultiplier = 3
pingLatencyMultiplier = 3
// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5
// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
// PeerConnection is a connection to a peer that can be pinged, and the
......@@ -44,16 +54,20 @@ type pendingWant struct {
sent time.Time
}
// dontHaveTimeoutMgr pings the peer to measure latency. It uses the latency to
// set a reasonable timeout for simulating a DONT_HAVE message for peers that
// don't support DONT_HAVE or that take to long to respond.
// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long
// to respond to a message.
// The timeout is based on latency - we start with a default latency, while
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
latencyMultiplier int
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration
// All variables below here must be protected by the lock
......@@ -66,6 +80,8 @@ type dontHaveTimeoutMgr struct {
wantQueue []*pendingWant
// time to wait for a response (depends on latency)
timeout time.Duration
// ewma of message latency (time from message sent to response received)
messageLatency *latencyEwma
// timer used to wait until want at front of queue expires
checkForTimeoutsTimer *time.Timer
}
......@@ -73,13 +89,18 @@ type dontHaveTimeoutMgr struct {
// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout,
latencyMultiplier, maxExpectedWantProcessTime)
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime)
}
// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration, latencyMultiplier int,
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr {
ctx, shutdown := context.WithCancel(context.Background())
......@@ -89,8 +110,11 @@ func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
latencyMultiplier: latencyMultiplier,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
}
......@@ -126,16 +150,36 @@ func (dhtm *dontHaveTimeoutMgr) Start() {
// calculate a reasonable timeout
latency := dhtm.peerConn.Latency()
if latency.Nanoseconds() > 0 {
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
return
}
// Otherwise measure latency by pinging the peer
go dhtm.measureLatency()
go dhtm.measurePingLatency()
}
// UpdateMessageLatency is called when we receive a response from the peer.
// It is the time between sending a request and receiving the corresponding
// response.
func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()
// Update the message latency and the timeout
dhtm.messageLatency.update(elapsed)
oldTimeout := dhtm.timeout
dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency()
// If the timeout has decreased
if dhtm.timeout < oldTimeout {
// Check if after changing the timeout there are any pending wants that
// are now over the timeout
dhtm.checkForTimeouts()
}
}
// measureLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measureLatency() {
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
defer cancel()
......@@ -154,8 +198,13 @@ func (dhtm *dontHaveTimeoutMgr) measureLatency() {
dhtm.lk.Lock()
defer dhtm.lk.Unlock()
// A message has arrived so we already set the timeout based on message latency
if dhtm.messageLatency.samples > 0 {
return
}
// Calculate a reasonable timeout based on latency
dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency)
dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency)
// Check if after changing the timeout there are any pending wants that are
// now over the timeout
......@@ -284,10 +333,43 @@ func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
dhtm.onDontHaveTimeout(pending)
}
// calculateTimeoutFromLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromLatency(latency time.Duration) time.Duration {
// calculateTimeoutFromPingLatency calculates a reasonable timeout derived from latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration {
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
return dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.latencyMultiplier)*latency
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}
// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
}
return timeout
}
// latencyEwma is an EWMA of message latency
type latencyEwma struct {
alpha float64
samples uint64
latency time.Duration
}
// update the EWMA with the given sample
func (le *latencyEwma) update(elapsed time.Duration) {
le.samples++
// Initially set alpha to be 1.0 / <the number of samples>
alpha := 1.0 / float64(le.samples)
if alpha < le.alpha {
// Once we have enough samples, clamp alpha
alpha = le.alpha
}
le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency))
}
......@@ -79,7 +79,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -102,7 +102,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
// At this stage first set of keys should have timed out
if tr.timedOutCount() != len(firstks) {
t.Fatal("expected timeout")
t.Fatal("expected timeout", tr.timedOutCount(), len(firstks))
}
// Clear the recorded timed out keys
......@@ -129,7 +129,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -160,7 +160,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -204,7 +204,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -222,6 +222,78 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
}
}
func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 40
latMultiplier := 1
expProcessTime := time.Duration(0)
msgLatencyMultiplier := 1
pc := &mockPeerConn{latency: latency}
tr := timeoutRecorder{}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
// Add keys
dhtm.AddPending(ks)
// expectedTimeout
// = expProcessTime + latency*time.Duration(latMultiplier)
// = 0 + 40ms * 1
// = 40ms
// Wait for less than the expected timeout
time.Sleep(25 * time.Millisecond)
// Receive two message latency updates
dhtm.UpdateMessageLatency(time.Millisecond * 20)
dhtm.UpdateMessageLatency(time.Millisecond * 10)
// alpha is 0.5 so timeout should be
// = (20ms * alpha) + (10ms * (1 - alpha))
// = (20ms * 0.5) + (10ms * 0.5)
// = 15ms
// We've already slept for 25ms so with the new 15ms timeout
// the keys should have timed out
// Give the queue some time to process the updates
time.Sleep(5 * time.Millisecond)
if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}
func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) {
ks := testutil.GenerateCids(2)
pc := &mockPeerConn{latency: time.Second} // ignored
tr := timeoutRecorder{}
msgLatencyMultiplier := 1
testMaxTimeout := time.Millisecond * 10
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
// Add keys
dhtm.AddPending(ks)
// Receive a message latency update that would make the timeout greater
// than the maximum timeout
dhtm.UpdateMessageLatency(testMaxTimeout * 4)
// Sleep until just after the maximum timeout
time.Sleep(testMaxTimeout + 5*time.Millisecond)
// Keys should have timed out
if tr.timedOutCount() != len(ks) {
t.Fatal("expected keys to timeout")
}
}
func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
ks := testutil.GenerateCids(2)
latency := time.Millisecond * 1
......@@ -233,7 +305,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -267,7 +339,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
pc := &mockPeerConn{latency: latency}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, latMultiplier, expProcessTime)
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......@@ -300,7 +372,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
pc := &mockPeerConn{latency: latency}
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, latMultiplier, expProcessTime)
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime)
dhtm.Start()
defer dhtm.Shutdown()
......
......@@ -64,6 +64,7 @@ type MessageQueue struct {
sendErrorBackoff time.Duration
outgoingWork chan time.Time
responses chan *Response
// Take lock whenever any of these variables are modified
wllock sync.Mutex
......@@ -88,12 +89,15 @@ type recallWantlist struct {
pending *bswl.Wantlist
// The list of wants that have been sent
sent *bswl.Wantlist
// The time at which each want was sent
sentAt map[cid.Cid]time.Time
}
func newRecallWantList() recallWantlist {
return recallWantlist{
pending: bswl.New(),
sent: bswl.New(),
sentAt: make(map[cid.Cid]time.Time),
}
}
......@@ -104,14 +108,18 @@ func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlis
// Remove wants from both the pending list and the list of sent wants
func (r *recallWantlist) Remove(c cid.Cid) {
r.sent.Remove(c)
r.pending.Remove(c)
r.sent.Remove(c)
delete(r.sentAt, c)
}
// Remove wants by type from both the pending list and the list of sent wants
func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) {
r.sent.RemoveType(c, wtype)
r.pending.RemoveType(c, wtype)
r.sent.RemoveType(c, wtype)
if _, ok := r.sent.Contains(c); !ok {
delete(r.sentAt, c)
}
}
// MarkSent moves the want from the pending to the sent list
......@@ -126,6 +134,16 @@ func (r *recallWantlist) MarkSent(e wantlist.Entry) bool {
return true
}
// SentAt records the time at which a want was sent
func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) {
// The want may have been cancelled in the interim
if _, ok := r.sent.Contains(c); ok {
if _, ok := r.sentAt[c]; !ok {
r.sentAt[c] = at
}
}
}
type peerConn struct {
p peer.ID
network MessageNetwork
......@@ -160,6 +178,15 @@ type DontHaveTimeoutManager interface {
AddPending([]cid.Cid)
// CancelPending removes the wants
CancelPending([]cid.Cid)
UpdateMessageLatency(time.Duration)
}
// Response from the peer
type Response struct {
// The time at which the response was received
at time.Time
// The blocks / HAVEs / DONT_HAVEs in the response
ks []cid.Cid
}
// New creates a new MessageQueue.
......@@ -177,7 +204,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {
ctx, cancel := context.WithCancel(ctx)
mq := &MessageQueue{
return &MessageQueue{
ctx: ctx,
shutdown: cancel,
p: p,
......@@ -188,6 +215,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan time.Time, 1),
responses: make(chan *Response, 8),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
......@@ -195,8 +223,6 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
}
return mq
}
// Add want-haves that are part of a broadcast to all connected peers
......@@ -291,6 +317,22 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
}
}
// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (mq *MessageQueue) ResponseReceived(at time.Time, ks []cid.Cid) {
if len(ks) == 0 {
return
}
// These messages are just used to approximate latency, so if we get so
// many responses that they get backed up, just ignore the overflow.
select {
case mq.responses <- &Response{at, ks}:
default:
}
}
// SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist
func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {
mq.rebroadcastIntervalLk.Lock()
......@@ -340,6 +382,7 @@ func (mq *MessageQueue) runQueue() {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case when := <-mq.outgoingWork:
// If we have work scheduled, cancel the timer. If we
// don't, record when the work was scheduled.
......@@ -362,11 +405,17 @@ func (mq *MessageQueue) runQueue() {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
}
case <-scheduleWork.C:
// We have work scheduled and haven't seen any updates
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
case res := <-mq.responses:
// We received a response from the peer, calculate latency
mq.handleResponse(res)
case <-mq.ctx.Done():
return
}
......@@ -431,7 +480,7 @@ func (mq *MessageQueue) sendMessage() {
mq.dhTimeoutMgr.Start()
// Convert want lists to a Bitswap Message
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)
......@@ -451,6 +500,9 @@ func (mq *MessageQueue) sendMessage() {
return
}
// Record sent time so as to calculate message latency
onSent()
// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)
......@@ -489,6 +541,34 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
mq.dhTimeoutMgr.AddPending(wants)
}
// handleResponse is called when a response is received from the peer
func (mq *MessageQueue) handleResponse(res *Response) {
now := time.Now()
earliest := time.Time{}
mq.wllock.Lock()
// Check if the keys in the response correspond to any request that was
// sent to the peer.
// Find the earliest request so as to calculate the longest latency as
// we want to be conservative when setting the timeout.
for _, c := range res.ks {
if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) {
earliest = at
}
if at, ok := mq.peerWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) {
earliest = at
}
}
mq.wllock.Unlock()
if !earliest.IsZero() {
// Inform the timeout manager of the calculated latency
mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest))
}
}
func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
// Save some CPU cycles and allocations if log level is higher than debug
if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil {
......@@ -547,7 +627,7 @@ func (mq *MessageQueue) pendingWorkCount() int {
}
// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
// Get broadcast and regular wantlist entries.
mq.wllock.Lock()
peerEntries := mq.peerWants.pending.Entries()
......@@ -641,16 +721,18 @@ FINISH:
// Finally, re-take the lock, mark sent and remove any entries from our
// message that we've decided to cancel at the last minute.
mq.wllock.Lock()
for _, e := range peerEntries[:sentPeerEntries] {
for i, e := range peerEntries[:sentPeerEntries] {
if !mq.peerWants.MarkSent(e) {
// It changed.
mq.msg.Remove(e.Cid)
peerEntries[i].Cid = cid.Undef
}
}
for _, e := range bcstEntries[:sentBcstEntries] {
for i, e := range bcstEntries[:sentBcstEntries] {
if !mq.bcstWants.MarkSent(e) {
mq.msg.Remove(e.Cid)
bcstEntries[i].Cid = cid.Undef
}
}
......@@ -663,7 +745,28 @@ FINISH:
}
mq.wllock.Unlock()
return mq.msg
// When the message has been sent, record the time at which each want was
// sent so we can calculate message latency
onSent := func() {
now := time.Now()
mq.wllock.Lock()
defer mq.wllock.Unlock()
for _, e := range peerEntries[:sentPeerEntries] {
if e.Cid.Defined() { // Check if want was cancelled in the interim
mq.peerWants.SentAt(e.Cid, now)
}
}
for _, e := range bcstEntries[:sentBcstEntries] {
if e.Cid.Defined() { // Check if want was cancelled in the interim
mq.bcstWants.SentAt(e.Cid, now)
}
}
}
return mq.msg, onSent
}
func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
......
......@@ -46,6 +46,7 @@ func (fms *fakeMessageNetwork) Ping(context.Context, peer.ID) ping.Result {
type fakeDontHaveTimeoutMgr struct {
lk sync.Mutex
ks []cid.Cid
latencyUpds []time.Duration
}
func (fp *fakeDontHaveTimeoutMgr) Start() {}
......@@ -73,6 +74,18 @@ func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
}
fp.ks = s.Keys()
}
func (fp *fakeDontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
fp.lk.Lock()
defer fp.lk.Unlock()
fp.latencyUpds = append(fp.latencyUpds, elapsed)
}
func (fp *fakeDontHaveTimeoutMgr) latencyUpdates() []time.Duration {
fp.lk.Lock()
defer fp.lk.Unlock()
return fp.latencyUpds
}
func (fp *fakeDontHaveTimeoutMgr) pendingCount() int {
fp.lk.Lock()
defer fp.lk.Unlock()
......@@ -587,6 +600,46 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) {
}
}
func TestResponseReceived(t *testing.T) {
ctx := context.Background()
messagesSent := make(chan []bsmsg.Entry)
resetChan := make(chan struct{}, 1)
fakeSender := newFakeMessageSender(resetChan, messagesSent, false)
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
peerID := testutil.GeneratePeers(1)[0]
dhtm := &fakeDontHaveTimeoutMgr{}
messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm)
messageQueue.Startup()
cids := testutil.GenerateCids(10)
// Add some wants and wait 10ms
messageQueue.AddWants(cids[:5], nil)
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Add some wants and wait another 10ms
messageQueue.AddWants(cids[5:8], nil)
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Receive a response for some of the wants from both groups
messageQueue.ResponseReceived(time.Now(), []cid.Cid{cids[0], cids[6], cids[9]})
// Wait a short time for processing
time.Sleep(10 * time.Millisecond)
// Check that message queue informs DHTM of received responses
upds := dhtm.latencyUpdates()
if len(upds) != 1 {
t.Fatal("expected one latency update")
}
// Elapsed time should be between when the first want was sent and the
// response received (about 20ms)
if upds[0] < 15*time.Millisecond || upds[0] > 25*time.Millisecond {
t.Fatal("expected latency to be time since oldest message sent")
}
}
func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) {
var wbs []cid.Cid
var whs []cid.Cid
......
......@@ -3,6 +3,7 @@ package peermanager
import (
"context"
"sync"
"time"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-metrics-interface"
......@@ -18,6 +19,7 @@ type PeerQueue interface {
AddBroadcastWantHaves([]cid.Cid)
AddWants([]cid.Cid, []cid.Cid)
AddCancels([]cid.Cid)
ResponseReceived(at time.Time, ks []cid.Cid)
Startup()
Shutdown()
}
......@@ -116,6 +118,19 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
pm.pwm.removePeer(p)
}
// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (pm *PeerManager) ResponseReceived(p peer.ID, at time.Time, ks []cid.Cid) {
pm.pqLk.Lock()
pq, ok := pm.peerQueues[p]
pm.pqLk.Unlock()
if ok {
pq.ResponseReceived(at, ks)
}
}
// BroadcastWantHaves broadcasts want-haves to all peers (used by the session
// to discover seeds).
// For each peer it filters out want-haves that have previously been sent to
......
......@@ -35,6 +35,8 @@ func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {
func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
fp.msgs <- msg{fp.p, nil, nil, cs}
}
func (fp *mockPeerQueue) ResponseReceived(at time.Time, ks []cid.Cid) {
}
type peerWants struct {
wantHaves []cid.Cid
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment