Commit 9e891268 authored by hannahhoward's avatar hannahhoward

refactor(sessions): extract peer management

extract the job of finding and managing peers for a session from the job of requesting blocks
parent 1e9b2c41
......@@ -2,12 +2,10 @@ package session
import (
"context"
"fmt"
"time"
lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -18,13 +16,20 @@ import (
const activeWantsLimit = 16
// SessionWantManager is an interface that can be used to request blocks
// Wantmanager is an interface that can be used to request blocks
// from given peers.
type SessionWantManager interface {
type WantManager interface {
WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}
type PeerManager interface {
FindMorePeers(context.Context, cid.Cid)
GetOptimizedPeers() []peer.ID
RecordPeerRequests([]peer.ID, []cid.Cid)
RecordPeerResponse(peer.ID, cid.Cid)
}
type interestReq struct {
c cid.Cid
resp chan bool
......@@ -40,9 +45,9 @@ type blkRecv struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
wm SessionWantManager
network bsnet.BitSwapNetwork
ctx context.Context
wm WantManager
pm PeerManager
// channels
incoming chan blkRecv
......@@ -53,28 +58,24 @@ type Session struct {
tickDelayReqs chan time.Duration
// do not touch outside run loop
tofetch *cidQueue
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
interest *lru.Cache
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
tofetch *cidQueue
interest *lru.Cache
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
// identifiers
notif notifications.PubSub
uuid logging.Loggable
id uint64
tag string
}
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.BitSwapNetwork) *Session {
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
s := &Session{
activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
......@@ -84,7 +85,7 @@ func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.Bi
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
network: network,
pm: pm,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
......@@ -92,8 +93,6 @@ func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.Bi
id: id,
}
s.tag = fmt.Sprint("bs-ses-", s.id)
cache, _ := lru.New(2048)
s.interest = cache
......@@ -203,7 +202,6 @@ const provSearchDelay = time.Second * 10
// of this loop
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay)
newpeers := make(chan peer.ID, 16)
for {
select {
case blk := <-s.incoming:
......@@ -213,9 +211,7 @@ func (s *Session) run(ctx context.Context) {
case keys := <-s.cancelKeys:
s.handleCancel(keys)
case <-s.tick.C:
s.handleTick(ctx, newpeers)
case p := <-newpeers:
s.addActivePeer(p)
s.handleTick(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
......@@ -233,7 +229,7 @@ func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
s.tick.Stop()
if blk.from != "" {
s.addActivePeer(blk.from)
s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
}
s.receiveBlock(ctx, blk.blk)
......@@ -267,7 +263,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}
}
func (s *Session) handleTick(ctx context.Context, newpeers chan<- peer.ID) {
func (s *Session) handleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
......@@ -276,33 +272,15 @@ func (s *Session) handleTick(ctx context.Context, newpeers chan<- peer.ID) {
}
// Broadcast these keys to everyone we're connected to
s.pm.RecordPeerRequests(nil, live)
s.wm.WantBlocks(ctx, live, nil, s.id)
if len(live) > 0 {
go func(k cid.Cid) {
// TODO: have a task queue setup for this to:
// - rate limit
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range s.network.FindProvidersAsync(ctx, k, 10) {
newpeers <- p
}
}(live[0])
s.pm.FindMorePeers(ctx, live[0])
}
s.resetTick()
}
func (s *Session) addActivePeer(p peer.ID) {
if _, ok := s.activePeers[p]; !ok {
s.activePeers[p] = struct{}{}
s.activePeersArr = append(s.activePeersArr, p)
cmgr := s.network.ConnectionManager()
cmgr.TagPeer(p, s.tag, 10)
}
}
func (s *Session) handleShutdown() {
s.tick.Stop()
s.notif.Shutdown()
......@@ -312,10 +290,6 @@ func (s *Session) handleShutdown() {
live = append(live, c)
}
s.wm.CancelWants(s.ctx, live, nil, s.id)
cmgr := s.network.ConnectionManager()
for _, p := range s.activePeersArr {
cmgr.UntagPeer(p, s.tag)
}
}
func (s *Session) cidIsWanted(c cid.Cid) bool {
......@@ -350,7 +324,10 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
for _, c := range ks {
s.liveWants[c] = now
}
s.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
peers := s.pm.GetOptimizedPeers()
// right now we're requesting each block from every peer, but soon, maybe not
s.pm.RecordPeerRequests(peers, ks)
s.wm.WantBlocks(ctx, ks, peers, s.id)
}
func (s *Session) averageLatency() time.Duration {
......
......@@ -7,22 +7,26 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bsnet "github.com/ipfs/go-bitswap/network"
bssession "github.com/ipfs/go-bitswap/session"
bswm "github.com/ipfs/go-bitswap/wantmanager"
bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-peer"
)
type sesTrk struct {
session *bssession.Session
pm *bsspm.SessionPeerManager
}
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
type SessionManager struct {
wm *bswm.WantManager
network bsnet.BitSwapNetwork
wm bssession.WantManager
network bsspm.PeerNetwork
ctx context.Context
// Sessions
sessLk sync.Mutex
sessions []*bssession.Session
sessions []sesTrk
// Session Index
sessIDLk sync.Mutex
......@@ -30,7 +34,7 @@ type SessionManager struct {
}
// New creates a new SessionManager.
func New(ctx context.Context, wm *bswm.WantManager, network bsnet.BitSwapNetwork) *SessionManager {
func New(ctx context.Context, wm bssession.WantManager, network bsspm.PeerNetwork) *SessionManager {
return &SessionManager{
ctx: ctx,
wm: wm,
......@@ -44,24 +48,26 @@ func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
id := sm.GetNextSessionID()
sessionctx, cancel := context.WithCancel(ctx)
session := bssession.New(sessionctx, id, sm.wm, sm.network)
pm := bsspm.New(sessionctx, id, sm.network)
session := bssession.New(sessionctx, id, sm.wm, pm)
tracked := sesTrk{session, pm}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, session)
sm.sessions = append(sm.sessions, tracked)
sm.sessLk.Unlock()
go func() {
defer cancel()
select {
case <-sm.ctx.Done():
sm.removeSession(session)
sm.removeSession(tracked)
case <-ctx.Done():
sm.removeSession(session)
sm.removeSession(tracked)
}
}()
return session
}
func (sm *SessionManager) removeSession(session exchange.Fetcher) {
func (sm *SessionManager) removeSession(session sesTrk) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for i := 0; i < len(sm.sessions); i++ {
......@@ -90,9 +96,9 @@ func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
k := blk.Cid()
ks := []cid.Cid{k}
for _, s := range sm.sessions {
if s.InterestedIn(k) {
s.ReceiveBlockFrom(from, blk)
sm.wm.CancelWants(sm.ctx, ks, nil, s.ID())
if s.session.InterestedIn(k) {
s.session.ReceiveBlockFrom(from, blk)
sm.wm.CancelWants(sm.ctx, ks, nil, s.session.ID())
}
}
}
package sessionpeermanager
import (
"context"
"fmt"
cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer"
)
type PeerNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}
type SessionPeerManager struct {
ctx context.Context
network PeerNetwork
tag string
newPeers chan peer.ID
peerReqs chan chan []peer.ID
// do not touch outside of run loop
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
}
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
spm := &SessionPeerManager{
ctx: ctx,
network: network,
newPeers: make(chan peer.ID, 16),
peerReqs: make(chan chan []peer.ID),
activePeers: make(map[peer.ID]struct{}),
}
spm.tag = fmt.Sprint("bs-ses-", id)
go spm.run(ctx)
return spm
}
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
// at the moment, we're just adding peers here
// in the future, we'll actually use this to record metrics
select {
case spm.newPeers <- p:
case <-spm.ctx.Done():
}
}
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
// at the moment, we're not doing anything here
// soon we'll use this to track latency by peer
}
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// right now this just returns all peers, but soon we might return peers
// ordered by optimization, or only a subset
resp := make(chan []peer.ID)
select {
case spm.peerReqs <- resp:
case <-spm.ctx.Done():
return nil
}
select {
case peers := <-resp:
return peers
case <-spm.ctx.Done():
return nil
}
}
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
go func(k cid.Cid) {
// TODO: have a task queue setup for this to:
// - rate limit
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
spm.newPeers <- p
}
}(c)
}
func (spm *SessionPeerManager) run(ctx context.Context) {
for {
select {
case p := <-spm.newPeers:
spm.addActivePeer(p)
case resp := <-spm.peerReqs:
resp <- spm.activePeersArr
case <-ctx.Done():
spm.handleShutdown()
return
}
}
}
func (spm *SessionPeerManager) addActivePeer(p peer.ID) {
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = struct{}{}
spm.activePeersArr = append(spm.activePeersArr, p)
cmgr := spm.network.ConnectionManager()
cmgr.TagPeer(p, spm.tag, 10)
}
}
func (spm *SessionPeerManager) handleShutdown() {
cmgr := spm.network.ConnectionManager()
for _, p := range spm.activePeersArr {
cmgr.UntagPeer(p, spm.tag)
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment