Commit a0fd23cd authored by hannahhoward's avatar hannahhoward

refactor(sessions): extract request splitting

Move the job of splitting requests to its own package
parent 7f9589bc
......@@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
......@@ -103,12 +105,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}
wm := bswm.New(ctx)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
return bssession.New(ctx, id, wm, pm)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network)
}
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}
bs := &Bitswap{
blockstore: bstore,
......@@ -121,7 +126,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
......
......@@ -2,7 +2,6 @@ package session
import (
"context"
"math/rand"
"time"
lru "github.com/hashicorp/golang-lru"
......@@ -13,14 +12,11 @@ import (
logging "github.com/ipfs/go-log"
loggables "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
)
const (
minReceivedToSplit = 2
maxSplit = 32
maxAcceptableDupes = 0.4
minDuplesToTryLessSplits = 0.2
initialSplit = 2
broadcastLiveWantsLimit = 4
targetedLiveWantsLimit = 32
)
......@@ -41,6 +37,14 @@ type PeerManager interface {
RecordPeerResponse(peer.ID, cid.Cid)
}
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
RecordDuplicateBlock()
RecordUniqueBlock()
}
type interestReq struct {
c cid.Cid
resp chan bool
......@@ -60,6 +64,7 @@ type Session struct {
ctx context.Context
wm WantManager
pm PeerManager
srs RequestSplitter
// channels
incoming chan blkRecv
......@@ -78,9 +83,6 @@ type Session struct {
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
receivedCount int
split int
duplicateReceivedCount int
// identifiers
notif notifications.PubSub
uuid logging.Loggable
......@@ -89,7 +91,7 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
......@@ -102,7 +104,7 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
ctx: ctx,
wm: wm,
pm: pm,
split: initialSplit,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
......@@ -230,7 +232,7 @@ func (s *Session) run(ctx context.Context) {
select {
case blk := <-s.incoming:
if blk.counterMessage {
s.updateReceiveCounters(ctx, blk.blk)
s.updateReceiveCounters(ctx, blk)
} else {
s.handleIncomingBlock(ctx, blk)
}
......@@ -357,22 +359,13 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
}
}
func (s *Session) duplicateRatio() float64 {
return float64(s.duplicateReceivedCount) / float64(s.receivedCount)
}
func (s *Session) updateReceiveCounters(ctx context.Context, blk blocks.Block) {
if s.pastWants.Has(blk.Cid()) {
s.receivedCount++
s.duplicateReceivedCount++
if (s.receivedCount > minReceivedToSplit) && (s.duplicateRatio() > maxAcceptableDupes) && (s.split < maxSplit) {
s.split++
}
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
ks := blk.blk.Cid()
if s.pastWants.Has(ks) {
s.srs.RecordDuplicateBlock()
} else {
if s.cidIsWanted(blk.Cid()) {
s.receivedCount++
if (s.split > 1) && (s.duplicateRatio() < minDuplesToTryLessSplits) {
s.split--
}
if s.cidIsWanted(ks) {
s.srs.RecordUniqueBlock()
}
}
}
......@@ -384,12 +377,10 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
}
peers := s.pm.GetOptimizedPeers()
if len(peers) > 0 {
splitRequests := split(ks, peers, s.split)
for i, currentKeys := range splitRequests.ks {
currentPeers := splitRequests.peers[i]
// right now we're requesting each block from every peer, but soon, maybe not
s.pm.RecordPeerRequests(currentPeers, currentKeys)
s.wm.WantBlocks(ctx, currentKeys, currentPeers, s.id)
splitRequests := s.srs.SplitRequest(peers, ks)
for _, splitRequest := range splitRequests {
s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
}
} else {
s.pm.RecordPeerRequests(nil, ks)
......@@ -410,39 +401,6 @@ func (s *Session) resetTick() {
}
}
type splitRec struct {
ks [][]cid.Cid
peers [][]peer.ID
}
func split(ks []cid.Cid, peers []peer.ID, split int) *splitRec {
peerSplit := split
if len(peers) < peerSplit {
peerSplit = len(peers)
}
keySplit := split
if len(ks) < keySplit {
keySplit = len(ks)
}
if keySplit > peerSplit {
keySplit = peerSplit
}
out := &splitRec{
ks: make([][]cid.Cid, keySplit),
peers: make([][]peer.ID, peerSplit),
}
for i, c := range ks {
pos := i % keySplit
out.ks[pos] = append(out.ks[pos], c)
}
peerOrder := rand.Perm(len(peers))
for i, po := range peerOrder {
pos := i % peerSplit
out.peers[pos] = append(out.peers[pos], peers[po])
}
return out
}
func (s *Session) wantBudget() int {
live := len(s.liveWants)
var budget int
......
......@@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-block-format"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
......@@ -55,6 +56,16 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.lk.Unlock()
}
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
......@@ -62,8 +73,9 @@ func TestSessionGetBlocks(t *testing.T) {
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
......@@ -163,8 +175,9 @@ func TestSessionFindMorePeers(t *testing.T) {
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{})}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
......
......@@ -23,10 +23,14 @@ type Session interface {
type sesTrk struct {
session Session
pm bssession.PeerManager
srs bssession.RequestSplitter
}
// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager) Session
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session
// RequestSplitterFactory generates a new request splitter for a session.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
// PeerManagerFactory generates a new peer manager for a session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager
......@@ -37,6 +41,8 @@ type SessionManager struct {
ctx context.Context
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
requestSplitterFactory RequestSplitterFactory
// Sessions
sessLk sync.Mutex
sessions []sesTrk
......@@ -47,11 +53,12 @@ type SessionManager struct {
}
// New creates a new SessionManager.
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory) *SessionManager {
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory, requestSplitterFactory RequestSplitterFactory) *SessionManager {
return &SessionManager{
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
requestSplitterFactory: requestSplitterFactory,
}
}
......@@ -62,8 +69,9 @@ func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
sessionctx, cancel := context.WithCancel(ctx)
pm := sm.peerManagerFactory(sessionctx, id)
session := sm.sessionFactory(sessionctx, id, pm)
tracked := sesTrk{session, pm}
srs := sm.requestSplitterFactory(sessionctx)
session := sm.sessionFactory(sessionctx, id, pm, srs)
tracked := sesTrk{session, pm, srs}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, tracked)
sm.sessLk.Unlock()
......
......@@ -5,6 +5,8 @@ import (
"testing"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
bssession "github.com/ipfs/go-bitswap/session"
blocks "github.com/ipfs/go-block-format"
......@@ -18,6 +20,7 @@ type fakeSession struct {
updateReceiveCounters bool
id uint64
pm *fakePeerManager
srs *fakeRequestSplitter
}
func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
......@@ -39,14 +42,24 @@ func (*fakePeerManager) GetOptimizedPeers() []peer.ID { return nil }
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid) {}
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
var nextInterestedIn bool
func sessionFactory(ctx context.Context, id uint64, pm bssession.PeerManager) Session {
func sessionFactory(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session {
return &fakeSession{
interested: nextInterestedIn,
receivedBlock: false,
id: id,
pm: pm.(*fakePeerManager),
srs: srs.(*fakeRequestSplitter),
}
}
......@@ -54,11 +67,15 @@ func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
return &fakePeerManager{id}
}
func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
return &fakeRequestSplitter{}
}
func TestAddingSessions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -94,7 +111,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -117,7 +134,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -142,7 +159,7 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......
package sessionrequestsplitter
import (
"context"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-peer"
)
const (
minReceivedToAdjustSplit = 2
maxSplit = 16
maxAcceptableDupes = 0.4
minDuplesToTryLessSplits = 0.2
initialSplit = 2
)
// PartialRequest is represents one slice of an over request split among peers
type PartialRequest struct {
Peers []peer.ID
Keys []cid.Cid
}
type srsMessage interface {
handle(srs *SessionRequestSplitter)
}
// SessionRequestSplitter track how many duplicate and unique blocks come in and
// uses that to determine how much to split up each set of wants among peers.
type SessionRequestSplitter struct {
ctx context.Context
messages chan srsMessage
// data, do not touch outside run loop
receivedCount int
split int
duplicateReceivedCount int
}
// New returns a new SessionRequestSplitter.
func New(ctx context.Context) *SessionRequestSplitter {
srs := &SessionRequestSplitter{
ctx: ctx,
messages: make(chan srsMessage, 10),
split: initialSplit,
}
go srs.run()
return srs
}
// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
resp := make(chan []*PartialRequest)
select {
case srs.messages <- &splitRequestMessage{peers, ks, resp}:
case <-srs.ctx.Done():
return nil
}
select {
case splitRequests := <-resp:
return splitRequests
case <-srs.ctx.Done():
return nil
}
}
// RecordDuplicateBlock records the fact that the session received a duplicate
// block and adjusts split factor as neccesary.
func (srs *SessionRequestSplitter) RecordDuplicateBlock() {
select {
case srs.messages <- &recordDuplicateMessage{}:
case <-srs.ctx.Done():
}
}
// RecordUniqueBlock records the fact that the session received unique block
// and adjusts the split factor as neccesary.
func (srs *SessionRequestSplitter) RecordUniqueBlock() {
select {
case srs.messages <- &recordUniqueMessage{}:
case <-srs.ctx.Done():
}
}
func (srs *SessionRequestSplitter) run() {
for {
select {
case message := <-srs.messages:
message.handle(srs)
case <-srs.ctx.Done():
return
}
}
}
func (srs *SessionRequestSplitter) duplicateRatio() float64 {
return float64(srs.duplicateReceivedCount) / float64(srs.receivedCount)
}
type splitRequestMessage struct {
peers []peer.ID
ks []cid.Cid
resp chan []*PartialRequest
}
func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split := srs.split
peers := s.peers
ks := s.ks
if len(peers) < split {
split = len(peers)
}
peerSplits := splitPeers(peers, split)
if len(ks) < split {
split = len(ks)
}
keySplits := splitKeys(ks, split)
splitRequests := make([]*PartialRequest, len(keySplits))
for i := range splitRequests {
splitRequests[i] = &PartialRequest{peerSplits[i], keySplits[i]}
}
s.resp <- splitRequests
}
type recordDuplicateMessage struct{}
func (r *recordDuplicateMessage) handle(srs *SessionRequestSplitter) {
srs.receivedCount++
srs.duplicateReceivedCount++
if (srs.receivedCount > minReceivedToAdjustSplit) && (srs.duplicateRatio() > maxAcceptableDupes) && (srs.split < maxSplit) {
srs.split++
}
}
type recordUniqueMessage struct{}
func (r *recordUniqueMessage) handle(srs *SessionRequestSplitter) {
srs.receivedCount++
if (srs.split > 1) && (srs.duplicateRatio() < minDuplesToTryLessSplits) {
srs.split--
}
}
func splitKeys(ks []cid.Cid, split int) [][]cid.Cid {
splits := make([][]cid.Cid, split)
for i, c := range ks {
pos := i % split
splits[pos] = append(splits[pos], c)
}
return splits
}
func splitPeers(peers []peer.ID, split int) [][]peer.ID {
splits := make([][]peer.ID, split)
for i, p := range peers {
pos := i % split
splits[pos] = append(splits[pos], p)
}
return splits
}
package sessionrequestsplitter
import (
"context"
"testing"
"github.com/ipfs/go-bitswap/testutil"
)
func TestSplittingRequests(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 2 {
t.Fatal("Did not generate right number of partial requests")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 3 {
t.Fatal("Did not split request into even partial requests")
}
}
}
func TestSplittingRequestsTooFewKeys(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
keys := testutil.GenerateCids(1)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as keys")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 1 {
t.Fatal("Should still split peers up between keys")
}
}
}
func TestSplittingRequestsTooFewPeers(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(1)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as peers")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 1 && len(partialRequest.Keys) != 6 {
t.Fatal("Should not split keys if there are not enough peers")
}
}
}
func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
for i := 0; i < maxSplit+minReceivedToAdjustSplit; i++ {
srs.RecordDuplicateBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != maxSplit {
t.Fatal("Did not adjust split up as duplicates came in")
}
}
func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
for i := 0; i < 5+minReceivedToAdjustSplit; i++ {
srs.RecordUniqueBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 1 {
t.Fatal("Did not adjust split down as unique blocks came in")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment