Commit 8e59a716 authored by hannahhoward's avatar hannahhoward

feat(sessions): pass optimization rating

When fetching optimized peers from the peer manager, return an optimization rating, and pass on to
request splitter

BREAKING CHANGE: interface change to GetOptimizedPeers and SplitRequests public package methods
parent 98f01e7f
......@@ -8,6 +8,7 @@ import (
lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
notifications "github.com/ipfs/go-bitswap/notifications"
bssd "github.com/ipfs/go-bitswap/sessiondata"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
......@@ -15,7 +16,6 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
)
const (
......@@ -34,7 +34,7 @@ type WantManager interface {
// requesting more when necessary.
type PeerManager interface {
FindMorePeers(context.Context, cid.Cid)
GetOptimizedPeers() []peer.ID
GetOptimizedPeers() []bssd.OptimizedPeer
RecordPeerRequests([]peer.ID, []cid.Cid)
RecordPeerResponse(peer.ID, cid.Cid)
}
......@@ -42,7 +42,7 @@ type PeerManager interface {
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
SplitRequest([]bssd.OptimizedPeer, []cid.Cid) []bssd.PartialRequest
RecordDuplicateBlock()
RecordUniqueBlock()
}
......
......@@ -6,7 +6,7 @@ import (
"testing"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/testutil"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -52,10 +52,14 @@ func (fpm *fakePeerManager) FindMorePeers(ctx context.Context, k cid.Cid) {
}
}
func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
func (fpm *fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
fpm.lk.Lock()
defer fpm.lk.Unlock()
return fpm.peers
optimizedPeers := make([]bssd.OptimizedPeer, 0, len(fpm.peers))
for _, peer := range fpm.peers {
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: peer, OptimizationRating: 1.0})
}
return optimizedPeers
}
func (fpm *fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
......@@ -68,8 +72,12 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
peers := make([]peer.ID, len(optimizedPeers))
for i, optimizedPeer := range optimizedPeers {
peers[i] = optimizedPeer.Peer
}
return []bssd.PartialRequest{bssd.PartialRequest{Peers: peers, Keys: keys}}
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
......
// Package sessiondata provides shared data types used by bitswap sessions,
// the session peer manager, and the session request splitter.
package sessiondata

import (
	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-core/peer"
)

// OptimizedPeer describes a peer and its level of optimization from 0 to 1.
// A rating of 1.0 marks the best known peer for the session; 0.0 marks a
// peer for which no optimization data (e.g. latency) has been recorded.
type OptimizedPeer struct {
	Peer               peer.ID
	OptimizationRating float64
}

// PartialRequest represents one slice of an overall request, split among peers.
type PartialRequest struct {
	Peers []peer.ID
	Keys  []cid.Cid
}
......@@ -5,10 +5,10 @@ import (
"testing"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"
bssession "github.com/ipfs/go-bitswap/session"
bssd "github.com/ipfs/go-bitswap/sessiondata"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -39,14 +39,14 @@ type fakePeerManager struct {
}
func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid) {}
func (*fakePeerManager) GetOptimizedPeers() []peer.ID { return nil }
func (*fakePeerManager) GetOptimizedPeers() []bssd.OptimizedPeer { return nil }
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid) {}
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, keys []cid.Cid) []bssd.PartialRequest {
return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
......
......@@ -6,6 +6,8 @@ import (
"math/rand"
"sort"
bssd "github.com/ipfs/go-bitswap/sessiondata"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
......@@ -78,7 +80,7 @@ func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
}
}
// RecordPeerRequests records that a given set of peers requested the given cids
// RecordPeerRequests records that a given set of peers requested the given cids.
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
// at the moment, we're not doing anything here
// soon we'll use this to track latency by peer
......@@ -88,11 +90,12 @@ func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
}
}
// GetOptimizedPeers returns the best peers available for a session
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// GetOptimizedPeers returns the best peers available for a session, along with
// a rating for how good they are, in comparison to the best peer.
func (spm *SessionPeerManager) GetOptimizedPeers() []bssd.OptimizedPeer {
// right now this just returns all peers, but soon we might return peers
// ordered by optimization, or only a subset
resp := make(chan []peer.ID, 1)
resp := make(chan []bssd.OptimizedPeer, 1)
select {
case spm.peerMessages <- &getPeersMessage{resp}:
case <-spm.ctx.Done():
......@@ -191,19 +194,28 @@ func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
p := prm.p
k := prm.k
data, ok := spm.activePeers[p]
if !ok {
data = newPeerData()
spm.activePeers[p] = data
spm.tagPeer(p)
wasOptimized := ok && data.hasLatency
if wasOptimized {
spm.removeOptimizedPeer(p)
} else {
if data.hasLatency {
spm.removeOptimizedPeer(p)
} else {
if ok {
spm.removeUnoptimizedPeer(p)
} else {
data = newPeerData()
spm.activePeers[p] = data
}
}
fallbackLatency, hasFallbackLatency := spm.broadcastLatency.CheckDuration(k)
data.AdjustLatency(k, hasFallbackLatency, fallbackLatency)
var tagValue int
if data.hasLatency {
tagValue = optimizedTagValue
} else {
tagValue = unoptimizedTagValue
}
if !ok || wasOptimized != data.hasLatency {
spm.tagPeer(p, tagValue)
}
spm.insertPeer(p, data)
}
......@@ -231,7 +243,7 @@ func (prm *peerRequestMessage) handle(spm *SessionPeerManager) {
}
type getPeersMessage struct {
resp chan<- []peer.ID
resp chan<- []bssd.OptimizedPeer
}
func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
......@@ -240,12 +252,26 @@ func (prm *getPeersMessage) handle(spm *SessionPeerManager) {
if maxPeers > maxOptimizedPeers {
maxPeers = maxOptimizedPeers
}
extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
for i := range extraPeers {
extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
var bestPeerLatency float64
if len(spm.optimizedPeersArr) > 0 {
bestPeerLatency = float64(spm.activePeers[spm.optimizedPeersArr[0]].latency)
} else {
bestPeerLatency = 0
}
optimizedPeers := make([]bssd.OptimizedPeer, 0, maxPeers)
for i := 0; i < maxPeers; i++ {
if i < len(spm.optimizedPeersArr) {
p := spm.optimizedPeersArr[i]
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{
Peer: p,
OptimizationRating: bestPeerLatency / float64(spm.activePeers[p].latency),
})
} else {
p := spm.unoptimizedPeersArr[randomOrder[i-len(spm.optimizedPeersArr)]]
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: 0.0})
}
}
prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
prm.resp <- optimizedPeers
}
func (spm *SessionPeerManager) handleShutdown() {
......
......@@ -74,6 +74,15 @@ func (fpt *fakePeerTagger) count() int {
return len(fpt.taggedPeers)
}
// getPeers is a test helper that extracts just the peer IDs from the
// session peer manager's optimized peer list, discarding the ratings.
func getPeers(sessionPeerManager *SessionPeerManager) []peer.ID {
	optimizedPeers := sessionPeerManager.GetOptimizedPeers()
	// Pre-size the slice: the final length is known, so avoid the
	// repeated growth copies of appending to a nil slice.
	peers := make([]peer.ID, 0, len(optimizedPeers))
	for _, optimizedPeer := range optimizedPeers {
		peers = append(peers, optimizedPeer.Peer)
	}
	return peers
}
func TestFindingMorePeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
......@@ -98,7 +107,7 @@ func TestFindingMorePeers(t *testing.T) {
}
time.Sleep(2 * time.Millisecond)
sessionPeers := sessionPeerManager.GetOptimizedPeers()
sessionPeers := getPeers(sessionPeerManager)
if len(sessionPeers) != len(peers) {
t.Fatal("incorrect number of peers found")
}
......@@ -125,7 +134,7 @@ func TestRecordingReceivedBlocks(t *testing.T) {
sessionPeerManager := New(ctx, id, fpt, fppf)
sessionPeerManager.RecordPeerResponse(p, c)
time.Sleep(10 * time.Millisecond)
sessionPeers := sessionPeerManager.GetOptimizedPeers()
sessionPeers := getPeers(sessionPeerManager)
if len(sessionPeers) != 1 {
t.Fatal("did not add peer on receive")
}
......@@ -178,10 +187,28 @@ func TestOrderingPeers(t *testing.T) {
}
// should prioritize peers which are fastest
if (sessionPeers[0] != peer1) || (sessionPeers[1] != peer2) || (sessionPeers[2] != peer3) {
if (sessionPeers[0].Peer != peer1) || (sessionPeers[1].Peer != peer2) || (sessionPeers[2].Peer != peer3) {
t.Fatal("Did not prioritize peers that received blocks")
}
// should give first peer rating of 1
if sessionPeers[0].OptimizationRating < 1.0 {
t.Fatal("Did not assign rating to best peer correctly")
}
// should give other optimized peers ratings between 0 & 1
if (sessionPeers[1].OptimizationRating >= 1.0) || (sessionPeers[1].OptimizationRating <= 0.0) ||
(sessionPeers[2].OptimizationRating >= 1.0) || (sessionPeers[2].OptimizationRating <= 0.0) {
t.Fatal("Did not assign rating to other optimized peers correctly")
}
// should give other peers a rating of zero
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i].OptimizationRating != 0.0 {
t.Fatal("Did not assign rating to unoptimized peer correctly")
}
}
c2 := testutil.GenerateCids(1)
// Request again
......@@ -197,14 +224,15 @@ func TestOrderingPeers(t *testing.T) {
}
// should sort by average latency
if (nextSessionPeers[0] != peer1) || (nextSessionPeers[1] != peer3) || (nextSessionPeers[2] != peer2) {
if (nextSessionPeers[0].Peer != peer1) || (nextSessionPeers[1].Peer != peer3) ||
(nextSessionPeers[2].Peer != peer2) {
t.Fatal("Did not dedup peers which received multiple blocks")
}
// should randomize other peers
totalSame := 0
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i] == nextSessionPeers[i] {
if sessionPeers[i].Peer == nextSessionPeers[i].Peer {
totalSame++
}
}
......
......@@ -3,6 +3,8 @@ package sessionrequestsplitter
import (
"context"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)
......@@ -15,12 +17,6 @@ const (
initialSplit = 2
)
// PartialRequest is represents one slice of an over request split among peers
type PartialRequest struct {
Peers []peer.ID
Keys []cid.Cid
}
type srsMessage interface {
handle(srs *SessionRequestSplitter)
}
......@@ -50,11 +46,11 @@ func New(ctx context.Context) *SessionRequestSplitter {
// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
resp := make(chan []*PartialRequest, 1)
func (srs *SessionRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, ks []cid.Cid) []bssd.PartialRequest {
resp := make(chan []bssd.PartialRequest, 1)
select {
case srs.messages <- &splitRequestMessage{peers, ks, resp}:
case srs.messages <- &splitRequestMessage{optimizedPeers, ks, resp}:
case <-srs.ctx.Done():
return nil
}
......@@ -101,14 +97,18 @@ func (srs *SessionRequestSplitter) duplicateRatio() float64 {
}
type splitRequestMessage struct {
peers []peer.ID
ks []cid.Cid
resp chan []*PartialRequest
optimizedPeers []bssd.OptimizedPeer
ks []cid.Cid
resp chan []bssd.PartialRequest
}
func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split := srs.split
peers := s.peers
// first iteration ignore optimization ratings
peers := make([]peer.ID, len(s.optimizedPeers))
for i, optimizedPeer := range s.optimizedPeers {
peers[i] = optimizedPeer.Peer
}
ks := s.ks
if len(peers) < split {
split = len(peers)
......@@ -118,9 +118,9 @@ func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split = len(ks)
}
keySplits := splitKeys(ks, split)
splitRequests := make([]*PartialRequest, len(keySplits))
for i := range splitRequests {
splitRequests[i] = &PartialRequest{peerSplits[i], keySplits[i]}
splitRequests := make([]bssd.PartialRequest, 0, len(keySplits))
for i, keySplit := range keySplits {
splitRequests = append(splitRequests, bssd.PartialRequest{Peers: peerSplits[i], Keys: keySplit})
}
s.resp <- splitRequests
}
......
......@@ -7,14 +7,16 @@ import (
"github.com/ipfs/go-bitswap/testutil"
)
func quadEaseOut(t float64) float64 { return t * t }
func TestSplittingRequests(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 2 {
t.Fatal("Did not generate right number of partial requests")
}
......@@ -27,12 +29,12 @@ func TestSplittingRequests(t *testing.T) {
func TestSplittingRequestsTooFewKeys(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut)
keys := testutil.GenerateCids(1)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as keys")
}
......@@ -45,12 +47,12 @@ func TestSplittingRequestsTooFewKeys(t *testing.T) {
func TestSplittingRequestsTooFewPeers(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(1)
optimizedPeers := testutil.GenerateOptimizedPeers(1, 1, quadEaseOut)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as peers")
}
......@@ -63,7 +65,7 @@ func TestSplittingRequestsTooFewPeers(t *testing.T) {
func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
......@@ -72,7 +74,7 @@ func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
srs.RecordDuplicateBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != maxSplit {
t.Fatal("Did not adjust split up as duplicates came in")
}
......@@ -80,7 +82,7 @@ func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
......@@ -89,7 +91,7 @@ func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) {
srs.RecordUniqueBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Did not adjust split down as unique blocks came in")
}
......
......@@ -4,6 +4,7 @@ import (
"math/rand"
bsmsg "github.com/ipfs/go-bitswap/message"
bssd "github.com/ipfs/go-bitswap/sessiondata"
"github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -76,6 +77,24 @@ func GeneratePeers(n int) []peer.ID {
return peerIds
}
// GenerateOptimizedPeers creates n peer ids with optimization ratings that
// fall off linearly from 1.0 (first peer) to 0.0 over the first optCount
// peers, shaped by curveFunc; peers past optCount are rated zero.
func GenerateOptimizedPeers(n int, optCount int, curveFunc func(float64) float64) []bssd.OptimizedPeer {
	peers := GeneratePeers(n)
	optimizedPeers := make([]bssd.OptimizedPeer, 0, n)
	// Loop variable renamed from `peer` to `p` to avoid shadowing the
	// imported peer package.
	for i, p := range peers {
		var optimizationRating float64
		// Guard optCount <= 0: the original expression would compute
		// 0/0 = NaN for the first peer when optCount is zero.
		if optCount > 0 && i <= optCount {
			// Linear fall-off: peer 0 rates 1.0, peer optCount rates 0.0.
			optimizationRating = 1.0 - float64(i)/float64(optCount)
		} else {
			optimizationRating = 0.0
		}
		optimizationRating = curveFunc(optimizationRating)
		optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: p, OptimizationRating: optimizationRating})
	}
	return optimizedPeers
}
var nextSession uint64
// GenerateSessionID make a unit session identifier.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment