diff --git a/internal/sessiondata/sessiondata.go b/internal/sessiondata/sessiondata.go deleted file mode 100644 index a56f93be51cd43d2abbce696ad7ffe47ee809ea8..0000000000000000000000000000000000000000 --- a/internal/sessiondata/sessiondata.go +++ /dev/null @@ -1,18 +0,0 @@ -package sessiondata - -import ( - cid "github.com/ipfs/go-cid" - peer "github.com/libp2p/go-libp2p-core/peer" -) - -// OptimizedPeer describes a peer and its level of optimization from 0 to 1. -type OptimizedPeer struct { - Peer peer.ID - OptimizationRating float64 -} - -// PartialRequest is represents one slice of an over request split among peers -type PartialRequest struct { - Peers []peer.ID - Keys []cid.Cid -} diff --git a/internal/sessionrequestsplitter/sessionrequestsplitter.go b/internal/sessionrequestsplitter/sessionrequestsplitter.go deleted file mode 100644 index b96985ec995c3201d52d98b68b23957d2a496505..0000000000000000000000000000000000000000 --- a/internal/sessionrequestsplitter/sessionrequestsplitter.go +++ /dev/null @@ -1,163 +0,0 @@ -package sessionrequestsplitter - -import ( - "context" - - bssd "github.com/ipfs/go-bitswap/internal/sessiondata" - - "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p-core/peer" -) - -const ( - minReceivedToAdjustSplit = 2 - maxSplit = 16 - maxAcceptableDupes = 0.4 - minDuplesToTryLessSplits = 0.2 - initialSplit = 2 -) - -type srsMessage interface { - handle(srs *SessionRequestSplitter) -} - -// SessionRequestSplitter track how many duplicate and unique blocks come in and -// uses that to determine how much to split up each set of wants among peers. -type SessionRequestSplitter struct { - ctx context.Context - messages chan srsMessage - - // data, do not touch outside run loop - receivedCount int - split int - duplicateReceivedCount int -} - -// New returns a new SessionRequestSplitter. -func New(ctx context.Context) *SessionRequestSplitter { - srs := &SessionRequestSplitter{ - ctx: ctx, - messages: make(chan srsMessage, 10), - split: initialSplit, - } - go srs.run() - return srs -} - -// SplitRequest splits a request for the given cids one or more times among the -// given peers. -func (srs *SessionRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer, ks []cid.Cid) []bssd.PartialRequest { - resp := make(chan []bssd.PartialRequest, 1) - - select { - case srs.messages <- &splitRequestMessage{optimizedPeers, ks, resp}: - case <-srs.ctx.Done(): - return nil - } - select { - case splitRequests := <-resp: - return splitRequests - case <-srs.ctx.Done(): - return nil - } - -} - -// RecordDuplicateBlock records the fact that the session received a duplicate -// block and adjusts split factor as neccesary. -func (srs *SessionRequestSplitter) RecordDuplicateBlock() { - select { - case srs.messages <- &recordDuplicateMessage{}: - case <-srs.ctx.Done(): - } -} - -// RecordUniqueBlock records the fact that the session received a unique block -// and adjusts the split factor as neccesary. -func (srs *SessionRequestSplitter) RecordUniqueBlock() { - select { - case srs.messages <- &recordUniqueMessage{}: - case <-srs.ctx.Done(): - } -} - -func (srs *SessionRequestSplitter) run() { - for { - select { - case message := <-srs.messages: - message.handle(srs) - case <-srs.ctx.Done(): - return - } - } -} - -func (srs *SessionRequestSplitter) duplicateRatio() float64 { - return float64(srs.duplicateReceivedCount) / float64(srs.receivedCount) -} - -type splitRequestMessage struct { - optimizedPeers []bssd.OptimizedPeer - ks []cid.Cid - resp chan []bssd.PartialRequest -} - -func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) { - split := srs.split - // first iteration ignore optimization ratings - peers := make([]peer.ID, len(s.optimizedPeers)) - for i, optimizedPeer := range s.optimizedPeers { - peers[i] = optimizedPeer.Peer - } - ks := s.ks - if len(peers) < split { - split = len(peers) - } - peerSplits := splitPeers(peers, split) - if len(ks) < split { - split = len(ks) - } - keySplits := splitKeys(ks, split) - splitRequests := make([]bssd.PartialRequest, 0, len(keySplits)) - for i, keySplit := range keySplits { - splitRequests = append(splitRequests, bssd.PartialRequest{Peers: peerSplits[i], Keys: keySplit}) - } - s.resp <- splitRequests -} - -type recordDuplicateMessage struct{} - -func (r *recordDuplicateMessage) handle(srs *SessionRequestSplitter) { - srs.receivedCount++ - srs.duplicateReceivedCount++ - if (srs.receivedCount > minReceivedToAdjustSplit) && (srs.duplicateRatio() > maxAcceptableDupes) && (srs.split < maxSplit) { - srs.split++ - } -} - -type recordUniqueMessage struct{} - -func (r *recordUniqueMessage) handle(srs *SessionRequestSplitter) { - srs.receivedCount++ - if (srs.split > 1) && (srs.duplicateRatio() < minDuplesToTryLessSplits) { - srs.split-- - } - -} -func splitKeys(ks []cid.Cid, split int) [][]cid.Cid { - splits := make([][]cid.Cid, split) - for i, c := range ks { - pos := i % split - splits[pos] = append(splits[pos], c) - } - return splits -} - -func splitPeers(peers []peer.ID, split int) [][]peer.ID { - splits := make([][]peer.ID, split) - for i, p := range peers { - pos := i % split - splits[pos] = append(splits[pos], p) - } - return splits -} diff --git a/internal/sessionrequestsplitter/sessionrequestsplitter_test.go b/internal/sessionrequestsplitter/sessionrequestsplitter_test.go deleted file mode 100644 index b0e7a0f309536ad422ccf4e1b53b019c2a709d52..0000000000000000000000000000000000000000 --- a/internal/sessionrequestsplitter/sessionrequestsplitter_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package sessionrequestsplitter - -import ( - "context" - "testing" - - "github.com/ipfs/go-bitswap/internal/testutil" -) - -func quadEaseOut(t float64) float64 { return t * t } - -func TestSplittingRequests(t *testing.T) { - ctx := context.Background() - optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut) - keys := testutil.GenerateCids(6) - - srs := New(ctx) - - partialRequests := srs.SplitRequest(optimizedPeers, keys) - if len(partialRequests) != 2 { - t.Fatal("Did not generate right number of partial requests") - } - for _, partialRequest := range partialRequests { - if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 3 { - t.Fatal("Did not split request into even partial requests") - } - } -} - -func TestSplittingRequestsTooFewKeys(t *testing.T) { - ctx := context.Background() - optimizedPeers := testutil.GenerateOptimizedPeers(10, 5, quadEaseOut) - keys := testutil.GenerateCids(1) - - srs := New(ctx) - - partialRequests := srs.SplitRequest(optimizedPeers, keys) - if len(partialRequests) != 1 { - t.Fatal("Should only generate as many requests as keys") - } - for _, partialRequest := range partialRequests { - if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 1 { - t.Fatal("Should still split peers up between keys") - } - } -} - -func TestSplittingRequestsTooFewPeers(t *testing.T) { - ctx := context.Background() - optimizedPeers := testutil.GenerateOptimizedPeers(1, 1, quadEaseOut) - keys := testutil.GenerateCids(6) - - srs := New(ctx) - - partialRequests := srs.SplitRequest(optimizedPeers, keys) - if len(partialRequests) != 1 { - t.Fatal("Should only generate as many requests as peers") - } - for _, partialRequest := range partialRequests { - if len(partialRequest.Peers) != 1 && len(partialRequest.Keys) != 6 { - t.Fatal("Should not split keys if there are not enough peers") - } - } -} - -func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) { - ctx := context.Background() - optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut) - keys := testutil.GenerateCids(maxSplit) - - srs := New(ctx) - - for i := 0; i < maxSplit+minReceivedToAdjustSplit; i++ { - srs.RecordDuplicateBlock() - } - - partialRequests := srs.SplitRequest(optimizedPeers, keys) - if len(partialRequests) != maxSplit { - t.Fatal("Did not adjust split up as duplicates came in") - } -} - -func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) { - ctx := context.Background() - optimizedPeers := testutil.GenerateOptimizedPeers(maxSplit, maxSplit, quadEaseOut) - keys := testutil.GenerateCids(maxSplit) - - srs := New(ctx) - - for i := 0; i < 5+minReceivedToAdjustSplit; i++ { - srs.RecordUniqueBlock() - } - - partialRequests := srs.SplitRequest(optimizedPeers, keys) - if len(partialRequests) != 1 { - t.Fatal("Did not adjust split down as unique blocks came in") - } -} diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 086035a0d59a8f8b9f4fef5f2be2998649a4135d..48af8a7d8b18800bb861ad208cfd953289d32e41 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -3,7 +3,6 @@ package testutil import ( "math/rand" - bssd "github.com/ipfs/go-bitswap/internal/sessiondata" bsmsg "github.com/ipfs/go-bitswap/message" "github.com/ipfs/go-bitswap/wantlist" blocks "github.com/ipfs/go-block-format" @@ -66,24 +65,6 @@ func GeneratePeers(n int) []peer.ID { return peerIds } -// GenerateOptimizedPeers creates n peer ids, -// with optimization fall off up to optCount, curveFunc to scale it -func GenerateOptimizedPeers(n int, optCount int, curveFunc func(float64) float64) []bssd.OptimizedPeer { - peers := GeneratePeers(n) - optimizedPeers := make([]bssd.OptimizedPeer, 0, n) - for i, peer := range peers { - var optimizationRating float64 - if i <= optCount { - optimizationRating = 1.0 - float64(i)/float64(optCount) - } else { - optimizationRating = 0.0 - } - optimizationRating = curveFunc(optimizationRating) - optimizedPeers = append(optimizedPeers, bssd.OptimizedPeer{Peer: peer, OptimizationRating: optimizationRating}) - } - return optimizedPeers -} - var nextSession uint64 // GenerateSessionID make a unit session identifier.