Unverified Commit 8f0e4c60 authored by Steven Allen, committed by GitHub

Merge pull request #149 from ipfs/feat/better-peer-tracking

Feat: Track Session Peer Latency More Accurately
parents cfc5c2fe 0d8b75d7
......@@ -333,9 +333,8 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
defer wg.Done()
bs.updateReceiveCounters(b)
bs.sm.UpdateReceiveCounters(b)
bs.sm.UpdateReceiveCounters(p, b)
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
// skip received blocks that are not in the wantlist
if !bs.wm.IsWanted(b.Cid()) {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), p)
......
......@@ -8,14 +8,13 @@ import (
lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
notifications "github.com/ipfs/go-bitswap/notifications"
bssd "github.com/ipfs/go-bitswap/sessiondata"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
)
const (
......@@ -34,15 +33,16 @@ type WantManager interface {
// requesting more when necessary.
type PeerManager interface {
FindMorePeers(context.Context, cid.Cid)
GetOptimizedPeers() []peer.ID
GetOptimizedPeers() []bssd.OptimizedPeer
RecordPeerRequests([]peer.ID, []cid.Cid)
RecordPeerResponse(peer.ID, cid.Cid)
RecordCancel(cid.Cid)
}
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
RecordDuplicateBlock()
RecordUniqueBlock()
}
......@@ -141,15 +141,15 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
case <-s.ctx.Done():
}
ks := []cid.Cid{blk.Cid()}
s.pm.RecordCancel(blk.Cid())
s.wm.CancelWants(s.ctx, ks, nil, s.id)
}
// UpdateReceiveCounters updates receive counters for a block,
// which may be a duplicate and adjusts the split factor based on that.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
func (s *Session) UpdateReceiveCounters(from peer.ID, blk blocks.Block) {
select {
case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: true}:
case <-s.ctx.Done():
}
}
......@@ -308,7 +308,6 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}
func (s *Session) handleIdleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
......@@ -415,6 +414,9 @@ func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
ks := blk.blk.Cid()
if s.pastWants.Has(ks) {
s.srs.RecordDuplicateBlock()
if blk.from != "" {
s.pm.RecordPeerResponse(blk.from, ks)
}
}
}
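Taken together with the changes above, the sender now travels the whole receive path: Bitswap.ReceiveMessage hands the peer to SessionManager.UpdateReceiveCounters, which fans it out to each Session.UpdateReceiveCounters, which queues a counterMessage blkRecv carrying the peer; here, when the key is already in pastWants, that peer is still credited via RecordPeerResponse, so duplicate blocks keep refining its latency estimate.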
......
......@@ -6,7 +6,7 @@ import (
"testing"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -52,10 +52,14 @@ func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
}
}
func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
func (fpm *fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
fpm.lk.Lock()
defer fpm.lk.Unlock()
return fpm.peers
optimizedPeers := make([]bssd.OptimizedPeer, 0, len(fpm.peers))
for _, peer := range fpm.peers {
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: peer, OptimizationRating: 1.0})
}
return optimizedPeers
}
func (fpm *fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
......@@ -64,12 +68,17 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.peers = append(fpm.peers, p)
fpm.lk.Unlock()
}
func (fpm *fakePeerManager) RecordCancel(c cid.Cid) {}
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
peers := make([]peer.ID, len(optimizedPeers))
for i, optimizedPeer := range optimizedPeers {
peers[i] = optimizedPeer.Peer
}
return []bssd.PartialRequest{bssd.PartialRequest{Peers: peers, Keys: keys}}
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
......
package sessiondata
import (
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// OptimizedPeer describes a peer and its level of optimization from 0 to 1.
type OptimizedPeer struct {
Peer peer.ID
OptimizationRating float64
}
// PartialRequest represents one slice of an over-request split among peers
type PartialRequest struct {
Peers []peer.ID
Keys []cid.Cid
}
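The new sessiondata package holds the types shared between the session, the peer manager, and the request splitter, so neither package has to import the other. A minimal sketch of how the two types compose (the helper below is hypothetical, not part of the PR; it assumes the bssd, cid, and peer imports shown above):

// Hypothetical helper, illustrative only: pair a set of rated peers with the keys to request.
func buildPartialRequest(rated []bssd.OptimizedPeer, keys []cid.Cid) bssd.PartialRequest {
	ids := make([]peer.ID, 0, len(rated))
	for _, op := range rated {
		ids = append(ids, op.Peer)
	}
	return bssd.PartialRequest{Peers: ids, Keys: keys}
}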
......@@ -19,7 +19,7 @@ type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveBlockFrom(peer.ID, blocks.Block)
UpdateReceiveCounters(blocks.Block)
UpdateReceiveCounters(peer.ID, blocks.Block)
}
type sesTrk struct {
......@@ -128,11 +128,11 @@ func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
// UpdateReceiveCounters records the fact that a block was received, allowing
// sessions to track duplicates
func (sm *SessionManager) UpdateReceiveCounters(blk blocks.Block) {
func (sm *SessionManager) UpdateReceiveCounters(from peer.ID, blk blocks.Block) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for _, s := range sm.sessions {
s.session.UpdateReceiveCounters(blk)
s.session.UpdateReceiveCounters(from, blk)
}
}
......@@ -5,10 +5,10 @@ import (
"testing"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"
bssession "github.com/ipfs/go-bitswap/session"
bssd "github.com/ipfs/go-bitswap/sessiondata"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -32,21 +32,22 @@ func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block,
}
func (fs *fakeSession) InterestedIn(cid.Cid) bool { return fs.interested }
func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
func (fs *fakeSession) UpdateReceiveCounters(blocks.Block) { fs.updateReceiveCounters = true }
func (fs *fakeSession) UpdateReceiveCounters(peer.ID, blocks.Block) { fs.updateReceiveCounters = true }
type fakePeerManager struct {
id uint64
}
func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid) {}
func (*fakePeerManager) GetOptimizedPeers() []peer.ID { return nil }
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid) {}
func (*fakePeerManager) RecordCancel(c cid.Cid) {}
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
......
package sessionpeermanager
import (
"time"
"github.com/ipfs/go-cid"
)
type requestData struct {
startedAt time.Time
wasCancelled bool
timeoutFunc *time.Timer
}
type latencyTracker struct {
requests map[cid.Cid]*requestData
}
func newLatencyTracker() *latencyTracker {
return &latencyTracker{requests: make(map[cid.Cid]*requestData)}
}
type afterTimeoutFunc func(cid.Cid)
func (lt *latencyTracker) SetupRequests(keys []cid.Cid, timeoutDuration time.Duration, afterTimeout afterTimeoutFunc) {
startedAt := time.Now()
for _, k := range keys {
if _, ok := lt.requests[k]; !ok {
lt.requests[k] = &requestData{
startedAt,
false,
time.AfterFunc(timeoutDuration, makeAfterTimeout(afterTimeout, k)),
}
}
}
}
func makeAfterTimeout(afterTimeout afterTimeoutFunc, k cid.Cid) func() {
return func() { afterTimeout(k) }
}
func (lt *latencyTracker) CheckDuration(key cid.Cid) (time.Duration, bool) {
request, ok := lt.requests[key]
var latency time.Duration
if ok {
latency = time.Now().Sub(request.startedAt)
}
return latency, ok
}
func (lt *latencyTracker) RemoveRequest(key cid.Cid) {
request, ok := lt.requests[key]
if ok {
request.timeoutFunc.Stop()
delete(lt.requests, key)
}
}
func (lt *latencyTracker) RecordCancel(key cid.Cid) {
request, ok := lt.requests[key]
if ok {
request.wasCancelled = true
}
}
func (lt *latencyTracker) WasCancelled(key cid.Cid) bool {
request, ok := lt.requests[key]
return ok && request.wasCancelled
}
func (lt *latencyTracker) Shutdown() {
for _, request := range lt.requests {
request.timeoutFunc.Stop()
}
}
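The tracker's lifecycle, in order: SetupRequests starts one timer per CID, CheckDuration reports the elapsed time for a key, RecordCancel marks a key so a later timeout can be ignored, and RemoveRequest/Shutdown stop the timers. A minimal usage sketch inside this package (the callback body is hypothetical):

func exampleTrackerUse(keys []cid.Cid) {
	lt := newLatencyTracker()
	defer lt.Shutdown() // stop any timers still outstanding

	// start timing each key; the callback fires if no response arrives in time
	lt.SetupRequests(keys, 20*time.Millisecond, func(k cid.Cid) {
		// hypothetical: treat k as having timed out
	})

	// ... later, when a block for keys[0] arrives:
	if latency, ok := lt.CheckDuration(keys[0]); ok {
		_ = latency // feed into the peer's moving average
	}
	lt.RemoveRequest(keys[0]) // stops the timer and forgets the key
}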
package sessionpeermanager
import (
"time"
"github.com/ipfs/go-cid"
)
const (
newLatencyWeight = 0.5
)
type peerData struct {
hasLatency bool
latency time.Duration
lt *latencyTracker
}
func newPeerData() *peerData {
return &peerData{
hasLatency: false,
lt: newLatencyTracker(),
latency: 0,
}
}
func (pd *peerData) AdjustLatency(k cid.Cid, hasFallbackLatency bool, fallbackLatency time.Duration) {
latency, hasLatency := pd.lt.CheckDuration(k)
pd.lt.RemoveRequest(k)
if !hasLatency {
latency, hasLatency = fallbackLatency, hasFallbackLatency
}
if hasLatency {
if pd.hasLatency {
pd.latency = time.Duration(float64(pd.latency)*(1.0-newLatencyWeight) + float64(latency)*newLatencyWeight)
} else {
pd.latency = latency
pd.hasLatency = true
}
}
}
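AdjustLatency keeps an exponentially weighted moving average: a new sample counts for newLatencyWeight (0.5) and the previous average for the remainder, so a running latency of 40ms updated with a 20ms sample becomes 30ms. The same update rule, restated as a small helper in this package:

// Restatement of the update rule above, using the newLatencyWeight constant (0.5).
func blendLatency(current, sample time.Duration) time.Duration {
	return time.Duration(float64(current)*(1.0-newLatencyWeight) + float64(sample)*newLatencyWeight)
}

// blendLatency(40*time.Millisecond, 20*time.Millisecond) == 30*time.Millisecond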
......@@ -4,14 +4,18 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"time"
bssd "github.com/ipfs/go-bitswap/sessiondata"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
const (
defaultTimeoutDuration = 5 * time.Second
maxOptimizedPeers = 32
reservePeers = 2
unoptimizedTagValue = 5 // tag value for "unoptimized" session peers.
optimizedTagValue = 10 // tag value for "optimized" session peers.
)
......@@ -43,20 +47,23 @@ type SessionPeerManager struct {
peerMessages chan peerMessage
// do not touch outside of run loop
activePeers map[peer.ID]bool
activePeers map[peer.ID]*peerData
unoptimizedPeersArr []peer.ID
optimizedPeersArr []peer.ID
broadcastLatency *latencyTracker
timeoutDuration time.Duration
}
// New creates a new SessionPeerManager
func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerProviderFinder) *SessionPeerManager {
spm := &SessionPeerManager{
id: id,
ctx: ctx,
tagger: tagger,
providerFinder: providerFinder,
peerMessages: make(chan peerMessage, 16),
activePeers: make(map[peer.ID]bool),
activePeers: make(map[peer.ID]*peerData),
broadcastLatency: newLatencyTracker(),
timeoutDuration: defaultTimeoutDuration,
}
spm.tag = fmt.Sprint("bs-ses-", id)
......@@ -69,27 +76,39 @@ func New(ctx context.Context, id uint64, tagger PeerTagger, providerFinder PeerP
// the list of peers if it wasn't already added
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
select {
case spm.peerMessages <- &peerResponseMessage{p, k}:
case <-spm.ctx.Done():
}
}
// RecordCancel records the fact that cancellations were sent to peers,
// so if no blocks come in, don't let it affect the peer's timeout
func (spm *SessionPeerManager) RecordCancel(k cid.Cid) {
// at the moment, we're just adding peers here
// in the future, we'll actually use this to record metrics
select {
case spm.peerMessages <- &peerResponseMessage{p}:
case spm.peerMessages <- &cancelMessage{k}:
case <-spm.ctx.Done():
}
}
// RecordPeerRequests records that a given set of peers requested the given cids
// RecordPeerRequests records that a given set of peers requested the given cids.
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
// at the moment, we're not doing anything here
// soon we'll use this to track latency by peer
select {
case spm.peerMessages <- &peerRequestMessage{p, ks}:
case <-spm.ctx.Done():
}
}
// GetOptimizedPeers returns the best peers available for a session
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// GetOptimizedPeers returns the best peers available for a session, along with
// a rating for how good they are, in comparison to the best peer.
func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
// right now this just returns all peers, but soon we might return peers
// ordered by optimization, or only a subset
resp := make(chan []peer.ID, 1)
resp := make(chan []bssd.OptimizedPeer, 1)
select {
case spm.peerMessages <- &peerReqMessage{resp}:
case spm.peerMessages <- &getPeersMessage{resp}:
case <-spm.ctx.Done():
return nil
}
......@@ -117,6 +136,15 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
}(c)
}
// SetTimeoutDuration changes the length of time used to time out recording of
// requests
func (spm *SessionPeerManager) SetTimeoutDuration(timeoutDuration time.Duration) {
select {
case spm.peerMessages <- &setTimeoutMessage{timeoutDuration}:
case <-spm.ctx.Done():
}
}
func (spm *SessionPeerManager) run(ctx context.Context) {
for {
select {
......@@ -129,18 +157,26 @@ func (spm *SessionPeerManager) run(ctx context.Context) {
}
}
func (spm *SessionPeerManager) tagPeer(p peer.ID, value int) {
func (spm *SessionPeerManager) tagPeer(p peer.ID, data *peerData) {
var value int
if data.hasLatency {
value = optimizedTagValue
} else {
value = unoptimizedTagValue
}
spm.tagger.TagPeer(p, spm.tag, value)
}
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
tailPeer := spm.optimizedPeersArr[len(spm.optimizedPeersArr)-1]
spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
func (spm *SessionPeerManager) insertPeer(p peer.ID, data *peerData) {
if data.hasLatency {
insertPos := sort.Search(len(spm.optimizedPeersArr), func(i int) bool {
return spm.activePeers[spm.optimizedPeersArr[i]].latency > data.latency
})
spm.optimizedPeersArr = append(spm.optimizedPeersArr[:insertPos],
append([]peer.ID{p}, spm.optimizedPeersArr[insertPos:]...)...)
} else {
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
}
spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}
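insertPeer now keeps optimizedPeersArr sorted by ascending latency: sort.Search finds the first position whose latency exceeds the new peer's, and the peer is spliced in there; peers without latency data stay in the unordered unoptimized list. The splice idiom on its own, with hypothetical data:

// insertSorted splices d into an ascending slice of latencies (illustrative only).
func insertSorted(latencies []time.Duration, d time.Duration) []time.Duration {
	pos := sort.Search(len(latencies), func(i int) bool { return latencies[i] > d })
	return append(latencies[:pos], append([]time.Duration{d}, latencies[pos:]...)...)
}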
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
......@@ -162,6 +198,27 @@ func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
}
}
func (spm *SessionPeerManager) recordResponse(p peer.ID, k cid.Cid) {
data, ok := spm.activePeers[p]
wasOptimized := ok && data.hasLatency
if wasOptimized {
spm.removeOptimizedPeer(p)
} else {
if ok {
spm.removeUnoptimizedPeer(p)
} else {
data = newPeerData()
spm.activePeers[p] = data
}
}
fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
if !ok || wasOptimized != data.hasLatency {
spm.tagPeer(p, data)
}
spm.insertPeer(p, data)
}
type peerFoundMessage struct {
p peer.ID
}
......@@ -169,53 +226,125 @@ type peerFoundMessage struct {
func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
p := pfm.p
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = false
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
spm.tagPeer(p, unoptimizedTagValue)
spm.activePeers[p] = newPeerData()
spm.insertPeer(p, spm.activePeers[p])
spm.tagPeer(p, spm.activePeers[p])
}
}
type peerResponseMessage struct {
p peer.ID
k cid.Cid
}
func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
p := prm.p
isOptimized, ok := spm.activePeers[p]
if isOptimized {
spm.removeOptimizedPeer(p)
} else {
spm.activePeers[p] = true
spm.tagPeer(p, optimizedTagValue)
spm.recordResponse(prm.p, prm.k)
}
// transition from unoptimized.
if ok {
spm.removeUnoptimizedPeer(p)
type peerRequestMessage struct {
peers []peer.ID
keys []cid.Cid
}
func (spm *SessionPeerManager) makeTimeout(p peer.ID) afterTimeoutFunc {
return func(k cid.Cid) {
select {
case spm.peerMessages <- &peerTimeoutMessage{p, k}:
case <-spm.ctx.Done():
}
}
}
func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
if prm.peers == nil {
spm.broadcastLatency.SetupRequests(prm.keys, spm.timeoutDuration, func(k cid.Cid) {
select {
case spm.peerMessages <- &broadcastTimeoutMessage{k}:
case <-spm.ctx.Done():
}
})
} else {
for _, p := range prm.peers {
if data, ok := spm.activePeers[p]; ok {
data.lt.SetupRequests(prm.keys, spm.timeoutDuration, spm.makeTimeout(p))
}
}
}
spm.insertOptimizedPeer(p)
}
type peerReqMessage struct {
resp chan<- []peer.ID
type getPeersMessage struct {
resp chan<- []bssd.OptimizedPeer
}
func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
if maxPeers > maxOptimizedPeers {
maxPeers = maxOptimizedPeers
}
var bestPeerLatency float64
if len(spm.optimizedPeersArr) > 0 {
bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
} else {
bestPeerLatency = 0
}
optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
for i := 0; i < maxPeers; i++ {
if i < len(spm.optimizedPeersArr) {
p := spm.optimizedPeersArr[i]
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
Peer: p,
OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
})
} else {
p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
}
}
prm.resp <- optimizedPeers
}
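The rating handed back for each optimized peer is bestPeerLatency divided by that peer's latency, so the fastest peer rates 1.0 and slower peers fall toward 0; unoptimized peers are appended in random order with a rating of 0.0. For example, with measured latencies of 20ms, 40ms and 80ms the ratings come out as 1.0, 0.5 and 0.25:

// Illustrative arithmetic only; the latencies here are hypothetical.
best := 20 * time.Millisecond
for _, l := range []time.Duration{20 * time.Millisecond, 40 * time.Millisecond, 80 * time.Millisecond} {
	fmt.Println(float64(best) / float64(l)) // 1, 0.5, 0.25
}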
type cancelMessage struct {
k cid.Cid
}
extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
for i := range extraPeers {
extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
func (cm *cancelMessage) handle(spm *SessionPeerManager) {
for _, data := range spm.activePeers {
data.lt.RecordCancel(cm.k)
}
prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
}
func (spm *SessionPeerManager) handleShutdown() {
for p := range spm.activePeers {
for p, data := range spm.activePeers {
spm.tagger.UntagPeer(p, spm.tag)
data.lt.Shutdown()
}
}
type peerTimeoutMessage struct {
p peer.ID
k cid.Cid
}
func (ptm *peerTimeoutMessage) handle(spm *SessionPeerManager) {
data, ok := spm.activePeers[ptm.p]
if !ok || !data.lt.WasCancelled(ptm.k) {
spm.recordResponse(ptm.p, ptm.k)
}
}
type broadcastTimeoutMessage struct {
k cid.Cid
}
func (btm *broadcastTimeoutMessage) handle(spm *SessionPeerManager) {
spm.broadcastLatency.RemoveRequest(btm.k)
}
type setTimeoutMessage struct {
timeoutDuration time.Duration
}
func (stm *setTimeoutMessage) handle(spm *SessionPeerManager) {
spm.timeoutDuration = stm.timeoutDuration
}
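All of these message types follow the same pattern: each implements a handle method and is delivered over the peerMessages channel, so every mutation of activePeers and the two peer slices happens on the single run goroutine without locks. A hypothetical extra message, just to show the shape (not part of the PR):

// Hypothetical example of the message pattern.
type exampleMessage struct {
	p peer.ID
}

func (em *exampleMessage) handle(spm *SessionPeerManager) {
	// runs on the run() goroutine, so it may read and modify
	// spm.activePeers and the peer slices without extra locking
	_ = em.p
}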
......@@ -74,6 +74,15 @@ func (fpt *fakePeerTagger) count() int {
return len(fpt.taggedPeers)
}
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
optimizedPeers := sessionPeerManager.GetOptimizedPeers()
var peers []peer.ID
for _, optimizedPeer := range optimizedPeers {
peers = append(peers, optimizedPeer.Peer)
}
return peers
}
func TestFindingMorePeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
......@@ -98,7 +107,7 @@ func TestFindingMorePeers(t *testing.T) {
}
time.Sleep(2 * time.Millisecond)
sessionPeers := sessionPeerManager.GetOptimizedPeers()
sessionPeers := getPeers(sessionPeerManager)
if len(sessionPeers) != len(peers) {
t.Fatal("incorrect number of peers found")
}
......@@ -125,7 +134,7 @@ func TestRecordingReceivedBlocks(t *testing.T) {
sessionPeerManager := New(ctx, id, fpt, fppf)
sessionPeerManager.RecordPeerResponse(p, c)
time.Sleep(10 * time.Millisecond)
sessionPeers := sessionPeerManager.GetOptimizedPeers()
sessionPeers := getPeers(sessionPeerManager)
if len(sessionPeers) != 1 {
t.Fatal("did not add peer on receive")
}
......@@ -167,7 +176,7 @@ func TestOrderingPeers(t *testing.T) {
peer3 := peers[rand.Intn(100)]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, c[0])
time.Sleep(1 * time.Millisecond)
time.Sleep(5 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c[0])
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, c[0])
......@@ -177,13 +186,36 @@ func TestOrderingPeers(t *testing.T) {
t.Fatal("Should not return more than the max of optimized peers")
}
// should prioritize peers which have received blocks
if (sessionPeers[0] != peer3) || (sessionPeers[1] != peer2) || (sessionPeers[2] != peer1) {
// should prioritize peers which are fastest
if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
t.Fatal("Did not prioritize peers that received blocks")
}
// Receive a second time from same node
sessionPeerManager.RecordPeerResponse(peer3, c[0])
// should give first peer rating of 1
if sessionPeers[0].OptimizationRating < 1.0 {
t.Fatal("Did not assign rating to best peer correctly")
}
// should give other optimized peers ratings between 0 & 1
if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
t.Fatal("Did not assign rating to other optimized peers correctly")
}
// should give other peers a rating of zero
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i].OptimizationRating != 0.0 {
t.Fatal("Did not assign rating to unoptimized peer correctly")
}
}
c2 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests(nil, c2)
// Receive a second time
sessionPeerManager.RecordPeerResponse(peer3, c2[0])
// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
......@@ -191,15 +223,16 @@ func TestOrderingPeers(t *testing.T) {
t.Fatal("Should not return more than the max of optimized peers")
}
// should not duplicate
if (nextSessionPeers[0] != peer3) || (nextSessionPeers[1] != peer2) || (nextSessionPeers[2] != peer1) {
t.Fatal("Did dedup peers which received multiple blocks")
// should sort by average latency
if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
(nextSessionPeers[2].Peer != peer2) {
t.Fatal("Did not dedup peers which received multiple blocks")
}
// should randomize other peers
totalSame := 0
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i] == nextSessionPeers[i] {
if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
totalSame++
}
}
......@@ -208,6 +241,115 @@ func TestOrderingPeers(t *testing.T) {
}
}
func TestTimeoutsAndCancels(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
peers := testutil.GeneratePeers(3)
completed := make(chan struct{})
fpt := &fakePeerTagger{}
fppf := &fakePeerProviderFinder{peers, completed}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpt, fppf)
// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
select {
case <-completed:
case <-ctx.Done():
t.Fatal("Did not finish finding providers")
}
time.Sleep(2 * time.Millisecond)
sessionPeerManager.SetTimeoutDuration(20 * time.Millisecond)
// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)
// record receives
peer1 := peers[0]
peer2 := peers[1]
peer3 := peers[2]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, c[0])
time.Sleep(2 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c[0])
time.Sleep(40 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, c[0])
sessionPeers := sessionPeerManager.GetOptimizedPeers()
// should prioritize peers which are fastest
if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
t.Fatal("Did not prioritize peers that received blocks")
}
// should give first peer rating of 1
if sessionPeers[0].OptimizationRating < 1.0 {
t.Fatal("Did not assign rating to best peer correctly")
}
// should give other optimized peers ratings between 0 & 1
if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) {
t.Fatal("Did not assign rating to other optimized peers correctly")
}
// should not record a response for a broadcast return that arrived AFTER the timeout period
// leaving peer unoptimized
if sessionPeers[2].OptimizationRating != 0 {
t.Fatal("should not have recorded broadcast response for peer that arrived after timeout period")
}
// now we make a targeted request, which SHOULD affect peer
// rating if it times out
c2 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c2)
// wait for a timeout
time.Sleep(40 * time.Millisecond)
// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
if sessionPeers[1].OptimizationRating <= nextSessionPeers[1].OptimizationRating {
t.Fatal("Timeout should have affected optimization rating but did not")
}
// now we make a targeted request, but later cancel it
// timing out should not affect rating
c3 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c3)
sessionPeerManager.RecordCancel(c3[0])
// wait for a timeout
time.Sleep(40 * time.Millisecond)
// call again
thirdSessionPeers := sessionPeerManager.GetOptimizedPeers()
if nextSessionPeers[1].OptimizationRating != thirdSessionPeers[1].OptimizationRating {
t.Fatal("Timeout should not have affected optimization rating but did")
}
// if we make a targeted request that is then cancelled, but we still
// receive the block before the timeout, it's worth recording and affecting latency
c4 := testutil.GenerateCids(1)
// Request again
sessionPeerManager.RecordPeerRequests([]peer.ID{peer2}, c4)
sessionPeerManager.RecordCancel(c4[0])
time.Sleep(2 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c4[0])
// call again
fourthSessionPeers := sessionPeerManager.GetOptimizedPeers()
if thirdSessionPeers[1].OptimizationRating >= fourthSessionPeers[1].OptimizationRating {
t.Fatal("Timeout should have affected optimization rating but did not")
}
}
func TestUntaggingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
......
......@@ -3,6 +3,8 @@ package sessionrequestsplitter
import (
"context"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)
......@@ -15,12 +17,6 @@ const (
initialSplit = 2
)
// PartialRequest represents one slice of an over-request split among peers
type PartialRequest struct {
Peers []peer.ID
Keys []cid.Cid
}
type srsMessage interface {
handle(srs *SessionRequestSplitter)
}
......@@ -50,11 +46,11 @@ func New(ctx context.Context) *SessionRequestSplitter {
// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
resp := make(chan []*PartialRequest, 1)
func (srs *SessionRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, ks []cid.Cid) []bssd.PartialRequest {
resp := make(chan []bssd.PartialRequest, 1)
select {
case srs.messages <- &splitRequestMessage{peers, ks, resp}:
case srs.messages <- &splitRequestMessage{optimizedPeers, ks, resp}:
case <-srs.ctx.Done():
return nil
}
......@@ -101,14 +97,18 @@ func (srs *SessionRequestSplitter) duplicateRatio() float64 {
}
type splitRequestMessage struct {
peers []peer.ID
optimizedPeers []bssd.OptimizedPeer
ks []cid.Cid
resp chan []*PartialRequest
resp chan []bssd.PartialRequest
}
func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split := srs.split
peers := s.peers
// first iteration ignore optimization ratings
peers := make([]peer.ID, len(s.optimizedPeers))
for i, optimizedPeer := range s.optimizedPeers {
peers[i] = optimizedPeer.Peer
}
ks := s.ks
if len(peers) < split {
split = len(peers)
......@@ -118,9 +118,9 @@ func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split = len(ks)
}
keySplits := splitKeys(ks, split)
splitRequests := make([]*PartialRequest, len(keySplits))
for i := range splitRequests {
splitRequests[i] = &PartialRequest{peerSplits[i], keySplits[i]}
splitRequests := make([]bssd.PartialRequest, 0, len(keySplits))
for i, keySplit := range keySplits {
splitRequests = append(splitRequests, bssd.PartialRequest{Peers: peerSplits[i], Keys: keySplit})
}
s.resp <- splitRequests
}
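Concretely, with the default initialSplit of 2, ten peers and six keys produce two PartialRequests, each covering roughly half of the peers and half of the keys; recording duplicate blocks grows the split toward maxSplit so each request covers fewer keys, while recording unique blocks shrinks it back toward 1. The tests below exercise exactly these cases, using rated peers generated by the new testutil helper.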
......
......@@ -7,14 +7,16 @@ import (
"github.com/ipfs/go-bitswap/testutil"
)
func quadEaseOut(t float64) float64 { return t * t }
func TestSplittingRequests(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 2 {
t.Fatal("Did not generate right number of partial requests")
}
......@@ -27,12 +29,12 @@ func TestSplittingRequests(t *testing.T) {
func TestSplittingRequestsTooFewKeys(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut)
keys := testutil.GenerateCids(1)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as keys")
}
......@@ -45,12 +47,12 @@ func TestSplittingRequestsTooFewKeys(t *testing.T) {
func TestSplittingRequestsTooFewPeers(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(1)
optimizedPeers := testutil.GenerateOptimizedPeers(1, 1, quadEaseOut)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as peers")
}
......@@ -63,7 +65,7 @@ func TestSplittingRequestsTooFewPeers(t *testing.T) {
func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
......@@ -72,7 +74,7 @@ func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
srs.RecordDuplicateBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != maxSplit {
t.Fatal("Did not adjust split up as duplicates came in")
}
......@@ -80,7 +82,7 @@ func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
......@@ -89,7 +91,7 @@ func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) {
srs.RecordUniqueBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Did not adjust split down as unique blocks came in")
}
......
......@@ -4,6 +4,7 @@ import (
"math/rand"
bsmsg "github.com/ipfs/go-bitswap/message"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -76,6 +77,24 @@ func GeneratePeers(n int) []peer.ID {
return peerIds
}
// GenerateOptimizedPeers creates n peer IDs whose optimization ratings fall off
// linearly over the first optCount peers; curveFunc scales each rating
func GenerateOptimizedPeers(n int, optCount int, curveFunc func(float64) float64) []bssd.OptimizedPeer {
peers := GeneratePeers(n)
optimizedPeers := make([]bssd.OptimizedPeer, 0, n)
for i, peer := range peers {
var optimizationRating float64
if i <= optCount {
optimizationRating = 1.0 - float64(i)/float64(optCount)
} else {
optimizationRating = 0.0
}
optimizationRating = curveFunc(optimizationRating)
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: peer, OptimizationRating: optimizationRating})
}
return optimizedPeers
}
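For example, GenerateOptimizedPeers(10, 5, quadEaseOut), as used in the splitter tests, rates the first six peers 1.0, 0.64, 0.36, 0.16, 0.04 and 0.0 (a linear fall-off from 1 to 0 over optCount steps, squared by quadEaseOut) and the remaining four peers 0.0.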
var nextSession uint64
// GenerateSessionID makes a unique session identifier.
......