Commit 98f01e7f authored by hannahhoward

feat(sessions): track real latency per peer

Return optimized peers in real latency order, weighted toward recent
requests
parent cfc5c2fe
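For orientation, a rough sketch (not part of this commit) of how a session would drive these APIs; the helper name and its parameters are assumptions standing in for surrounding session code:

	// exampleSessionFlow is hypothetical; spm, peers and ks are assumed to come
	// from the session that owns this SessionPeerManager.
	func exampleSessionFlow(spm *SessionPeerManager, peers []peer.ID, ks []cid.Cid) {
		spm.RecordPeerRequests(nil, ks)         // broadcast: timings become the fallback latency
		spm.RecordPeerRequests(peers, ks)       // start per-key latency timers for these peers
		spm.RecordPeerResponse(peers[0], ks[0]) // blend the observed latency into that peer's average
		optimized := spm.GetOptimizedPeers()    // peers with measured latency first, fastest first,
		_ = optimized                           // followed by a random sample of unmeasured peers
	}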
package sessionpeermanager

import (
	"time"

	"github.com/ipfs/go-cid"
)

const (
	timeoutDuration = 5 * time.Second
)

type requestData struct {
	startedAt   time.Time
	timeoutFunc *time.Timer
}

// latencyTracker records how long each outstanding request (by CID) has been
// in flight, and fires a callback if no response arrives before timeoutDuration.
type latencyTracker struct {
	requests map[cid.Cid]*requestData
}

func newLatencyTracker() *latencyTracker {
	return &latencyTracker{requests: make(map[cid.Cid]*requestData)}
}

type afterTimeoutFunc func(cid.Cid)

// SetupRequests records a start time for each key not already being tracked
// and schedules a timeout callback for it.
func (lt *latencyTracker) SetupRequests(keys []cid.Cid, afterTimeout afterTimeoutFunc) {
	startedAt := time.Now()
	for _, k := range keys {
		if _, ok := lt.requests[k]; !ok {
			lt.requests[k] = &requestData{startedAt, time.AfterFunc(timeoutDuration, makeAfterTimeout(afterTimeout, k))}
		}
	}
}

func makeAfterTimeout(afterTimeout afterTimeoutFunc, k cid.Cid) func() {
	return func() { afterTimeout(k) }
}

// CheckDuration returns how long the request for key has been outstanding,
// without removing it from the tracker.
func (lt *latencyTracker) CheckDuration(key cid.Cid) (time.Duration, bool) {
	request, ok := lt.requests[key]
	var latency time.Duration
	if ok {
		latency = time.Now().Sub(request.startedAt)
	}
	return latency, ok
}

// RecordResponse returns the measured latency for key, stops its timeout
// timer, and removes the request from the tracker.
func (lt *latencyTracker) RecordResponse(key cid.Cid) (time.Duration, bool) {
	request, ok := lt.requests[key]
	var latency time.Duration
	if ok {
		latency = time.Now().Sub(request.startedAt)
		request.timeoutFunc.Stop()
		delete(lt.requests, key)
	}
	return latency, ok
}

// Shutdown stops all pending timeout timers.
func (lt *latencyTracker) Shutdown() {
	for _, request := range lt.requests {
		request.timeoutFunc.Stop()
	}
}
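A hypothetical usage sketch for the tracker above (the wrapper function and the CID literal are illustrative assumptions, not part of this commit):

	func exampleLatencyTracker() {
		c, _ := cid.Decode("QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG")
		lt := newLatencyTracker()
		lt.SetupRequests([]cid.Cid{c}, func(k cid.Cid) {
			// fires only if no response is recorded within timeoutDuration (5s)
		})
		// later, when a block for c arrives:
		if latency, ok := lt.RecordResponse(c); ok {
			_ = latency // time elapsed since SetupRequests; timer stopped, entry removed
		}
		lt.Shutdown()
	}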
package sessionpeermanager

import (
	"time"

	"github.com/ipfs/go-cid"
)

const (
	newLatencyWeight = 0.5
)

// peerData tracks the running latency estimate for a single peer.
type peerData struct {
	hasLatency bool
	latency    time.Duration
	lt         *latencyTracker
}

func newPeerData() *peerData {
	return &peerData{
		hasLatency: false,
		lt:         newLatencyTracker(),
		latency:    0,
	}
}

// AdjustLatency folds the latency observed for key k into the peer's running
// average. If this peer has no outstanding request for k, the fallback
// (broadcast) latency is used instead, when available.
func (pd *peerData) AdjustLatency(k cid.Cid, hasFallbackLatency bool, fallbackLatency time.Duration) {
	latency, hasLatency := pd.lt.RecordResponse(k)
	if !hasLatency {
		latency, hasLatency = fallbackLatency, hasFallbackLatency
	}
	if hasLatency {
		if pd.hasLatency {
			pd.latency = time.Duration(float64(pd.latency)*(1.0-newLatencyWeight) + float64(latency)*newLatencyWeight)
		} else {
			pd.latency = latency
			pd.hasLatency = true
		}
	}
}
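With newLatencyWeight = 0.5, the stored latency is an exponential moving average in which the newest sample counts as much as all prior history combined; that is what weights the ordering toward recent requests. A worked example with illustrative (assumed) numbers, wrapped in a hypothetical function:

	func exampleLatencyBlend() {
		// previous average 40ms, new sample 20ms:
		// 40ms*(1-0.5) + 20ms*0.5 = 30ms
		prev := 40 * time.Millisecond
		sample := 20 * time.Millisecond
		blended := time.Duration(float64(prev)*(1.0-newLatencyWeight) + float64(sample)*newLatencyWeight)
		_ = blended // 30ms
	}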
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"sort"

 	cid "github.com/ipfs/go-cid"
 	peer "github.com/libp2p/go-libp2p-core/peer"
@@ -11,7 +12,6 @@ import (
 const (
 	maxOptimizedPeers = 32
-	reservePeers      = 2

 	unoptimizedTagValue = 5  // tag value for "unoptimized" session peers.
 	optimizedTagValue   = 10 // tag value for "optimized" session peers.
 )
@@ -43,20 +43,21 @@ type SessionPeerManager struct {
 	peerMessages chan peerMessage

 	// do not touch outside of run loop
-	activePeers         map[peer.ID]bool
+	activePeers         map[peer.ID]*peerData
 	unoptimizedPeersArr []peer.ID
 	optimizedPeersArr   []peer.ID
+	broadcastLatency    *latencyTracker
 }

 // New creates a new SessionPeerManager
 func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
 	spm := &SessionPeerManager{
 		id:               id,
 		ctx:              ctx,
 		tagger:           tagger,
 		providerFinder:   providerFinder,
 		peerMessages:     make(chan peerMessage, 16),
-		activePeers:      make(map[peer.ID]bool),
+		activePeers:      make(map[peer.ID]*peerData),
+		broadcastLatency: newLatencyTracker(),
 	}
 	spm.tag = fmt.Sprint("bs-ses-", id)
@@ -72,7 +73,7 @@ func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
 	// at the moment, we're just adding peers here
 	// in the future, we'll actually use this to record metrics
 	select {
-	case spm.peerMessages <- &peerResponseMessage{p}:
+	case spm.peerMessages <- &peerResponseMessage{p, k}:
 	case <-spm.ctx.Done():
 	}
 }
@@ -81,6 +82,10 @@ func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
 func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
 	// at the moment, we're not doing anything here
 	// soon we'll use this to track latency by peer
+	select {
+	case spm.peerMessages <- &peerRequestMessage{p, ks}:
+	case <-spm.ctx.Done():
+	}
 }

 // GetOptimizedPeers returns the best peers available for a session
@@ -89,7 +94,7 @@ func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
 	// ordered by optimization, or only a subset
 	resp := make(chan []peer.ID, 1)
 	select {
-	case spm.peerMessages <- &peerReqMessage{resp}:
+	case spm.peerMessages <- &getPeersMessage{resp}:
 	case <-spm.ctx.Done():
 		return nil
 	}
@@ -133,14 +138,16 @@ func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
 	spm.tagger.TagPeer(p, spm.tag, value)
 }

-func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
-	if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
-		tailPeer := spm.optimizedPeersArr[len(spm.optimizedPeersArr)-1]
-		spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
-		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
+func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
+	if data.hasLatency {
+		insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
+			return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
+		})
+		spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
+			append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
+	} else {
+		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
 	}
-	spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
 }

 func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
@@ -169,38 +176,65 @@ type peerFoundMessage struct {
 func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
 	p := pfm.p
 	if _, ok := spm.activePeers[p]; !ok {
-		spm.activePeers[p] = false
-		spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
+		spm.activePeers[p] = newPeerData()
+		spm.insertPeer(p, spm.activePeers[p])
 		spm.tagPeer(p, unoptimizedTagValue)
 	}
 }

 type peerResponseMessage struct {
 	p peer.ID
+	k cid.Cid
 }

 func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
 	p := prm.p
-	isOptimized, ok := spm.activePeers[p]
-	if isOptimized {
-		spm.removeOptimizedPeer(p)
+	k := prm.k
+	data, ok := spm.activePeers[p]
+	if !ok {
+		data = newPeerData()
+		spm.activePeers[p] = data
+		spm.tagPeer(p, optimizedTagValue)
 	} else {
-		spm.activePeers[p] = true
-		spm.tagPeer(p, optimizedTagValue)
-		// transition from unoptimized.
-		if ok {
+		if data.hasLatency {
+			spm.removeOptimizedPeer(p)
+		} else {
 			spm.removeUnoptimizedPeer(p)
 		}
 	}
-	spm.insertOptimizedPeer(p)
+	fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
+	data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
+	spm.insertPeer(p, data)
 }

+type peerRequestMessage struct {
+	peers []peer.ID
+	keys  []cid.Cid
+}
+
+func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
+	return func(k cid.Cid) {
+		spm.RecordPeerResponse(p, k)
+	}
+}
+
+func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
+	if prm.peers == nil {
+		spm.broadcastLatency.SetupRequests(prm.keys, func(k cid.Cid) {})
+	} else {
+		for _, p := range prm.peers {
+			if data, ok := spm.activePeers[p]; ok {
+				data.lt.SetupRequests(prm.keys, spm.makeTimeout(p))
+			}
+		}
+	}
+}
+
-type peerReqMessage struct {
+type getPeersMessage struct {
 	resp chan<- []peer.ID
 }

-func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
+func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
 	randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
 	maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
 	if maxPeers > maxOptimizedPeers {
@@ -215,7 +249,8 @@ func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
 }

 func (spm *SessionPeerManager) handleShutdown() {
-	for p := range spm.activePeers {
+	for p, data := range spm.activePeers {
 		spm.tagger.UntagPeer(p, spm.tag)
+		data.lt.Shutdown()
 	}
 }
@@ -167,7 +167,7 @@ func TestOrderingPeers(t *testing.T) {
 	peer3 := peers[rand.Intn(100)]
 	time.Sleep(1 * time.Millisecond)
 	sessionPeerManager.RecordPeerResponse(peer1, c[0])
-	time.Sleep(1 * time.Millisecond)
+	time.Sleep(5 * time.Millisecond)
 	sessionPeerManager.RecordPeerResponse(peer2, c[0])
 	time.Sleep(1 * time.Millisecond)
 	sessionPeerManager.RecordPeerResponse(peer3, c[0])
@@ -177,13 +177,18 @@ func TestOrderingPeers(t *testing.T) {
 		t.Fatal("Should not return more than the max of optimized peers")
 	}

-	// should prioritize peers which have received blocks
-	if (sessionPeers[0] != peer3) || (sessionPeers[1] != peer2) || (sessionPeers[2] != peer1) {
+	// should prioritize peers which are fastest
+	if (sessionPeers[0] != peer1) || (sessionPeers[1] != peer2) || (sessionPeers[2] != peer3) {
 		t.Fatal("Did not prioritize peers that received blocks")
 	}

-	// Receive a second time from same node
-	sessionPeerManager.RecordPeerResponse(peer3, c[0])
+	c2 := testutil.GenerateCids(1)
+
+	// Request again
+	sessionPeerManager.RecordPeerRequests(nil, c2)
+
+	// Receive a second time
+	sessionPeerManager.RecordPeerResponse(peer3, c2[0])

 	// call again
 	nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
@@ -191,9 +196,9 @@ func TestOrderingPeers(t *testing.T) {
 		t.Fatal("Should not return more than the max of optimized peers")
 	}

-	// should not duplicate
-	if (nextSessionPeers[0] != peer3) || (nextSessionPeers[1] != peer2) || (nextSessionPeers[2] != peer1) {
-		t.Fatal("Did dedup peers which received multiple blocks")
+	// should sort by average latency
+	if (nextSessionPeers[0] != peer1) || (nextSessionPeers[1] != peer3) || (nextSessionPeers[2] != peer2) {
+		t.Fatal("Did not dedup peers which received multiple blocks")
 	}

 	// should randomize other peers
......