Unverified Commit 43c65d45 authored by Hannah Howard's avatar Hannah Howard Committed by GitHub

Merge pull request #27 from ipfs/feat/speed-up-sessions

Speed up sessions Round #1
parents ce22eba3 b1a82dcb
......@@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
......@@ -103,12 +105,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}
wm := bswm.New(ctx)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
return bssession.New(ctx, id, wm, pm)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network)
}
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}
bs := &Bitswap{
blockstore: bstore,
......@@ -121,7 +126,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
......@@ -391,7 +396,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
defer wg.Done()
bs.updateReceiveCounters(b)
bs.sm.UpdateReceiveCounters(b)
log.Debugf("got block %s from %s", b, p)
// skip received blocks that are not in the wantlist
......
......@@ -12,9 +12,14 @@ import (
logging "github.com/ipfs/go-log"
loggables "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
)
const activeWantsLimit = 16
const (
broadcastLiveWantsLimit = 4
targetedLiveWantsLimit = 32
)
// WantManager is an interface that can be used to request blocks
// from given peers.
......@@ -32,14 +37,23 @@ type PeerManager interface {
RecordPeerResponse(peer.ID, cid.Cid)
}
// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
RecordDuplicateBlock()
RecordUniqueBlock()
}
type interestReq struct {
c cid.Cid
resp chan bool
}
type blkRecv struct {
from peer.ID
blk blocks.Block
from peer.ID
blk blocks.Block
counterMessage bool
}
// Session holds state for an individual bitswap transfer operation.
......@@ -50,6 +64,7 @@ type Session struct {
ctx context.Context
wm WantManager
pm PeerManager
srs RequestSplitter
// channels
incoming chan blkRecv
......@@ -62,12 +77,12 @@ type Session struct {
// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
// identifiers
notif notifications.PubSub
uuid logging.Loggable
......@@ -76,18 +91,20 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
......@@ -106,7 +123,7 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
select {
case s.incoming <- blkRecv{from: from, blk: blk}:
case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
case <-s.ctx.Done():
}
ks := []cid.Cid{blk.Cid()}
......@@ -114,6 +131,15 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
}
// UpdateReceiveCounters updates receive counters for a block,
// which may be a duplicate and adjusts the split factor based on that.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
select {
case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
case <-s.ctx.Done():
}
}
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
if s.interest.Contains(c) {
......@@ -205,7 +231,11 @@ func (s *Session) run(ctx context.Context) {
for {
select {
case blk := <-s.incoming:
s.handleIncomingBlock(ctx, blk)
if blk.counterMessage {
s.updateReceiveCounters(ctx, blk)
} else {
s.handleIncomingBlock(ctx, blk)
}
case keys := <-s.newReqs:
s.handleNewRequest(ctx, keys)
case keys := <-s.cancelKeys:
......@@ -241,8 +271,7 @@ func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
for _, k := range keys {
s.interest.Add(k, nil)
}
if len(s.liveWants) < activeWantsLimit {
toadd := activeWantsLimit - len(s.liveWants)
if toadd := s.wantBudget(); toadd > 0 {
if toadd > len(keys) {
toadd = len(keys)
}
......@@ -264,6 +293,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}
func (s *Session) handleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
......@@ -303,6 +333,7 @@ func (s *Session) cidIsWanted(c cid.Cid) bool {
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
c := blk.Cid()
if s.cidIsWanted(c) {
s.srs.RecordUniqueBlock()
tval, ok := s.liveWants[c]
if ok {
s.latTotal += time.Since(tval)
......@@ -313,9 +344,26 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
s.fetchcnt++
s.notif.Publish(blk)
if next := s.tofetch.Pop(); next.Defined() {
s.wantBlocks(ctx, []cid.Cid{next})
toAdd := s.wantBudget()
if toAdd > s.tofetch.Len() {
toAdd = s.tofetch.Len()
}
if toAdd > 0 {
var keys []cid.Cid
for i := 0; i < toAdd; i++ {
keys = append(keys, s.tofetch.Pop())
}
s.wantBlocks(ctx, keys)
}
s.pastWants.Push(c)
}
}
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
ks := blk.blk.Cid()
if s.pastWants.Has(ks) {
s.srs.RecordDuplicateBlock()
}
}
......@@ -325,9 +373,16 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
s.liveWants[c] = now
}
peers := s.pm.GetOptimizedPeers()
// right now we're requesting each block from every peer, but soon, maybe not
s.pm.RecordPeerRequests(peers, ks)
s.wm.WantBlocks(ctx, ks, peers, s.id)
if len(peers) > 0 {
splitRequests := s.srs.SplitRequest(peers, ks)
for _, splitRequest := range splitRequests {
s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
}
} else {
s.pm.RecordPeerRequests(nil, ks)
s.wm.WantBlocks(ctx, ks, nil, s.id)
}
}
func (s *Session) averageLatency() time.Duration {
......@@ -342,3 +397,17 @@ func (s *Session) resetTick() {
s.tick.Reset(s.baseTickDelay + (3 * avLat))
}
}
func (s *Session) wantBudget() int {
live := len(s.liveWants)
var budget int
if len(s.pm.GetOptimizedPeers()) > 0 {
budget = targetedLiveWantsLimit - live
} else {
budget = broadcastLiveWantsLimit - live
}
if budget < 0 {
budget = 0
}
return budget
}
......@@ -8,6 +8,7 @@ import (
"github.com/ipfs/go-block-format"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
......@@ -55,6 +56,16 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.lk.Unlock()
}
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
......@@ -62,10 +73,11 @@ func TestSessionGetBlocks(t *testing.T) {
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(activeWantsLimit * 2)
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
......@@ -79,7 +91,7 @@ func TestSessionGetBlocks(t *testing.T) {
// check initial want request
receivedWantReq := <-fwm.wantReqs
if len(receivedWantReq.cids) != activeWantsLimit {
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
t.Fatal("did not enqueue correct initial number of wants")
}
if receivedWantReq.peers != nil {
......@@ -87,7 +99,7 @@ func TestSessionGetBlocks(t *testing.T) {
}
// now receive the first set of blocks
peers := testutil.GeneratePeers(activeWantsLimit)
peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
var newCancelReqs []wantReq
var newBlockReqs []wantReq
var receivedBlocks []blocks.Block
......@@ -97,13 +109,16 @@ func TestSessionGetBlocks(t *testing.T) {
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
newCancelReqs = append(newCancelReqs, cancelBlock)
wantBlock := <-wantReqs
newBlockReqs = append(newBlockReqs, wantBlock)
select {
case wantBlock := <-wantReqs:
newBlockReqs = append(newBlockReqs, wantBlock)
default:
}
}
// verify new peers were recorded
fpm.lk.Lock()
if len(fpm.peers) != activeWantsLimit {
if len(fpm.peers) != broadcastLiveWantsLimit {
t.Fatal("received blocks not recorded by the peer manager")
}
for _, p := range fpm.peers {
......@@ -116,26 +131,26 @@ func TestSessionGetBlocks(t *testing.T) {
// look at new interactions with want manager
// should have cancelled each received block
if len(newCancelReqs) != activeWantsLimit {
if len(newCancelReqs) != broadcastLiveWantsLimit {
t.Fatal("did not cancel each block once it was received")
}
// new session reqs should be targeted
totalEnqueued := 0
var newCidsRequested []cid.Cid
for _, w := range newBlockReqs {
if len(w.peers) == 0 {
t.Fatal("should not have broadcast again after initial broadcast")
}
totalEnqueued += len(w.cids)
newCidsRequested = append(newCidsRequested, w.cids...)
}
// full new round of cids should be requested
if totalEnqueued != activeWantsLimit {
if len(newCidsRequested) != broadcastLiveWantsLimit {
t.Fatal("new blocks were not requested")
}
// receive remaining blocks
for i, p := range peers {
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newBlockReqs[i].cids[0])])
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
......@@ -159,12 +174,13 @@ func TestSessionFindMorePeers(t *testing.T) {
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{})}
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{}, 1)}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(activeWantsLimit * 2)
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
......@@ -190,7 +206,7 @@ func TestSessionFindMorePeers(t *testing.T) {
// verify a broadcast was made
receivedWantReq := <-wantReqs
if len(receivedWantReq.cids) != activeWantsLimit {
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
if receivedWantReq.peers != nil {
......
......@@ -17,15 +17,20 @@ type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveBlockFrom(peer.ID, blocks.Block)
UpdateReceiveCounters(blocks.Block)
}
type sesTrk struct {
session Session
pm bssession.PeerManager
srs bssession.RequestSplitter
}
// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager) Session
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session
// RequestSplitterFactory generates a new request splitter for a session.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
// PeerManagerFactory generates a new peer manager for a session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager
......@@ -33,9 +38,11 @@ type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManag
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
type SessionManager struct {
ctx context.Context
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
ctx context.Context
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
requestSplitterFactory RequestSplitterFactory
// Sessions
sessLk sync.Mutex
sessions []sesTrk
......@@ -46,11 +53,12 @@ type SessionManager struct {
}
// New creates a new SessionManager.
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory) *SessionManager {
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory, requestSplitterFactory RequestSplitterFactory) *SessionManager {
return &SessionManager{
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
requestSplitterFactory: requestSplitterFactory,
}
}
......@@ -61,8 +69,9 @@ func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
sessionctx, cancel := context.WithCancel(ctx)
pm := sm.peerManagerFactory(sessionctx, id)
session := sm.sessionFactory(sessionctx, id, pm)
tracked := sesTrk{session, pm}
srs := sm.requestSplitterFactory(sessionctx)
session := sm.sessionFactory(sessionctx, id, pm, srs)
tracked := sesTrk{session, pm, srs}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, tracked)
sm.sessLk.Unlock()
......@@ -112,3 +121,14 @@ func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
}
}
}
// UpdateReceiveCounters records the fact that a block was received, allowing
// sessions to track duplicates
func (sm *SessionManager) UpdateReceiveCounters(blk blocks.Block) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for _, s := range sm.sessions {
s.session.UpdateReceiveCounters(blk)
}
}
......@@ -5,6 +5,8 @@ import (
"testing"
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
bssession "github.com/ipfs/go-bitswap/session"
blocks "github.com/ipfs/go-block-format"
......@@ -13,10 +15,12 @@ import (
)
type fakeSession struct {
interested bool
receivedBlock bool
id uint64
pm *fakePeerManager
interested bool
receivedBlock bool
updateReceiveCounters bool
id uint64
pm *fakePeerManager
srs *fakeRequestSplitter
}
func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
......@@ -27,6 +31,7 @@ func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block,
}
func (fs *fakeSession) InterestedIn(cid.Cid) bool { return fs.interested }
func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
func (fs *fakeSession) UpdateReceiveCounters(blocks.Block) { fs.updateReceiveCounters = true }
type fakePeerManager struct {
id uint64
......@@ -37,14 +42,24 @@ func (*fakePeerManager) GetOptimizedPeers() []peer.ID { return nil }
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid) {}
type fakeRequestSplitter struct {
}
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return nil
}
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
var nextInterestedIn bool
func sessionFactory(ctx context.Context, id uint64, pm bssession.PeerManager) Session {
func sessionFactory(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session {
return &fakeSession{
interested: nextInterestedIn,
receivedBlock: false,
id: id,
pm: pm.(*fakePeerManager),
srs: srs.(*fakeRequestSplitter),
}
}
......@@ -52,11 +67,15 @@ func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
return &fakePeerManager{id}
}
func requestSplitterFactory(ctx context.Context) bssession.RequestSplitter {
return &fakeRequestSplitter{}
}
func TestAddingSessions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -92,7 +111,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -115,7 +134,7 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......@@ -140,7 +159,7 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
......
......@@ -3,18 +3,28 @@ package sessionpeermanager
import (
"context"
"fmt"
"math/rand"
cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer"
)
const (
maxOptimizedPeers = 32
reservePeers = 2
)
// PeerNetwork is an interface for finding providers and managing connections
type PeerNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}
type peerMessage interface {
handle(spm *SessionPeerManager)
}
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
type SessionPeerManager struct {
......@@ -22,22 +32,21 @@ type SessionPeerManager struct {
network PeerNetwork
tag string
newPeers chan peer.ID
peerReqs chan chan []peer.ID
peerMessages chan peerMessage
// do not touch outside of run loop
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
activePeers map[peer.ID]bool
unoptimizedPeersArr []peer.ID
optimizedPeersArr []peer.ID
}
// New creates a new SessionPeerManager
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
spm := &SessionPeerManager{
ctx: ctx,
network: network,
newPeers: make(chan peer.ID, 16),
peerReqs: make(chan chan []peer.ID),
activePeers: make(map[peer.ID]struct{}),
ctx: ctx,
network: network,
peerMessages: make(chan peerMessage, 16),
activePeers: make(map[peer.ID]bool),
}
spm.tag = fmt.Sprint("bs-ses-", id)
......@@ -53,7 +62,7 @@ func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
// at the moment, we're just adding peers here
// in the future, we'll actually use this to record metrics
select {
case spm.newPeers <- p:
case spm.peerMessages <- &peerResponseMessage{p}:
case <-spm.ctx.Done():
}
}
......@@ -70,7 +79,7 @@ func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// ordered by optimization, or only a subset
resp := make(chan []peer.ID)
select {
case spm.peerReqs <- resp:
case spm.peerMessages <- &peerReqMessage{resp}:
case <-spm.ctx.Done():
return nil
}
......@@ -93,7 +102,7 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
spm.newPeers <- p
spm.peerMessages <- &peerFoundMessage{p}
}
}(c)
}
......@@ -101,29 +110,105 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
func (spm *SessionPeerManager) run(ctx context.Context) {
for {
select {
case p := <-spm.newPeers:
spm.addActivePeer(p)
case resp := <-spm.peerReqs:
resp <- spm.activePeersArr
case pm := <-spm.peerMessages:
pm.handle(spm)
case <-ctx.Done():
spm.handleShutdown()
return
}
}
}
func (spm *SessionPeerManager) addActivePeer(p peer.ID) {
func (spm *SessionPeerManager) tagPeer(p peer.ID) {
cmgr := spm.network.ConnectionManager()
cmgr.TagPeer(p, spm.tag, 10)
}
func (spm *SessionPeerManager) insertOptimizedPeer(p peer.ID) {
if len(spm.optimizedPeersArr) >= (maxOptimizedPeers - reservePeers) {
tailPeer := spm.optimizedPeersArr[len(spm.optimizedPeersArr)-1]
spm.optimizedPeersArr = spm.optimizedPeersArr[:len(spm.optimizedPeersArr)-1]
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, tailPeer)
}
spm.optimizedPeersArr = append([]peer.ID{p}, spm.optimizedPeersArr...)
}
func (spm *SessionPeerManager) removeOptimizedPeer(p peer.ID) {
for i := 0; i < len(spm.optimizedPeersArr); i++ {
if spm.optimizedPeersArr[i] == p {
spm.optimizedPeersArr = append(spm.optimizedPeersArr[:i], spm.optimizedPeersArr[i+1:]...)
return
}
}
}
func (spm *SessionPeerManager) removeUnoptimizedPeer(p peer.ID) {
for i := 0; i < len(spm.unoptimizedPeersArr); i++ {
if spm.unoptimizedPeersArr[i] == p {
spm.unoptimizedPeersArr[i] = spm.unoptimizedPeersArr[len(spm.unoptimizedPeersArr)-1]
spm.unoptimizedPeersArr = spm.unoptimizedPeersArr[:len(spm.unoptimizedPeersArr)-1]
return
}
}
}
type peerFoundMessage struct {
p peer.ID
}
func (pfm *peerFoundMessage) handle(spm *SessionPeerManager) {
p := pfm.p
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = struct{}{}
spm.activePeersArr = append(spm.activePeersArr, p)
spm.activePeers[p] = false
spm.unoptimizedPeersArr = append(spm.unoptimizedPeersArr, p)
spm.tagPeer(p)
}
}
type peerResponseMessage struct {
p peer.ID
}
func (prm *peerResponseMessage) handle(spm *SessionPeerManager) {
p := prm.p
isOptimized, ok := spm.activePeers[p]
if !ok {
spm.activePeers[p] = true
spm.tagPeer(p)
} else {
if isOptimized {
spm.removeOptimizedPeer(p)
} else {
spm.activePeers[p] = true
spm.removeUnoptimizedPeer(p)
}
}
spm.insertOptimizedPeer(p)
}
type peerReqMessage struct {
resp chan<- []peer.ID
}
func (prm *peerReqMessage) handle(spm *SessionPeerManager) {
randomOrder := rand.Perm(len(spm.unoptimizedPeersArr))
maxPeers := len(spm.unoptimizedPeersArr) + len(spm.optimizedPeersArr)
if maxPeers > maxOptimizedPeers {
maxPeers = maxOptimizedPeers
}
cmgr := spm.network.ConnectionManager()
cmgr.TagPeer(p, spm.tag, 10)
extraPeers := make([]peer.ID, maxPeers-len(spm.optimizedPeersArr))
for i := range extraPeers {
extraPeers[i] = spm.unoptimizedPeersArr[randomOrder[i]]
}
prm.resp <- append(spm.optimizedPeersArr, extraPeers...)
}
func (spm *SessionPeerManager) handleShutdown() {
cmgr := spm.network.ConnectionManager()
for _, p := range spm.activePeersArr {
for p := range spm.activePeers {
cmgr.UntagPeer(p, spm.tag)
}
}
......@@ -3,6 +3,7 @@ package sessionpeermanager
import (
"context"
"sync"
"math/rand"
"testing"
"time"
......@@ -120,6 +121,69 @@ func TestRecordingReceivedBlocks(t *testing.T) {
}
}
func TestOrderingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
peers := testutil.GeneratePeers(100)
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
c := testutil.GenerateCids(1)
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)
// add all peers to session
sessionPeerManager.FindMorePeers(ctx, c[0])
// record broadcast
sessionPeerManager.RecordPeerRequests(nil, c)
// record receives
peer1 := peers[rand.Intn(100)]
peer2 := peers[rand.Intn(100)]
peer3 := peers[rand.Intn(100)]
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer1, c[0])
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer2, c[0])
time.Sleep(1 * time.Millisecond)
sessionPeerManager.RecordPeerResponse(peer3, c[0])
sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}
// should prioritize peers which have received blocks
if (sessionPeers[0] != peer3) || (sessionPeers[1] != peer2) || (sessionPeers[2] != peer1) {
t.Fatal("Did not prioritize peers that received blocks")
}
// Receive a second time from same node
sessionPeerManager.RecordPeerResponse(peer3, c[0])
// call again
nextSessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(nextSessionPeers) != maxOptimizedPeers {
t.Fatal("Should not return more than the max of optimized peers")
}
// should not duplicate
if (nextSessionPeers[0] != peer3) || (nextSessionPeers[1] != peer2) || (nextSessionPeers[2] != peer1) {
t.Fatal("Did dedup peers which received multiple blocks")
}
// should randomize other peers
totalSame := 0
for i := 3; i < maxOptimizedPeers; i++ {
if sessionPeers[i] == nextSessionPeers[i] {
totalSame++
}
}
if totalSame >= maxOptimizedPeers-3 {
t.Fatal("should not return the same random peers each time")
}
}
func TestUntaggingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
......
package sessionrequestsplitter
import (
"context"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-peer"
)
const (
minReceivedToAdjustSplit = 2
maxSplit = 16
maxAcceptableDupes = 0.4
minDuplesToTryLessSplits = 0.2
initialSplit = 2
)
// PartialRequest is represents one slice of an over request split among peers
type PartialRequest struct {
Peers []peer.ID
Keys []cid.Cid
}
type srsMessage interface {
handle(srs *SessionRequestSplitter)
}
// SessionRequestSplitter track how many duplicate and unique blocks come in and
// uses that to determine how much to split up each set of wants among peers.
type SessionRequestSplitter struct {
ctx context.Context
messages chan srsMessage
// data, do not touch outside run loop
receivedCount int
split int
duplicateReceivedCount int
}
// New returns a new SessionRequestSplitter.
func New(ctx context.Context) *SessionRequestSplitter {
srs := &SessionRequestSplitter{
ctx: ctx,
messages: make(chan srsMessage, 10),
split: initialSplit,
}
go srs.run()
return srs
}
// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
resp := make(chan []*PartialRequest)
select {
case srs.messages <- &splitRequestMessage{peers, ks, resp}:
case <-srs.ctx.Done():
return nil
}
select {
case splitRequests := <-resp:
return splitRequests
case <-srs.ctx.Done():
return nil
}
}
// RecordDuplicateBlock records the fact that the session received a duplicate
// block and adjusts split factor as neccesary.
func (srs *SessionRequestSplitter) RecordDuplicateBlock() {
select {
case srs.messages <- &recordDuplicateMessage{}:
case <-srs.ctx.Done():
}
}
// RecordUniqueBlock records the fact that the session received unique block
// and adjusts the split factor as neccesary.
func (srs *SessionRequestSplitter) RecordUniqueBlock() {
select {
case srs.messages <- &recordUniqueMessage{}:
case <-srs.ctx.Done():
}
}
func (srs *SessionRequestSplitter) run() {
for {
select {
case message := <-srs.messages:
message.handle(srs)
case <-srs.ctx.Done():
return
}
}
}
func (srs *SessionRequestSplitter) duplicateRatio() float64 {
return float64(srs.duplicateReceivedCount) / float64(srs.receivedCount)
}
type splitRequestMessage struct {
peers []peer.ID
ks []cid.Cid
resp chan []*PartialRequest
}
func (s *splitRequestMessage) handle(srs *SessionRequestSplitter) {
split := srs.split
peers := s.peers
ks := s.ks
if len(peers) < split {
split = len(peers)
}
peerSplits := splitPeers(peers, split)
if len(ks) < split {
split = len(ks)
}
keySplits := splitKeys(ks, split)
splitRequests := make([]*PartialRequest, len(keySplits))
for i := range splitRequests {
splitRequests[i] = &PartialRequest{peerSplits[i], keySplits[i]}
}
s.resp <- splitRequests
}
type recordDuplicateMessage struct{}
func (r *recordDuplicateMessage) handle(srs *SessionRequestSplitter) {
srs.receivedCount++
srs.duplicateReceivedCount++
if (srs.receivedCount > minReceivedToAdjustSplit) && (srs.duplicateRatio() > maxAcceptableDupes) && (srs.split < maxSplit) {
srs.split++
}
}
type recordUniqueMessage struct{}
func (r *recordUniqueMessage) handle(srs *SessionRequestSplitter) {
srs.receivedCount++
if (srs.split > 1) && (srs.duplicateRatio() < minDuplesToTryLessSplits) {
srs.split--
}
}
func splitKeys(ks []cid.Cid, split int) [][]cid.Cid {
splits := make([][]cid.Cid, split)
for i, c := range ks {
pos := i % split
splits[pos] = append(splits[pos], c)
}
return splits
}
func splitPeers(peers []peer.ID, split int) [][]peer.ID {
splits := make([][]peer.ID, split)
for i, p := range peers {
pos := i % split
splits[pos] = append(splits[pos], p)
}
return splits
}
package sessionrequestsplitter
import (
"context"
"testing"
"github.com/ipfs/go-bitswap/testutil"
)
func TestSplittingRequests(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 2 {
t.Fatal("Did not generate right number of partial requests")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 3 {
t.Fatal("Did not split request into even partial requests")
}
}
}
func TestSplittingRequestsTooFewKeys(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(10)
keys := testutil.GenerateCids(1)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as keys")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 5 && len(partialRequest.Keys) != 1 {
t.Fatal("Should still split peers up between keys")
}
}
}
func TestSplittingRequestsTooFewPeers(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(1)
keys := testutil.GenerateCids(6)
srs := New(ctx)
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 1 {
t.Fatal("Should only generate as many requests as peers")
}
for _, partialRequest := range partialRequests {
if len(partialRequest.Peers) != 1 && len(partialRequest.Keys) != 6 {
t.Fatal("Should not split keys if there are not enough peers")
}
}
}
func TestSplittingRequestsIncreasingSplitDueToDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
for i := 0; i < maxSplit+minReceivedToAdjustSplit; i++ {
srs.RecordDuplicateBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != maxSplit {
t.Fatal("Did not adjust split up as duplicates came in")
}
}
func TestSplittingRequestsDecreasingSplitDueToNoDupes(t *testing.T) {
ctx := context.Background()
peers := testutil.GeneratePeers(maxSplit)
keys := testutil.GenerateCids(maxSplit)
srs := New(ctx)
for i := 0; i < 5+minReceivedToAdjustSplit; i++ {
srs.RecordUniqueBlock()
}
partialRequests := srs.SplitRequest(peers, keys)
if len(partialRequests) != 1 {
t.Fatal("Did not adjust split down as unique blocks came in")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment