Commit 5c215f41 authored by Dirk McCormick's avatar Dirk McCormick

fix: simplify latency timing

parent 6763be87
......@@ -303,14 +303,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), time.Time{}, "", []blocks.Block{blk}, nil, nil)
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
}
// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
......@@ -355,7 +355,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from pee
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, at, combined)
bs.pm.ResponseReceived(from, combined)
}
// Send all block keys (including duplicates) to any sessions that want them.
......@@ -396,8 +396,6 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, at time.Time, from pee
// ReceiveMessage is called by the network interface when a new message is
// received.
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
now := time.Now()
bs.counterLk.Lock()
bs.counters.messagesRecvd++
bs.counterLk.Unlock()
......@@ -421,7 +419,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
// Process blocks
err := bs.receiveBlocksFrom(ctx, now, p, iblocks, haves, dontHaves)
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
if err != nil {
log.Warnf("ReceiveMessage recvBlockFrom error: %s", err)
return
......
......@@ -63,8 +63,11 @@ type MessageQueue struct {
maxMessageSize int
sendErrorBackoff time.Duration
// Signals that there are outgoing wants / cancels ready to be processed
outgoingWork chan time.Time
responses chan *Response
// Channel of CIDs of blocks / HAVEs / DONT_HAVEs received from the peer
responses chan []cid.Cid
// Take lock whenever any of these variables are modified
wllock sync.Mutex
......@@ -181,14 +184,6 @@ type DontHaveTimeoutManager interface {
UpdateMessageLatency(time.Duration)
}
// Response from the peer
type Response struct {
// The time at which the response was received
at time.Time
// The blocks / HAVEs / DONT_HAVEs in the response
ks []cid.Cid
}
// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue {
onTimeout := func(ks []cid.Cid) {
......@@ -215,7 +210,7 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan time.Time, 1),
responses: make(chan *Response, 8),
responses: make(chan []cid.Cid, 8),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
......@@ -320,7 +315,7 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) {
// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (mq *MessageQueue) ResponseReceived(at time.Time, ks []cid.Cid) {
func (mq *MessageQueue) ResponseReceived(ks []cid.Cid) {
if len(ks) == 0 {
return
}
......@@ -328,7 +323,7 @@ func (mq *MessageQueue) ResponseReceived(at time.Time, ks []cid.Cid) {
// These messages are just used to approximate latency, so if we get so
// many responses that they get backed up, just ignore the overflow.
select {
case mq.responses <- &Response{at, ks}:
case mq.responses <- ks:
default:
}
}
......@@ -541,8 +536,9 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
mq.dhTimeoutMgr.AddPending(wants)
}
// handleResponse is called when a response is received from the peer
func (mq *MessageQueue) handleResponse(res *Response) {
// handleResponse is called when a response is received from the peer,
// with the CIDs of received blocks / HAVEs / DONT_HAVEs
func (mq *MessageQueue) handleResponse(ks []cid.Cid) {
now := time.Now()
earliest := time.Time{}
......@@ -552,7 +548,7 @@ func (mq *MessageQueue) handleResponse(res *Response) {
// sent to the peer.
// Find the earliest request so as to calculate the longest latency as
// we want to be conservative when setting the timeout.
for _, c := range res.ks {
for _, c := range ks {
if at, ok := mq.bcstWants.sentAt[c]; ok && (earliest.IsZero() || at.Before(earliest)) {
earliest = at
}
......
......@@ -623,7 +623,7 @@ func TestResponseReceived(t *testing.T) {
collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
// Receive a response for some of the wants from both groups
messageQueue.ResponseReceived(time.Now(), []cid.Cid{cids[0], cids[6], cids[9]})
messageQueue.ResponseReceived([]cid.Cid{cids[0], cids[6], cids[9]})
// Wait a short time for processing
time.Sleep(10 * time.Millisecond)
......
......@@ -3,7 +3,6 @@ package peermanager
import (
"context"
"sync"
"time"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-metrics-interface"
......@@ -19,7 +18,7 @@ type PeerQueue interface {
AddBroadcastWantHaves([]cid.Cid)
AddWants([]cid.Cid, []cid.Cid)
AddCancels([]cid.Cid)
ResponseReceived(at time.Time, ks []cid.Cid)
ResponseReceived(ks []cid.Cid)
Startup()
Shutdown()
}
......@@ -121,13 +120,13 @@ func (pm *PeerManager) Disconnected(p peer.ID) {
// ResponseReceived is called when a message is received from the network.
// ks is the set of blocks, HAVEs and DONT_HAVEs in the message
// Note that this is just used to calculate latency.
func (pm *PeerManager) ResponseReceived(p peer.ID, at time.Time, ks []cid.Cid) {
func (pm *PeerManager) ResponseReceived(p peer.ID, ks []cid.Cid) {
pm.pqLk.Lock()
pq, ok := pm.peerQueues[p]
pm.pqLk.Unlock()
if ok {
pq.ResponseReceived(at, ks)
pq.ResponseReceived(ks)
}
}
......
......@@ -35,7 +35,7 @@ func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {
func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
fp.msgs <- msg{fp.p, nil, nil, cs}
}
func (fp *mockPeerQueue) ResponseReceived(at time.Time, ks []cid.Cid) {
func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {
}
type peerWants struct {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment