Commit 1bf9ed31 authored by hannahhoward

feat(sessionpeermanager): track cancels

Better estimate latency per peer by tracking cancellations
parent 8e59a716
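
In outline: when the session receives a block, it now calls RecordCancel for that CID; the SessionPeerManager marks the matching in-flight request as cancelled in each peer's latency tracker; and a request that times out after being cancelled is no longer recorded as a slow response against the peer. Below is a minimal, self-contained sketch of that mechanism, with invented names and string keys standing in for cid.Cid; it illustrates the idea and is not the library's actual API.

package main

import (
	"fmt"
	"time"
)

// requestData mirrors the shape this commit gives the tracker's entries:
// alongside the start time and timeout timer, it now remembers whether
// the request was cancelled.
type requestData struct {
	startedAt    time.Time
	wasCancelled bool
	timeout      *time.Timer
}

type tracker struct {
	requests map[string]*requestData
}

// setup starts timing a request and arms a timeout, as SetupRequests does.
func (t *tracker) setup(key string, d time.Duration, onTimeout func(string)) {
	t.requests[key] = &requestData{
		startedAt:    time.Now(),
		wasCancelled: false,
		timeout:      time.AfterFunc(d, func() { onTimeout(key) }),
	}
}

// recordCancel flags the request so a later timeout is ignored.
func (t *tracker) recordCancel(key string) {
	if r, ok := t.requests[key]; ok {
		r.wasCancelled = true
	}
}

func main() {
	t := &tracker{requests: make(map[string]*requestData)}
	timedOut := make(chan string, 1)

	// The session requested a block, then received it from elsewhere and
	// sent cancels, so this request should not be held against the peer.
	t.setup("block-A", 20*time.Millisecond, func(k string) { timedOut <- k })
	t.recordCancel("block-A")

	select {
	case k := <-timedOut:
		if t.requests[k].wasCancelled {
			fmt.Println("timeout ignored: request was cancelled")
		} else {
			fmt.Println("timeout would count as a slow response")
		}
	case <-time.After(100 * time.Millisecond):
	}
}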
@@ -15,7 +15,6 @@ import (
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"
)
const (
@@ -37,6 +36,7 @@ type PeerManager interface {
GetOptimizedPeers() []bssd.OptimizedPeer
RecordPeerRequests([]peer.ID, []cid.Cid)
RecordPeerResponse(peer.ID, cid.Cid)
RecordCancel(cid.Cid)
}
// RequestSplitter provides an interface for splitting
@@ -141,8 +141,8 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
case <-s.ctx.Done():
}
ks := []cid.Cid{blk.Cid()}
s.pm.RecordCancel(blk.Cid())
s.wm.CancelWants(s.ctx, ks, nil, s.id)
}
// UpdateReceiveCounters updates receive counters for a block,
......
@@ -68,6 +68,7 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.peers = append(fpm.peers, p)
fpm.lk.Unlock()
}
func (fpm *fakePeerManager) RecordCancel(c cid.Cid) {}
type fakeRequestSplitter struct {
}
......
@@ -42,6 +42,7 @@ func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid) {}
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid) {}
func (*fakePeerManager) RecordCancel(c cid.Cid) {}
type fakeRequestSplitter struct {
}
......
@@ -6,12 +6,9 @@ import (
"github.com/ipfs/go-cid"
)
const (
timeoutDuration = 5 * time.Second
)
type requestData struct {
startedAt time.Time
wasCancelled bool
timeoutFunc *time.Timer
}
@@ -25,11 +22,15 @@ func newLatencyTracker() *latencyTracker {
type afterTimeoutFunc func(cid.Cid)
func (lt *latencyTracker) SetupRequests(keys []cid.Cid, afterTimeout afterTimeoutFunc) {
func (lt *latencyTracker) SetupRequests(keys []cid.Cid, timeoutDuration time.Duration, afterTimeout afterTimeoutFunc) {
startedAt := time.Now()
for _, k := range keys {
if _, ok := lt.requests[k]; !ok {
lt.requests[k] = &requestData{startedAt, time.AfterFunc(timeoutDuration, makeAfterTimeout(afterTimeout, k))}
lt.requests[k] = &requestData{
startedAt,
false,
time.AfterFunc(timeoutDuration, makeAfterTimeout(afterTimeout, k)),
}
}
}
}
@@ -47,15 +48,24 @@ func (lt *latencyTracker) CheckDuration(key cid.Cid) (time.Duration, bool) {
return latency, ok
}
func (lt *latencyTracker) RecordResponse(key cid.Cid) (time.Duration, bool) {
func (lt *latencyTracker) RemoveRequest(key cid.Cid) {
request, ok := lt.requests[key]
var latency time.Duration
if ok {
latency = time.Now().Sub(request.startedAt)
request.timeoutFunc.Stop()
delete(lt.requests, key)
}
return latency, ok
}
func (lt *latencyTracker) RecordCancel(key cid.Cid) {
request, ok := lt.requests[key]
if ok {
request.wasCancelled = true
}
}
func (lt *latencyTracker) WasCancelled(key cid.Cid) bool {
request, ok := lt.requests[key]
return ok && request.wasCancelled
}
func (lt *latencyTracker) Shutdown() {
......
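
A note on the timer handling above: RemoveRequest calls request.timeoutFunc.Stop(), relying on the standard library contract that stopping a time.Timer created by time.AfterFunc before it fires prevents the callback from ever running. A tiny stand-alone illustration of that contract (not code from this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	fired := make(chan struct{}, 1)
	timer := time.AfterFunc(50*time.Millisecond, func() {
		fired <- struct{}{}
	})

	// A response arrived in time, so retire the request the way
	// RemoveRequest does. Stop reports whether it prevented the
	// callback from running.
	if timer.Stop() {
		fmt.Println("timeout callback suppressed")
	}

	select {
	case <-fired:
		fmt.Println("timeout fired anyway")
	case <-time.After(100 * time.Millisecond):
		fmt.Println("no spurious timeout recorded")
	}
}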
@@ -25,8 +25,8 @@ func newPeerData() *peerData {
}
func (pd *peerData) AdjustLatency(k cid.Cid, hasFallbackLatency bool, fallbackLatency time.Duration) {
latency, hasLatency := pd.lt.RecordResponse(k)
latency, hasLatency := pd.lt.CheckDuration(k)
pd.lt.RemoveRequest(k)
if !hasLatency {
latency, hasLatency = fallbackLatency, hasFallbackLatency
}
......
@@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"sort"
"time"
bssd "github.com/ipfs/go-bitswap/sessiondata"
@@ -13,6 +14,7 @@ import (
)
const (
defaultTimeoutDuration = 5 * time.Second
maxOptimizedPeers = 32
unoptimizedTagValue = 5 // tag value for "unoptimized" session peers.
optimizedTagValue = 10 // tag value for "optimized" session peers.
@@ -49,6 +51,7 @@ type SessionPeerManager struct {
unoptimizedPeersArr []peer.ID
optimizedPeersArr []peer.ID
broadcastLatency *latencyTracker
timeoutDuration time.Duration
}
// New creates a new SessionPeerManager
@@ -60,6 +63,7 @@ func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerP
peerMessages: make(chan peerMessage, 16),
activePeers: make(map[peer.ID]*peerData),
broadcastLatency: newLatencyTracker(),
timeoutDuration: defaultTimeoutDuration,
}
spm.tag = fmt.Sprint("bs-ses-", id)
@@ -72,18 +76,25 @@ func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerP
// the list of peers if it wasn't already added
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
select {
case spm.peerMessages <- &peerResponseMessage{p, k}:
case <-spm.ctx.Done():
}
}
// RecordCancel records the fact that cancellations were sent to peers,
// so that if no blocks come in, the timeout will not count against those peers
func (spm *SessionPeerManager) RecordCancel(k cid.Cid) {
// at the moment, we're just adding peers here
// in the future, we'll actually use this to record metrics
select {
case spm.peerMessages <- &peerResponseMessage{p, k}:
case spm.peerMessages <- &cancelMessage{k}:
case <-spm.ctx.Done():
}
}
// RecordPeerRequests records that a given set of peers requested the given cids.
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
// at the moment, we're not doing anything here
// soon we'll use this to track latency by peer
select {
case spm.peerMessages <- &peerRequestMessage{p, ks}:
case <-spm.ctx.Done():
@@ -125,6 +136,15 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
}(c)
}
// SetTimeoutDuration changes the length of time used to time out the recording
// of requests
func (spm *SessionPeerManager) SetTimeoutDuration(timeoutDuration time.Duration) {
select {
case spm.peerMessages <- &setTimeoutMessage{timeoutDuration}:
case <-spm.ctx.Done():
}
}
func (spm *SessionPeerManager) run(ctx context.Context) {
for {
select {
@@ -137,7 +157,13 @@ func (spm *SessionPeerManager) run(ctx context.Context) {
}
}
func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
func (spm *SessionPeerManager) tagPeer(p peer.ID, data *peerData) {
var value int
if data.hasLatency {
value = optimizedTagValue
} else {
value = unoptimizedTagValue
}
spm.tagger.TagPeer(p, spm.tag, value)
}
@@ -172,6 +198,27 @@ func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
}
}
func (spm *SessionPeerManager) recordResponse(p peer.ID, k cid.Cid) {
data, ok := spm.activePeers[p]
wasOptimized := ok && data.hasLatency
if wasOptimized {
spm.removeOptimizedPeer(p)
} else {
if ok {
spm.removeUnoptimizedPeer(p)
} else {
data = newPeerData()
spm.activePeers[p] = data
}
}
fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
if !ok || wasOptimized != data.hasLatency {
spm.tagPeer(p, data)
}
spm.insertPeer(p, data)
}
type peerFoundMessage struct {
p peer.ID
}
@@ -181,7 +228,7 @@ func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = newPeerData()
spm.insertPeer(p, spm.activePeers[p])
spm.tagPeer(p, unoptimizedTagValue)
spm.tagPeer(p, spm.activePeers[p])
}
}
@@ -191,32 +238,7 @@ type peerResponseMessage struct {
}
func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
p := prm.p
k := prm.k
data, ok := spm.activePeers[p]
wasOptimized := ok && data.hasLatency
if wasOptimized {
spm.removeOptimizedPeer(p)
} else {
if ok {
spm.removeUnoptimizedPeer(p)
} else {
data = newPeerData()
spm.activePeers[p] = data
}
}
fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
var tagValue int
if data.hasLatency {
tagValue = optimizedTagValue
} else {
tagValue = unoptimizedTagValue
}
if !ok || wasOptimized != data.hasLatency {
spm.tagPeer(p, tagValue)
}
spm.insertPeer(p, data)
spm.recordResponse(prm.p, prm.k)
}
type peerRequestMessage struct {
@@ -226,17 +248,25 @@ type peerRequestMessage struct {
func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
return func(k cid.Cid) {
spm.RecordPeerResponse(p, k)
select {
case spm.peerMessages <- &peerTimeoutMessage{p, k}:
case <-spm.ctx.Done():
}
}
}
func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
if prm.peers == nil {
spm.broadcastLatency.SetupRequests(prm.keys, func(k cid.Cid) {})
spm.broadcastLatency.SetupRequests(prm.keys, spm.timeoutDuration, func(k cid.Cid) {
select {
case spm.peerMessages <- &broadcastTimeoutMessage{k}:
case <-spm.ctx.Done():
}
})
} else {
for _, p := range prm.peers {
if data, ok := spm.activePeers[p]; ok {
data.lt.SetupRequests(prm.keys, spm.makeTimeout(p))
data.lt.SetupRequests(prm.keys, spm.timeoutDuration, spm.makeTimeout(p))
}
}
}
@@ -274,9 +304,47 @@ func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
prm.resp <- optimizedPeers
}
type cancelMessage struct {
k cid.Cid
}
func (cm *cancelMessage) handle(spm *SessionPeerManager) {
for _, data := range spm.activePeers {
data.lt.RecordCancel(cm.k)
}
}
func (spm *SessionPeerManager) handleShutdown() {
for p, data := range spm.activePeers {
spm.tagger.UntagPeer(p, spm.tag)
data.lt.Shutdown()
}
}
type peerTimeoutMessage struct {
p peer.ID
k cid.Cid
}
func (ptm *peerTimeoutMessage) handle(spm *SessionPeerManager) {
data, ok := spm.activePeers[ptm.p]
if !ok || !data.lt.WasCancelled(ptm.k) {
spm.recordResponse(ptm.p, ptm.k)
}
}
type broadcastTimeoutMessage struct {
k cid.Cid
}
func (btm *broadcastTimeoutMessage) handle(spm *SessionPeerManager) {
spm.broadcastLatency.RemoveRequest(btm.k)
}
type setTimeoutMessage struct {
timeoutDuration time.Duration
}
func (stm *setTimeoutMessage) handle(spm *SessionPeerManager) {
spm.timeoutDuration = stm.timeoutDuration
}
@@ -241,6 +241,115 @@ func TestOrderingPeers(t *testing.T) {
}
}
func TestTimeoutsAndCancels(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
peers := testutil.GeneratePeers(3)
completed := make(chan struct{})
fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{peers, completed}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpt, fppf)
// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
time.Sleep(2 * time.Millisecond)
sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)
// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)
// record receives
peer1 := peers[0]
peer2 := peers[1]
peer3 := peers[2]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, c[0])
time.Sleep(2 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c[0])
time.Sleep(40 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, c[0])
sessionPeers := sessionPeerManager.GetOptimizedPeers()
// should prioritize peers which are fastest
if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
t.Fatal("Did not prioritize peers that received blocks")
}
// should give first peer rating of 1
if sessionPeers[0].OptimizationRating < 1.0 {
t.Fatal("Did not assign rating to best peer correctly")
}
// should give other optimized peers ratings between 0 & 1
if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) {
t.Fatal("Did not assign rating to other optimized peers correctly")
}
// should not record a response for a broadcast return that arrived AFTER the timeout period
// leaving peer unoptimized
if sessionPeers[2].OptimizationRating != 0 {
t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
}
// now we make a targeted request, which SHOULD affect peer
// rating if it times out
c2 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
// wait for a timeout
time.Sleep(40 * time.Millisecond)
// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
t.Fatal("Timeout should have affected optimization rating but did not")
}
// now we make a targeted request, but later cancel it
// timing out should not affect rating
c3 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
sessionPeerManager.RecordCancel(c3[0])
// wait for a timeout
time.Sleep(40 * time.Millisecond)
// call again
thirdSessionPeers := sessionPeerManager.GetOptimizedPeers()
if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating {
t.Fatal("Timeout should not have affected optimization rating but did")
}
// if we make a targeted request that is then cancelled, but we still
// receive the block before the timeout, it's worth recording and affecting latency
c4 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4)
sessionPeerManager.RecordCancel(c4[0])
time.Sleep(2 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c4[0])
// call again
fourthSessionPeers := sessionPeerManager.GetOptimizedPeers()
if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating {
t.Fatal("Timeout should have affected optimization rating but did not")
}
}
func TestUntaggingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
......