diff --git a/bitswap.go b/bitswap.go index db0ca0986de0cb7629dbc40129f5b045d37423d8..36b95cfd5528302a3ddac8638c3178c545e00b97 100644 --- a/bitswap.go +++ b/bitswap.go @@ -303,14 +303,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks // HasBlock announces the existence of a block to this bitswap service. The // service will potentially notify its peers. func (bs *Bitswap) HasBlock(blk blocks.Block) error { - return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil) + return bs.receiveBlocksFrom(context.Background(), time.Time{}, "", []blocks.Block{blk}, nil, nil) } // TODO: Some of this stuff really only needs to be done when adding a block // from the user, not when receiving it from the network. // In case you run `git blame` on this comment, I'll save you some time: ask // @whyrusleeping, I don't know the answers you seek. -func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { +func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error { select { case <-bs.process.Closing(): return errors.New("bitswap is closed") @@ -348,6 +348,16 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b allKs = append(allKs, b.Cid()) } + // If the message came from the network + if from != "" { + // Inform the PeerManager so that we can calculate per-peer latency + combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves)) + combined = append(combined, allKs...) + combined = append(combined, haves...) + combined = append(combined, dontHaves...) + bs.pm.ResponseReceived(from, at, combined) + } + // Send all block keys (including duplicates) to any sessions that want them. // (The duplicates are needed by sessions for accounting purposes) bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves) @@ -386,6 +396,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b // ReceiveMessage is called by the network interface when a new message is // received. 
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { + now := time.Now() + bs.counterLk.Lock() bs.counters.messagesRecvd++ bs.counterLk.Unlock() @@ -409,7 +421,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg dontHaves := incoming.DontHaves() if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 { // Process blocks - err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves) + err := bs.receiveBlocksFrom(ctx, now, p, iblocks, haves, dontHaves) if err != nil { log.Warnf("ReceiveMessage recvBlockFrom error: %s", err) return diff --git a/internal/messagequeue/donthavetimeoutmgr.go b/internal/messagequeue/donthavetimeoutmgr.go index e53b232e6d9d8a9d49f41db7eefc79e71d047361..14e70c077b1646b0891d02a9c5a19cc1b3dd37e9 100644 --- a/internal/messagequeue/donthavetimeoutmgr.go +++ b/internal/messagequeue/donthavetimeoutmgr.go @@ -21,10 +21,20 @@ const ( // peer takes to process a want and initiate sending a response to us maxExpectedWantProcessTime = 2 * time.Second - // latencyMultiplier is multiplied by the average ping time to + // maxTimeout is the maximum allowed timeout, regardless of latency + maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime + + // pingLatencyMultiplier is multiplied by the average ping time to // get an upper bound on how long we expect to wait for a peer's response // to arrive - latencyMultiplier = 3 + pingLatencyMultiplier = 3 + + // messageLatencyAlpha is the alpha supplied to the message latency EWMA + messageLatencyAlpha = 0.5 + + // To give a margin for error, the timeout is calculated as + // messageLatencyMultiplier * message latency + messageLatencyMultiplier = 2 ) // PeerConnection is a connection to a peer that can be pinged, and the @@ -44,16 +54,20 @@ type pendingWant struct { sent time.Time } -// dontHaveTimeoutMgr pings the peer to measure latency. It uses the latency to -// set a reasonable timeout for simulating a DONT_HAVE message for peers that -// don't support DONT_HAVE or that take to long to respond. +// dontHaveTimeoutMgr simulates a DONT_HAVE message if the peer takes too long +// to respond to a message. +// The timeout is based on latency - we start with a default latency, while +// we ping the peer to estimate latency. If we receive a response from the +// peer we use the response latency. 
type dontHaveTimeoutMgr struct { ctx context.Context shutdown func() peerConn PeerConnection onDontHaveTimeout func([]cid.Cid) defaultTimeout time.Duration - latencyMultiplier int + maxTimeout time.Duration + pingLatencyMultiplier int + messageLatencyMultiplier int maxExpectedWantProcessTime time.Duration // All variables below here must be protected by the lock @@ -66,6 +80,8 @@ type dontHaveTimeoutMgr struct { wantQueue []*pendingWant // time to wait for a response (depends on latency) timeout time.Duration + // ewma of message latency (time from message sent to response received) + messageLatency *latencyEwma // timer used to wait until want at front of queue expires checkForTimeoutsTimer *time.Timer } @@ -73,13 +89,18 @@ type dontHaveTimeoutMgr struct { // newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr // onDontHaveTimeout is called when pending keys expire (not cancelled before timeout) func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid)) *dontHaveTimeoutMgr { - return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, - latencyMultiplier, maxExpectedWantProcessTime) + return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout, + pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime) } // newDontHaveTimeoutMgrWithParams is used by the tests -func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), - defaultTimeout time.Duration, latencyMultiplier int, +func newDontHaveTimeoutMgrWithParams( + pc PeerConnection, + onDontHaveTimeout func([]cid.Cid), + defaultTimeout time.Duration, + maxTimeout time.Duration, + pingLatencyMultiplier int, + messageLatencyMultiplier int, maxExpectedWantProcessTime time.Duration) *dontHaveTimeoutMgr { ctx, shutdown := context.WithCancel(context.Background()) @@ -89,8 +110,11 @@ func newDontHaveTimeoutMgrWithParams(pc PeerConnection, onDontHaveTimeout func([ peerConn: pc, activeWants: make(map[cid.Cid]*pendingWant), timeout: defaultTimeout, + messageLatency: &latencyEwma{alpha: messageLatencyAlpha}, defaultTimeout: defaultTimeout, - latencyMultiplier: latencyMultiplier, + maxTimeout: maxTimeout, + pingLatencyMultiplier: pingLatencyMultiplier, + messageLatencyMultiplier: messageLatencyMultiplier, maxExpectedWantProcessTime: maxExpectedWantProcessTime, onDontHaveTimeout: onDontHaveTimeout, } @@ -126,16 +150,36 @@ func (dhtm *dontHaveTimeoutMgr) Start() { // calculate a reasonable timeout latency := dhtm.peerConn.Latency() if latency.Nanoseconds() > 0 { - dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency) + dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency) return } // Otherwise measure latency by pinging the peer - go dhtm.measureLatency() + go dhtm.measurePingLatency() +} + +// UpdateMessageLatency is called when we receive a response from the peer. +// It is the time between sending a request and receiving the corresponding +// response. 
+func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) { + dhtm.lk.Lock() + defer dhtm.lk.Unlock() + + // Update the message latency and the timeout + dhtm.messageLatency.update(elapsed) + oldTimeout := dhtm.timeout + dhtm.timeout = dhtm.calculateTimeoutFromMessageLatency() + + // If the timeout has decreased + if dhtm.timeout < oldTimeout { + // Check if after changing the timeout there are any pending wants that + // are now over the timeout + dhtm.checkForTimeouts() + } } -// measureLatency measures the latency to the peer by pinging it -func (dhtm *dontHaveTimeoutMgr) measureLatency() { +// measurePingLatency measures the latency to the peer by pinging it +func (dhtm *dontHaveTimeoutMgr) measurePingLatency() { // Wait up to defaultTimeout for a response to the ping ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout) defer cancel() @@ -154,8 +198,13 @@ func (dhtm *dontHaveTimeoutMgr) measureLatency() { dhtm.lk.Lock() defer dhtm.lk.Unlock() + // A message has arrived so we already set the timeout based on message latency + if dhtm.messageLatency.samples > 0 { + return + } + // Calculate a reasonable timeout based on latency - dhtm.timeout = dhtm.calculateTimeoutFromLatency(latency) + dhtm.timeout = dhtm.calculateTimeoutFromPingLatency(latency) // Check if after changing the timeout there are any pending wants that are // now over the timeout @@ -284,10 +333,43 @@ func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) { dhtm.onDontHaveTimeout(pending) } -// calculateTimeoutFromLatency calculates a reasonable timeout derived from latency -func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromLatency(latency time.Duration) time.Duration { +// calculateTimeoutFromPingLatency calculates a reasonable timeout derived from latency +func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Duration) time.Duration { // The maximum expected time for a response is // the expected time to process the want + (latency * multiplier) // The multiplier is to provide some padding for variable latency. 
- return dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.latencyMultiplier)*latency + timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency + if timeout > dhtm.maxTimeout { + timeout = dhtm.maxTimeout + } + return timeout +} + +// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency +func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration { + timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier) + if timeout > dhtm.maxTimeout { + timeout = dhtm.maxTimeout + } + return timeout +} + +// latencyEwma is an EWMA of message latency +type latencyEwma struct { + alpha float64 + samples uint64 + latency time.Duration +} + +// update the EWMA with the given sample +func (le *latencyEwma) update(elapsed time.Duration) { + le.samples++ + + // Initially set alpha to be 1.0 / (the number of samples) + alpha := 1.0 / float64(le.samples) + if alpha < le.alpha { + // Once we have enough samples, clamp alpha + alpha = le.alpha + } + le.latency = time.Duration(float64(elapsed)*alpha + (1-alpha)*float64(le.latency)) } diff --git a/internal/messagequeue/donthavetimeoutmgr_test.go b/internal/messagequeue/donthavetimeoutmgr_test.go index 03ceb4816631286f6fd6e3cafc5da3e9eaf21e8c..6f315fea9b933120f325a1bb32d13fbe572f736e 100644 --- a/internal/messagequeue/donthavetimeoutmgr_test.go +++ b/internal/messagequeue/donthavetimeoutmgr_test.go @@ -79,7 +79,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { tr := timeoutRecorder{} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, latMultiplier, expProcessTime) + dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -102,7 +102,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { // At this stage first set of keys should have timed out if tr.timedOutCount() != len(firstks) { - t.Fatal("expected timeout") + t.Fatal("expected timeout", tr.timedOutCount(), len(firstks)) } // Clear the recorded timed out keys @@ -129,7 +129,7 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) { tr := timeoutRecorder{} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, latMultiplier, expProcessTime) + dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -160,7 +160,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) { tr := timeoutRecorder{} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, latMultiplier, expProcessTime) + dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -204,7 +204,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { tr := timeoutRecorder{} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, latMultiplier, expProcessTime) + dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -222,6 +222,78 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { } } +func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { + ks := testutil.GenerateCids(2) + latency := time.Millisecond * 40 + latMultiplier := 1 + expProcessTime := time.Duration(0) + msgLatencyMultiplier := 1 + pc := &mockPeerConn{latency: latency} + tr := timeoutRecorder{} + + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, + dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier,
expProcessTime) + dhtm.Start() + defer dhtm.Shutdown() + + // Add keys + dhtm.AddPending(ks) + + // expectedTimeout + // = expProcessTime + latency*time.Duration(latMultiplier) + // = 0 + 40ms * 1 + // = 40ms + + // Wait for less than the expected timeout + time.Sleep(25 * time.Millisecond) + + // Receive two message latency updates + dhtm.UpdateMessageLatency(time.Millisecond * 20) + dhtm.UpdateMessageLatency(time.Millisecond * 10) + + // alpha is 0.5 so timeout should be + // = (20ms * alpha) + (10ms * (1 - alpha)) + // = (20ms * 0.5) + (10ms * 0.5) + // = 15ms + // We've already slept for 25ms so with the new 15ms timeout + // the keys should have timed out + + // Give the queue some time to process the updates + time.Sleep(5 * time.Millisecond) + + if tr.timedOutCount() != len(ks) { + t.Fatal("expected keys to timeout") + } +} + +func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { + ks := testutil.GenerateCids(2) + pc := &mockPeerConn{latency: time.Second} // ignored + tr := timeoutRecorder{} + msgLatencyMultiplier := 1 + testMaxTimeout := time.Millisecond * 10 + + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, + dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime) + dhtm.Start() + defer dhtm.Shutdown() + + // Add keys + dhtm.AddPending(ks) + + // Receive a message latency update that would make the timeout greater + // than the maximum timeout + dhtm.UpdateMessageLatency(testMaxTimeout * 4) + + // Sleep until just after the maximum timeout + time.Sleep(testMaxTimeout + 5*time.Millisecond) + + // Keys should have timed out + if tr.timedOutCount() != len(ks) { + t.Fatal("expected keys to timeout") + } +} + func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { ks := testutil.GenerateCids(2) latency := time.Millisecond * 1 @@ -233,7 +305,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { pc := &mockPeerConn{latency: latency, err: fmt.Errorf("ping error")} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, latMultiplier, expProcessTime) + defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -267,7 +339,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { pc := &mockPeerConn{latency: latency} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, latMultiplier, expProcessTime) + defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() @@ -300,7 +372,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { pc := &mockPeerConn{latency: latency} dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, latMultiplier, expProcessTime) + dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime) dhtm.Start() defer dhtm.Shutdown() diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 755df08a7183e0bb520bb8906868064783658bbe..9db2a86281e2dbf7ec208509cc2234ab118eceec 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -64,6 +64,7 @@ type MessageQueue struct { sendErrorBackoff time.Duration outgoingWork chan time.Time + responses chan *Response // Take lock whenever any of these variables are modified wllock sync.Mutex @@ -88,12 +89,15 @@ type recallWantlist struct { pending *bswl.Wantlist // The list of wants that have been 
sent sent *bswl.Wantlist + // The time at which each want was sent + sentAt map[cid.Cid]time.Time } func newRecallWantList() recallWantlist { return recallWantlist{ pending: bswl.New(), sent: bswl.New(), + sentAt: make(map[cid.Cid]time.Time), } } @@ -104,14 +108,18 @@ func (r *recallWantlist) Add(c cid.Cid, priority int32, wtype pb.Message_Wantlis // Remove wants from both the pending list and the list of sent wants func (r *recallWantlist) Remove(c cid.Cid) { - r.sent.Remove(c) r.pending.Remove(c) + r.sent.Remove(c) + delete(r.sentAt, c) } // Remove wants by type from both the pending list and the list of sent wants func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantType) { - r.sent.RemoveType(c, wtype) r.pending.RemoveType(c, wtype) + r.sent.RemoveType(c, wtype) + if _, ok := r.sent.Contains(c); !ok { + delete(r.sentAt, c) + } } // MarkSent moves the want from the pending to the sent list @@ -126,6 +134,16 @@ func (r *recallWantlist) MarkSent(e wantlist.Entry) bool { return true } +// SentAt records the time at which a want was sent +func (r *recallWantlist) SentAt(c cid.Cid, at time.Time) { + // The want may have been cancelled in the interim + if _, ok := r.sent.Contains(c); ok { + if _, ok := r.sentAt[c]; !ok { + r.sentAt[c] = at + } + } +} + type peerConn struct { p peer.ID network MessageNetwork @@ -160,6 +178,15 @@ type DontHaveTimeoutManager interface { AddPending([]cid.Cid) // CancelPending removes the wants CancelPending([]cid.Cid) + UpdateMessageLatency(time.Duration) +} + +// Response from the peer +type Response struct { + // The time at which the response was received + at time.Time + // The blocks / HAVEs / DONT_HAVEs in the response + ks []cid.Cid } // New creates a new MessageQueue. @@ -177,7 +204,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue { ctx, cancel := context.WithCancel(ctx) - mq := &MessageQueue{ + return &MessageQueue{ ctx: ctx, shutdown: cancel, p: p, @@ -188,6 +215,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, peerWants: newRecallWantList(), cancels: cid.NewSet(), outgoingWork: make(chan time.Time, 1), + responses: make(chan *Response, 8), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, priority: maxPriority, @@ -195,8 +223,6 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, // after using it, instead of creating a new one every time. msg: bsmsg.New(false), } - - return mq } // Add want-haves that are part of a broadcast to all connected peers @@ -291,6 +317,22 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) { } } +// ResponseReceived is called when a message is received from the network. +// ks is the set of blocks, HAVEs and DONT_HAVEs in the message +// Note that this is just used to calculate latency. +func (mq *MessageQueue) ResponseReceived(at time.Time, ks []cid.Cid) { + if len(ks) == 0 { + return + } + + // These messages are just used to approximate latency, so if we get so + // many responses that they get backed up, just ignore the overflow. 
+ select { + case mq.responses <- &Response{at, ks}: + default: + } +} + // SetRebroadcastInterval sets a new interval on which to rebroadcast the full wantlist func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) { mq.rebroadcastIntervalLk.Lock() @@ -340,6 +382,7 @@ func (mq *MessageQueue) runQueue() { select { case <-mq.rebroadcastTimer.C: mq.rebroadcastWantlist() + case when := <-mq.outgoingWork: // If we have work scheduled, cancel the timer. If we // don't, record when the work was scheduled. @@ -362,11 +405,17 @@ func (mq *MessageQueue) runQueue() { // Otherwise, extend the timer. scheduleWork.Reset(sendMessageDebounce) } + case <-scheduleWork.C: // We have work scheduled and haven't seen any updates // in sendMessageDebounce. Send immediately. workScheduled = time.Time{} mq.sendIfReady() + + case res := <-mq.responses: + // We received a response from the peer, calculate latency + mq.handleResponse(res) + case <-mq.ctx.Done(): return } @@ -431,7 +480,7 @@ func (mq *MessageQueue) sendMessage() { mq.dhTimeoutMgr.Start() // Convert want lists to a Bitswap Message - message := mq.extractOutgoingMessage(mq.sender.SupportsHave()) + message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave()) // After processing the message, clear out its fields to save memory defer mq.msg.Reset(false) @@ -451,6 +500,9 @@ func (mq *MessageQueue) sendMessage() { return } + // Record sent time so as to calculate message latency + onSent() + // Set a timer to wait for responses mq.simulateDontHaveWithTimeout(wantlist) @@ -489,6 +541,34 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { mq.dhTimeoutMgr.AddPending(wants) } +// handleResponse is called when a response is received from the peer +func (mq *MessageQueue) handleResponse(res *Response) { + now := time.Now() + earliest := time.Time{} + + mq.wllock.Lock() + + // Check if the keys in the response correspond to any request that was + // sent to the peer. + // Find the earliest request so as to calculate the longest latency as + // we want to be conservative when setting the timeout. + for _, c := range res.ks { + if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { + earliest = at + } + if at, ok := mq.peerWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) { + earliest = at + } + } + + mq.wllock.Unlock() + + if !earliest.IsZero() { + // Inform the timeout manager of the calculated latency + mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest)) + } +} + func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) { // Save some CPU cycles and allocations if log level is higher than debug if ce := sflog.Check(zap.DebugLevel, "sent message"); ce == nil { @@ -547,7 +627,7 @@ func (mq *MessageQueue) pendingWorkCount() int { } // Convert the lists of wants into a Bitswap message -func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage { +func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) { // Get broadcast and regular wantlist entries. mq.wllock.Lock() peerEntries := mq.peerWants.pending.Entries() @@ -641,16 +721,18 @@ FINISH: // Finally, re-take the lock, mark sent and remove any entries from our // message that we've decided to cancel at the last minute. mq.wllock.Lock() - for _, e := range peerEntries[:sentPeerEntries] { + for i, e := range peerEntries[:sentPeerEntries] { if !mq.peerWants.MarkSent(e) { // It changed. 
mq.msg.Remove(e.Cid) + peerEntries[i].Cid = cid.Undef } } - for _, e := range bcstEntries[:sentBcstEntries] { + for i, e := range bcstEntries[:sentBcstEntries] { if !mq.bcstWants.MarkSent(e) { mq.msg.Remove(e.Cid) + bcstEntries[i].Cid = cid.Undef } } @@ -663,7 +745,28 @@ FINISH: } mq.wllock.Unlock() - return mq.msg + // When the message has been sent, record the time at which each want was + // sent so we can calculate message latency + onSent := func() { + now := time.Now() + + mq.wllock.Lock() + defer mq.wllock.Unlock() + + for _, e := range peerEntries[:sentPeerEntries] { + if e.Cid.Defined() { // Check if want was cancelled in the interim + mq.peerWants.SentAt(e.Cid, now) + } + } + + for _, e := range bcstEntries[:sentBcstEntries] { + if e.Cid.Defined() { // Check if want was cancelled in the interim + mq.bcstWants.SentAt(e.Cid, now) + } + } + } + + return mq.msg, onSent } func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) { diff --git a/internal/messagequeue/messagequeue_test.go b/internal/messagequeue/messagequeue_test.go index 344da41a58c3e53afa3cf05690fe02c8770dfaee..32a7242c2fcc12b4bd4cb9beaabaa98da071dd9a 100644 --- a/internal/messagequeue/messagequeue_test.go +++ b/internal/messagequeue/messagequeue_test.go @@ -44,8 +44,9 @@ func (fms *fakeMessageNetwork) Ping(context.Context, peer.ID) ping.Result { } type fakeDontHaveTimeoutMgr struct { - lk sync.Mutex - ks []cid.Cid + lk sync.Mutex + ks []cid.Cid + latencyUpds []time.Duration } func (fp *fakeDontHaveTimeoutMgr) Start() {} @@ -73,6 +74,18 @@ func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) { } fp.ks = s.Keys() } +func (fp *fakeDontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) { + fp.lk.Lock() + defer fp.lk.Unlock() + + fp.latencyUpds = append(fp.latencyUpds, elapsed) +} +func (fp *fakeDontHaveTimeoutMgr) latencyUpdates() []time.Duration { + fp.lk.Lock() + defer fp.lk.Unlock() + + return fp.latencyUpds +} func (fp *fakeDontHaveTimeoutMgr) pendingCount() int { fp.lk.Lock() defer fp.lk.Unlock() @@ -587,6 +600,46 @@ func TestSendToPeerThatDoesntSupportHaveMonitorsTimeouts(t *testing.T) { } } +func TestResponseReceived(t *testing.T) { + ctx := context.Background() + messagesSent := make(chan []bsmsg.Entry) + resetChan := make(chan struct{}, 1) + fakeSender := newFakeMessageSender(resetChan, messagesSent, false) + fakenet := &fakeMessageNetwork{nil, nil, fakeSender} + peerID := testutil.GeneratePeers(1)[0] + + dhtm := &fakeDontHaveTimeoutMgr{} + messageQueue := newMessageQueue(ctx, peerID, fakenet, maxMessageSize, sendErrorBackoff, dhtm) + messageQueue.Startup() + + cids := testutil.GenerateCids(10) + + // Add some wants and wait 10ms + messageQueue.AddWants(cids[:5], nil) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Add some wants and wait another 10ms + messageQueue.AddWants(cids[5:8], nil) + collectMessages(ctx, t, messagesSent, 10*time.Millisecond) + + // Receive a response for some of the wants from both groups + messageQueue.ResponseReceived(time.Now(), []cid.Cid{cids[0], cids[6], cids[9]}) + + // Wait a short time for processing + time.Sleep(10 * time.Millisecond) + + // Check that message queue informs DHTM of received responses + upds := dhtm.latencyUpdates() + if len(upds) != 1 { + t.Fatal("expected one latency update") + } + // Elapsed time should be between when the first want was sent and the + // response received (about 20ms) + if upds[0] < 15*time.Millisecond || upds[0] > 25*time.Millisecond { + t.Fatal("expected latency to be time since oldest 
message sent") + } +} + func filterWantTypes(wantlist []bsmsg.Entry) ([]cid.Cid, []cid.Cid, []cid.Cid) { var wbs []cid.Cid var whs []cid.Cid diff --git a/internal/peermanager/peermanager.go b/internal/peermanager/peermanager.go index 5228232638f39d681d7453efe675063113e8bbd0..aa40727b22728336bfe1469cb35307cce65a8569 100644 --- a/internal/peermanager/peermanager.go +++ b/internal/peermanager/peermanager.go @@ -3,6 +3,7 @@ package peermanager import ( "context" "sync" + "time" logging "github.com/ipfs/go-log" "github.com/ipfs/go-metrics-interface" @@ -18,6 +19,7 @@ type PeerQueue interface { AddBroadcastWantHaves([]cid.Cid) AddWants([]cid.Cid, []cid.Cid) AddCancels([]cid.Cid) + ResponseReceived(at time.Time, ks []cid.Cid) Startup() Shutdown() } @@ -116,6 +118,19 @@ func (pm *PeerManager) Disconnected(p peer.ID) { pm.pwm.removePeer(p) } +// ResponseReceived is called when a message is received from the network. +// ks is the set of blocks, HAVEs and DONT_HAVEs in the message +// Note that this is just used to calculate latency. +func (pm *PeerManager) ResponseReceived(p peer.ID, at time.Time, ks []cid.Cid) { + pm.pqLk.Lock() + pq, ok := pm.peerQueues[p] + pm.pqLk.Unlock() + + if ok { + pq.ResponseReceived(at, ks) + } +} + // BroadcastWantHaves broadcasts want-haves to all peers (used by the session // to discover seeds). // For each peer it filters out want-haves that have previously been sent to diff --git a/internal/peermanager/peermanager_test.go b/internal/peermanager/peermanager_test.go index 469aa4d199a2000fb613d55ab9a5daa2130d578c..d5d348fe63626122d287251f7d11ec53d3f6f12f 100644 --- a/internal/peermanager/peermanager_test.go +++ b/internal/peermanager/peermanager_test.go @@ -35,6 +35,8 @@ func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) { func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) { fp.msgs <- msg{fp.p, nil, nil, cs} } +func (fp *mockPeerQueue) ResponseReceived(at time.Time, ks []cid.Cid) { +} type peerWants struct { wantHaves []cid.Cid