Commit 0d8b75d7 authored by hannahhoward's avatar hannahhoward

feat(sessions): record duplicate responses

send duplicate responses to the session peer manager to track latencies
parent 1bf9ed31
......@@ -333,9 +333,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
defer wg.Done()
bs.updateReceiveCounters(b)
bs.sm.UpdateReceiveCounters(b)
bs.sm.UpdateReceiveCounters(p, b)
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
// skip received blocks that are not in the wantlist
if !bs.wm.IsWanted(b.Cid()) {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), p)
......
......@@ -147,9 +147,9 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
// UpdateReceiveCounters updates receive counters for a block,
// which may be a duplicate and adjusts the split factor based on that.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
func (s *Session) UpdateReceiveCounters(from peer.ID, blk blocks.Block) {
select {
case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: true}:
case <-s.ctx.Done():
}
}
......@@ -308,7 +308,6 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}
func (s *Session) handleIdleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
......@@ -415,6 +414,9 @@ func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
ks := blk.blk.Cid()
if s.pastWants.Has(ks) {
s.srs.RecordDuplicateBlock()
if blk.from != "" {
s.pm.RecordPeerResponse(blk.from, ks)
}
}
}
......
......@@ -19,7 +19,7 @@ type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveBlockFrom(peer.ID, blocks.Block)
UpdateReceiveCounters(blocks.Block)
UpdateReceiveCounters(peer.ID, blocks.Block)
}
type sesTrk struct {
......@@ -128,11 +128,11 @@ func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
// UpdateReceiveCounters records the fact that a block was received, allowing
// sessions to track duplicates
func (sm *SessionManager) UpdateReceiveCounters(blk blocks.Block) {
func (sm *SessionManager) UpdateReceiveCounters(from peer.ID, blk blocks.Block) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for _, s := range sm.sessions {
s.session.UpdateReceiveCounters(blk)
s.session.UpdateReceiveCounters(from, blk)
}
}
......@@ -32,7 +32,7 @@ func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block,
}
func (fs *fakeSession) InterestedIn(cid.Cid) bool { return fs.interested }
func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
func (fs *fakeSession) UpdateReceiveCounters(blocks.Block) { fs.updateReceiveCounters = true }
func (fs *fakeSession) UpdateReceiveCounters(peer.ID, blocks.Block) { fs.updateReceiveCounters = true }
type fakePeerManager struct {
id uint64
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment