Commit 1274d405 authored by Dirk McCormick's avatar Dirk McCormick

refactor: remove unused code

parent 29b4de92
package sessiondata
import (
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// OptimizedPeer describes a peer and its level of optimization from 0 to 1.
type OptimizedPeer struct {
Peer peer.ID
OptimizationRating float64
}
// PartialRequest is represents one slice of an over request split among peers
type PartialRequest struct {
Peers []peer.ID
Keys []cid.Cid
}
package sessionrequestsplitter
import (
"context"
bssd "github.com/ipfs/go-bitswap/internal/sessiondata"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
)
const (
minReceivedToAdjustSplit = 2
maxSplit = 16
maxAcceptableDupes = 0.4
minDuplesToTryLessSplits = 0.2
initialSplit = 2
)
type srsMessage interface {
handle(srs *SessionRequestSplitter)
}
// SessionRequestSplitter track how many duplicate and unique blocks come in and
// uses that to determine how much to split up each set of wants among peers.
type SessionRequestSplitter struct {
ctx context.Context
messages chan srsMessage
// data, do not touch outside run loop
receivedCount int
split int
duplicateReceivedCount int
}
// New returns a new SessionRequestSplitter.
func New(ctx context.Context) *SessionRequestSplitter {
srs := &SessionRequestSplitter{
ctx: ctx,
messages: make(chan srsMessage, 10),
split: initialSplit,
}
go srs.run()
return srs
}
// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, ks []cid.Cid) []bssd.PartialRequest {
resp := make(chan []bssd.PartialRequest, 1)
select {
case srs.messages <- &splitRequestMessage{optimizedPeers, ks, resp}:
case <-srs.ctx.Done():
return nil
}
select {
case splitRequests := <-resp:
return splitRequests
case <-srs.ctx.Done():
return nil
}
}
// RecordDuplicateBlock records the fact that the session received a duplicate
// block and adjusts split factor as neccesary.
func (srs *SessionRequestSplitter) RecordDuplicateBlock() {
select {
case srs.messages <- &recordDuplicateMessage{}:
case <-srs.ctx.Done():
}
}
// RecordUniqueBlock records the fact that the session received a unique block
// and adjusts the split factor as neccesary.
func (srs *SessionRequestSplitter) RecordUniqueBlock() {
select {
case srs.messages <- &recordUniqueMessage{}:
case <-srs.ctx.Done():
}
}
func (srs *SessionRequestSplitter) run() {
for {
select {
case message := <-srs.messages:
message.handle(srs)
case <-srs.ctx.Done():
return
}
}
}
func (srs *SessionRequestSplitter) duplicateRatio() float64 {
return float64(srs.duplicateReceivedCount) / float64(srs.receivedCount)
}
type splitRequestMessage struct {
optimizedPeers []bssd.OptimizedPeer
ks []cid.Cid
resp chan []bssd.PartialRequest
}
func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split := srs.split
// first iteration ignore optimization ratings
peers := make([]peer.ID, len(s.optimizedPeers))
for i, optimizedPeer := range s.optimizedPeers {
peers[i] = optimizedPeer.Peer
}
ks := s.ks
if len(peers) < split {
split = len(peers)
}
peerSplits := splitPeers(peers, split)
if len(ks) < split {
split = len(ks)
}
keySplits := splitKeys(ks, split)
splitRequests := make([]bssd.PartialRequest, 0, len(keySplits))
for i, keySplit := range keySplits {
splitRequests = append(splitRequests, bssd.PartialRequest{Peers: peerSplits[i], Keys: keySplit})
}
s.resp <- splitRequests
}
type recordDuplicateMessage struct{}
func (r *recordDuplicateMessage) handle(srs *SessionRequestSplitter) {
srs.receivedCount++
srs.duplicateReceivedCount++
if (srs.receivedCount > minReceivedToAdjustSplit) && (srs.duplicateRatio() > maxAcceptableDupes) && (srs.split < maxSplit) {
srs.split++
}
}
type recordUniqueMessage struct{}
func (r *recordUniqueMessage) handle(srs *SessionRequestSplitter) {
srs.receivedCount++
if (srs.split > 1) && (srs.duplicateRatio() < minDuplesToTryLessSplits) {
srs.split--
}
}
func splitKeys(ks []cid.Cid, split int) [][]cid.Cid {
splits := make([][]cid.Cid, split)
for i, c := range ks {
pos := i % split
splits[pos] = append(splits[pos], c)
}
return splits
}
func splitPeers(peers []peer.ID, split int) [][]peer.ID {
splits := make([][]peer.ID, split)
for i, p := range peers {
pos := i % split
splits[pos] = append(splits[pos], p)
}
return splits
}
package sessionrequestsplitter
import (
"context"
"testing"
"github.com/ipfs/go-bitswap/internal/testutil"
)
func quadEaseOut(t float64) float64 { return t * t }
func TestSplittingRequests(t *testing.T) {
ctx := context.Background()
optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 2 {
t.Fatal("Did not generate right number of partial requests")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 3 {
t.Fatal("Did not split request into even partial requests")
}
}
}
func TestSplittingRequestsTooFewKeys(t *testing.T) {
ctx := context.Background()
optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut)
keys := testutil.GenerateCids(1)
srs := New(ctx)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as keys")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 1 {
t.Fatal("Should still split peers up between keys")
}
}
}
func TestSplittingRequestsTooFewPeers(t *testing.T) {
ctx := context.Background()
optimizedPeers := testutil.GenerateOptimizedPeers(1, 1, quadEaseOut)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as peers")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 1 && len(partialRequest.Keys) != 6 {
t.Fatal("Should not split keys if there are not enough peers")
}
}
}
func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
ctx := context.Background()
optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
for i := 0; i < maxSplit+minReceivedToAdjustSplit; i++ {
srs.RecordDuplicateBlock()
}
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != maxSplit {
t.Fatal("Did not adjust split up as duplicates came in")
}
}
func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) {
ctx := context.Background()
optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
for i := 0; i < 5+minReceivedToAdjustSplit; i++ {
srs.RecordUniqueBlock()
}
partialRequests := srs.SplitRequest(optimizedPeers, keys)
if len(partialRequests) != 1 {
t.Fatal("Did not adjust split down as unique blocks came in")
}
}
......@@ -3,7 +3,6 @@ package testutil
import (
"math/rand"
bssd "github.com/ipfs/go-bitswap/internal/sessiondata"
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format"
......@@ -66,24 +65,6 @@ func GeneratePeers(n int) []peer.ID {
return peerIds
}
// GenerateOptimizedPeers creates n peer ids,
// with optimization fall off up to optCount, curveFunc to scale it
func GenerateOptimizedPeers(n int, optCount int, curveFunc func(float64) float64) []bssd.OptimizedPeer {
peers := GeneratePeers(n)
optimizedPeers := make([]bssd.OptimizedPeer, 0, n)
for i, peer := range peers {
var optimizationRating float64
if i <= optCount {
optimizationRating = 1.0 - float64(i)/float64(optCount)
} else {
optimizationRating = 0.0
}
optimizationRating = curveFunc(optimizationRating)
optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: peer, OptimizationRating: optimizationRating})
}
return optimizedPeers
}
var nextSession uint64
// GenerateSessionID make a unit session identifier.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment