Unverified Commit 4fc62722 authored by Hannah Howard, committed by GitHub

Merge pull request #26 from ipfs/feat/refactor-sessions

Bitswap Refactor #4: Extract session peer manager from sessions
parents 1e9b2c41 16f00de5
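In short, this refactor removes the session's direct dependency on the network: a Session now talks to a small PeerManager interface, and the SessionManager constructs one peer manager per session through injected factories. An editorial sketch of the new wiring inside SessionManager.NewSession (mirroring the diff below, not text from the commit):

	// Before: sessions reached into the network directly.
	// session := bssession.New(sessionctx, id, sm.wm, sm.network)

	// After: a per-session peer manager is built first and injected.
	pm := sm.peerManagerFactory(sessionctx, id)
	session := sm.sessionFactory(sessionctx, id, pm)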
......@@ -16,9 +16,10 @@ import (
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
bspm "github.com/ipfs/go-bitswap/peermanager"
bssession "github.com/ipfs/go-bitswap/session"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
......@@ -102,6 +103,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}
wm := bswm.New(ctx)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
return bssession.New(ctx, id, wm, pm)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network)
}
bs := &Bitswap{
blockstore: bstore,
notifications: notif,
......@@ -113,7 +121,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(ctx, wm, network),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
......
package session
import cid "github.com/ipfs/go-cid"
type cidQueue struct {
elems []cid.Cid
eset *cid.Set
}
func newCidQueue() *cidQueue {
return &cidQueue{eset: cid.NewSet()}
}
func (cq *cidQueue) Pop() cid.Cid {
for {
if len(cq.elems) == 0 {
return cid.Cid{}
}
out := cq.elems[0]
cq.elems = cq.elems[1:]
if cq.eset.Has(out) {
cq.eset.Remove(out)
return out
}
}
}
func (cq *cidQueue) Push(c cid.Cid) {
if cq.eset.Visit(c) {
cq.elems = append(cq.elems, c)
}
}
func (cq *cidQueue) Remove(c cid.Cid) {
cq.eset.Remove(c)
}
func (cq *cidQueue) Has(c cid.Cid) bool {
return cq.eset.Has(c)
}
func (cq *cidQueue) Len() int {
return cq.eset.Len()
}
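A short usage sketch (editorial, not part of the commit) of the queue's lazy-removal semantics, assuming c1 and c2 are two distinct cids: Push deduplicates against the cid set, Remove only drops a cid from the set, and Pop skips entries that were removed before reaching the front.

	cq := newCidQueue()
	cq.Push(c1)
	cq.Push(c2)
	cq.Push(c1)      // duplicate: eset.Visit returns false, so it is ignored
	cq.Remove(c1)    // lazy: c1 stays in elems but leaves the set
	next := cq.Pop() // skips the stale c1 entry and returns c2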
......@@ -2,12 +2,10 @@ package session
import (
"context"
"fmt"
"time"
lru "github.com/hashicorp/golang-lru"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
......@@ -18,13 +16,22 @@ import (
const activeWantsLimit = 16
// SessionWantManager is an interface that can be used to request blocks
// WantManager is an interface that can be used to request blocks
// from given peers.
type SessionWantManager interface {
type WantManager interface {
WantBlocks(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
CancelWants(ctx context.Context, ks []cid.Cid, peers []peer.ID, ses uint64)
}
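Note (editorial): a nil peers slice means broadcast. handleTick below passes nil to WantBlocks to re-request all live wants from every connected peer, and the tests assert that a session's first want request is exactly such a broadcast.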
// PeerManager provides an interface for tracking and optimizing peers, and
// requesting more when necessary.
type PeerManager interface {
FindMorePeers(context.Context, cid.Cid)
GetOptimizedPeers() []peer.ID
RecordPeerRequests([]peer.ID, []cid.Cid)
RecordPeerResponse(peer.ID, cid.Cid)
}
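Any type with these four methods satisfies PeerManager; the concrete implementation added by this commit is sessionpeermanager.SessionPeerManager, and the tests below use a fake. A minimal no-op stand-in (illustrative only) would be:

	type nullPeerManager struct{}

	func (nullPeerManager) FindMorePeers(context.Context, cid.Cid)  {}
	func (nullPeerManager) GetOptimizedPeers() []peer.ID            { return nil }
	func (nullPeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
	func (nullPeerManager) RecordPeerResponse(peer.ID, cid.Cid)     {}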
type interestReq struct {
c cid.Cid
resp chan bool
......@@ -40,9 +47,9 @@ type blkRecv struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
wm SessionWantManager
network bsnet.BitSwapNetwork
ctx context.Context
wm WantManager
pm PeerManager
// channels
incoming chan blkRecv
......@@ -53,28 +60,24 @@ type Session struct {
tickDelayReqs chan time.Duration
// do not touch outside run loop
tofetch *cidQueue
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
interest *lru.Cache
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
tofetch *cidQueue
interest *lru.Cache
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int
// identifiers
notif notifications.PubSub
uuid logging.Loggable
id uint64
tag string
}
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.BitSwapNetwork) *Session {
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
s := &Session{
activePeers: make(map[peer.ID]struct{}),
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
......@@ -84,7 +87,7 @@ func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.Bi
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
network: network,
pm: pm,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
......@@ -92,8 +95,6 @@ func New(ctx context.Context, id uint64, wm SessionWantManager, network bsnet.Bi
id: id,
}
s.tag = fmt.Sprint("bs-ses-", s.id)
cache, _ := lru.New(2048)
s.interest = cache
......@@ -108,11 +109,39 @@ func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
case s.incoming <- blkRecv{from: from, blk: blk}:
case <-s.ctx.Done():
}
ks := []cid.Cid{blk.Cid()}
s.wm.CancelWants(s.ctx, ks, nil, s.id)
}
// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
return s.interest.Contains(c) || s.isLiveWant(c)
if s.interest.Contains(c) {
return true
}
// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though it's unclear
// if it will actually induce any noticeable slowness. It is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// Note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
resp := make(chan bool, 1)
select {
case s.interestReqs <- interestReq{
c: c,
resp: resp,
}:
case <-s.ctx.Done():
return false
}
select {
case want := <-resp:
return want
case <-s.ctx.Done():
return false
}
}
// GetBlock fetches a single block.
......@@ -125,14 +154,24 @@ func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, err
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid)
return bsgetter.AsyncGetBlocks(ctx, keys, s.notif, s.fetch, s.cancel)
}
// ID returns the session's identifier.
func (s *Session) ID() uint64 {
return s.id
return bsgetter.AsyncGetBlocks(ctx, keys, s.notif,
func(ctx context.Context, keys []cid.Cid) {
select {
case s.newReqs <- keys:
case <-ctx.Done():
case <-s.ctx.Done():
}
},
func(keys []cid.Cid) {
select {
case s.cancelKeys <- keys:
case <-s.ctx.Done():
}
},
)
}
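Usage sketch (editorial): the two callbacks above simply forward key slices into the run loop's newReqs and cancelKeys channels, so a caller just drains the returned channel.

	blkCh, err := session.GetBlocks(ctx, keys) // keys is a []cid.Cid
	if err != nil {
		return err
	}
	for blk := range blkCh { // closed when all blocks arrive or ctx is done
		fmt.Println("got", blk.Cid())
	}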
// GetAverageLatency returns the average latency for block requests.
func (s *Session) GetAverageLatency() time.Duration {
resp := make(chan time.Duration)
select {
......@@ -149,6 +188,7 @@ func (s *Session) GetAverageLatency() time.Duration {
}
}
// SetBaseTickDelay changes the rate at which ticks happen.
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
select {
case s.tickDelayReqs <- baseTickDelay:
......@@ -156,54 +196,12 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
}
}
// TODO: PERF: this is using a channel to guard a map access against race
// conditions. This is definitely much slower than a mutex, though it's unclear
// if it will actually induce any noticeable slowness. It is implemented this
// way to avoid adding a more complex set of mutexes around the liveWants map.
// Note that in the average case (where this session *is* interested in the
// block we received) this function will not be called, as the cid will likely
// still be in the interest cache.
func (s *Session) isLiveWant(c cid.Cid) bool {
resp := make(chan bool, 1)
select {
case s.interestReqs <- interestReq{
c: c,
resp: resp,
}:
case <-s.ctx.Done():
return false
}
select {
case want := <-resp:
return want
case <-s.ctx.Done():
return false
}
}
func (s *Session) fetch(ctx context.Context, keys []cid.Cid) {
select {
case s.newReqs <- keys:
case <-ctx.Done():
case <-s.ctx.Done():
}
}
func (s *Session) cancel(keys []cid.Cid) {
select {
case s.cancelKeys <- keys:
case <-s.ctx.Done():
}
}
const provSearchDelay = time.Second * 10
// Session run loop -- everything below here should not be called outside of
// this loop
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay)
newpeers := make(chan peer.ID, 16)
for {
select {
case blk := <-s.incoming:
......@@ -213,9 +211,7 @@ func (s *Session) run(ctx context.Context) {
case keys := <-s.cancelKeys:
s.handleCancel(keys)
case <-s.tick.C:
s.handleTick(ctx, newpeers)
case p := <-newpeers:
s.addActivePeer(p)
s.handleTick(ctx)
case lwchk := <-s.interestReqs:
lwchk.resp <- s.cidIsWanted(lwchk.c)
case resp := <-s.latencyReqs:
......@@ -233,7 +229,7 @@ func (s *Session) handleIncomingBlock(ctx context.Context, blk blkRecv) {
s.tick.Stop()
if blk.from != "" {
s.addActivePeer(blk.from)
s.pm.RecordPeerResponse(blk.from, blk.blk.Cid())
}
s.receiveBlock(ctx, blk.blk)
......@@ -267,7 +263,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}
}
func (s *Session) handleTick(ctx context.Context, newpeers chan<- peer.ID) {
func (s *Session) handleTick(ctx context.Context) {
live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
......@@ -276,33 +272,15 @@ func (s *Session) handleTick(ctx context.Context, newpeers chan<- peer.ID) {
}
// Broadcast these keys to everyone we're connected to
s.pm.RecordPeerRequests(nil, live)
s.wm.WantBlocks(ctx, live, nil, s.id)
if len(live) > 0 {
go func(k cid.Cid) {
// TODO: have a task queue setup for this to:
// - rate limit
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range s.network.FindProvidersAsync(ctx, k, 10) {
newpeers <- p
}
}(live[0])
s.pm.FindMorePeers(ctx, live[0])
}
s.resetTick()
}
func (s *Session) addActivePeer(p peer.ID) {
if _, ok := s.activePeers[p]; !ok {
s.activePeers[p] = struct{}{}
s.activePeersArr = append(s.activePeersArr, p)
cmgr := s.network.ConnectionManager()
cmgr.TagPeer(p, s.tag, 10)
}
}
func (s *Session) handleShutdown() {
s.tick.Stop()
s.notif.Shutdown()
......@@ -312,10 +290,6 @@ func (s *Session) handleShutdown() {
live = append(live, c)
}
s.wm.CancelWants(s.ctx, live, nil, s.id)
cmgr := s.network.ConnectionManager()
for _, p := range s.activePeersArr {
cmgr.UntagPeer(p, s.tag)
}
}
func (s *Session) cidIsWanted(c cid.Cid) bool {
......@@ -350,12 +324,16 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
for _, c := range ks {
s.liveWants[c] = now
}
s.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
peers := s.pm.GetOptimizedPeers()
// right now we're requesting each block from every peer, but soon, maybe not
s.pm.RecordPeerRequests(peers, ks)
s.wm.WantBlocks(ctx, ks, peers, s.id)
}
func (s *Session) averageLatency() time.Duration {
return s.latTotal / time.Duration(s.fetchcnt)
}
func (s *Session) resetTick() {
if s.latTotal == 0 {
s.tick.Reset(provSearchDelay)
......@@ -364,46 +342,3 @@ func (s *Session) resetTick() {
s.tick.Reset(s.baseTickDelay + (3 * avLat))
}
}
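Worked example (editorial): assuming the package's default baseTickDelay of 500ms, a session whose observed average latency is 200ms resets its tick to 500ms + 3*200ms = 1.1s, while a session that has never received a block (latTotal == 0) falls back to provSearchDelay, i.e. 10s.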
type cidQueue struct {
elems []cid.Cid
eset *cid.Set
}
func newCidQueue() *cidQueue {
return &cidQueue{eset: cid.NewSet()}
}
func (cq *cidQueue) Pop() cid.Cid {
for {
if len(cq.elems) == 0 {
return cid.Cid{}
}
out := cq.elems[0]
cq.elems = cq.elems[1:]
if cq.eset.Has(out) {
cq.eset.Remove(out)
return out
}
}
}
func (cq *cidQueue) Push(c cid.Cid) {
if cq.eset.Visit(c) {
cq.elems = append(cq.elems, c)
}
}
func (cq *cidQueue) Remove(c cid.Cid) {
cq.eset.Remove(c)
}
func (cq *cidQueue) Has(c cid.Cid) bool {
return cq.eset.Has(c)
}
func (cq *cidQueue) Len() int {
return cq.eset.Len()
}
package session
import (
"context"
"sync"
"testing"
"time"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
peer "github.com/libp2p/go-libp2p-peer"
)
type wantReq struct {
cids []cid.Cid
peers []peer.ID
}
type fakeWantManager struct {
wantReqs chan wantReq
cancelReqs chan wantReq
}
func (fwm *fakeWantManager) WantBlocks(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
fwm.wantReqs <- wantReq{cids, peers}
}
func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, peers []peer.ID, ses uint64) {
fwm.cancelReqs <- wantReq{cids, peers}
}
type fakePeerManager struct {
lk sync.RWMutex
peers []peer.ID
findMorePeersRequested bool
}
func (fpm *fakePeerManager) FindMorePeers(context.Context, cid.Cid) {
fpm.lk.Lock()
fpm.findMorePeersRequested = true
fpm.lk.Unlock()
}
func (fpm *fakePeerManager) GetOptimizedPeers() []peer.ID {
fpm.lk.Lock()
defer fpm.lk.Unlock()
return fpm.peers
}
func (fpm *fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.lk.Lock()
fpm.peers = append(fpm.peers, p)
fpm.lk.Unlock()
}
func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(activeWantsLimit * 2)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
}
getBlocksCh, err := session.GetBlocks(ctx, cids)
if err != nil {
t.Fatal("error getting blocks")
}
// check initial want request
receivedWantReq := <-fwm.wantReqs
if len(receivedWantReq.cids) != activeWantsLimit {
t.Fatal("did not enqueue correct initial number of wants")
}
if receivedWantReq.peers != nil {
t.Fatal("first want request should be a broadcast")
}
// now receive the first set of blocks
peers := testutil.GeneratePeers(activeWantsLimit)
var newCancelReqs []wantReq
var newBlockReqs []wantReq
var receivedBlocks []blocks.Block
for i, p := range peers {
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, receivedWantReq.cids[i])])
receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
newCancelReqs = append(newCancelReqs, cancelBlock)
wantBlock := <-wantReqs
newBlockReqs = append(newBlockReqs, wantBlock)
}
// verify new peers were recorded
fpm.lk.Lock()
if len(fpm.peers) != activeWantsLimit {
t.Fatal("received blocks not recorded by the peer manager")
}
for _, p := range fpm.peers {
if !testutil.ContainsPeer(peers, p) {
t.Fatal("incorrect peer recorded to peer manager")
}
}
fpm.lk.Unlock()
// look at new interactions with want manager
// should have cancelled each received block
if len(newCancelReqs) != activeWantsLimit {
t.Fatal("did not cancel each block once it was received")
}
// new session reqs should be targeted
totalEnqueued := 0
for _, w := range newBlockReqs {
if len(w.peers) == 0 {
t.Fatal("should not have broadcast again after initial broadcast")
}
totalEnqueued += len(w.cids)
}
// full new round of cids should be requested
if totalEnqueued != activeWantsLimit {
t.Fatal("new blocks were not requested")
}
// receive remaining blocks
for i, p := range peers {
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newBlockReqs[i].cids[0])])
receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
newCancelReqs = append(newCancelReqs, cancelBlock)
}
if len(receivedBlocks) != len(blks) {
t.Fatal("did not receive enough blocks")
}
for _, block := range receivedBlocks {
if !testutil.ContainsBlock(blks, block) {
t.Fatal("received incorrect block")
}
}
}
func TestSessionFindMorePeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(activeWantsLimit * 2)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
}
getBlocksCh, err := session.GetBlocks(ctx, cids)
if err != nil {
t.Fatal("error getting blocks")
}
// clear the initial batch of wants
<-wantReqs
// receive a block to trigger a tick reset
time.Sleep(200 * time.Microsecond)
p := testutil.GeneratePeers(1)[0]
session.ReceiveBlockFrom(p, blks[0])
<-getBlocksCh
<-wantReqs
<-cancelReqs
// wait long enough for a tick to occur
time.Sleep(20 * time.Millisecond)
// trigger to find providers should have happened
fpm.lk.Lock()
if fpm.findMorePeersRequested != true {
t.Fatal("should have attempted to find more peers but didn't")
}
fpm.lk.Unlock()
// verify a broadcast was made
receivedWantReq := <-wantReqs
if len(receivedWantReq.cids) != activeWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
if receivedWantReq.peers != nil {
t.Fatal("did not make a broadcast")
}
<-ctx.Done()
}
......@@ -7,22 +7,38 @@ import (
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bsnet "github.com/ipfs/go-bitswap/network"
bssession "github.com/ipfs/go-bitswap/session"
bswm "github.com/ipfs/go-bitswap/wantmanager"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
peer "github.com/libp2p/go-libp2p-peer"
)
// Session is a session that is managed by the session manager
type Session interface {
exchange.Fetcher
InterestedIn(cid.Cid) bool
ReceiveBlockFrom(peer.ID, blocks.Block)
}
type sesTrk struct {
session Session
pm bssession.PeerManager
}
// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager) Session
// PeerManagerFactory generates a new peer manager for a session.
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager
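For reference, the wiring added to bitswap.go earlier in this diff supplies both factories, closing over the shared want manager and the network:

	sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
		return bssession.New(ctx, id, wm, pm)
	}
	sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
		return bsspm.New(ctx, id, network)
	}
	sm := bssm.New(ctx, sessionFactory, sessionPeerManagerFactory)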
// SessionManager is responsible for creating, managing, and dispatching to
// sessions.
type SessionManager struct {
wm *bswm.WantManager
network bsnet.BitSwapNetwork
ctx context.Context
ctx context.Context
sessionFactory SessionFactory
peerManagerFactory PeerManagerFactory
// Sessions
sessLk sync.Mutex
sessions []*bssession.Session
sessions []sesTrk
// Session Index
sessIDLk sync.Mutex
......@@ -30,11 +46,11 @@ type SessionManager struct {
}
// New creates a new SessionManager.
func New(ctx context.Context, wm *bswm.WantManager, network bsnet.BitSwapNetwork) *SessionManager {
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory) *SessionManager {
return &SessionManager{
ctx: ctx,
wm: wm,
network: network,
ctx: ctx,
sessionFactory: sessionFactory,
peerManagerFactory: peerManagerFactory,
}
}
......@@ -44,24 +60,26 @@ func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
id := sm.GetNextSessionID()
sessionctx, cancel := context.WithCancel(ctx)
session := bssession.New(sessionctx, id, sm.wm, sm.network)
pm := sm.peerManagerFactory(sessionctx, id)
session := sm.sessionFactory(sessionctx, id, pm)
tracked := sesTrk{session, pm}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, session)
sm.sessions = append(sm.sessions, tracked)
sm.sessLk.Unlock()
go func() {
defer cancel()
select {
case <-sm.ctx.Done():
sm.removeSession(session)
sm.removeSession(tracked)
case <-ctx.Done():
sm.removeSession(session)
sm.removeSession(tracked)
}
}()
return session
}
func (sm *SessionManager) removeSession(session exchange.Fetcher) {
func (sm *SessionManager) removeSession(session sesTrk) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()
for i := 0; i < len(sm.sessions); i++ {
......@@ -88,11 +106,9 @@ func (sm *SessionManager) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
defer sm.sessLk.Unlock()
k := blk.Cid()
ks := []cid.Cid{k}
for _, s := range sm.sessions {
if s.InterestedIn(k) {
s.ReceiveBlockFrom(from, blk)
sm.wm.CancelWants(sm.ctx, ks, nil, s.ID())
if s.session.InterestedIn(k) {
s.session.ReceiveBlockFrom(from, blk)
}
}
}
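Note (editorial): the CancelWants call that previously lived in this loop now happens inside Session.ReceiveBlockFrom (see the session changes above), which is why SessionManager no longer holds references to the want manager or the network.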
package sessionmanager
import (
"context"
"testing"
"time"
bssession "github.com/ipfs/go-bitswap/session"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
)
type fakeSession struct {
interested bool
receivedBlock bool
id uint64
pm *fakePeerManager
}
func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
return nil, nil
}
func (*fakeSession) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) {
return nil, nil
}
func (fs *fakeSession) InterestedIn(cid.Cid) bool { return fs.interested }
func (fs *fakeSession) ReceiveBlockFrom(peer.ID, blocks.Block) { fs.receivedBlock = true }
type fakePeerManager struct {
id uint64
}
func (*fakePeerManager) FindMorePeers(context.Context, cid.Cid) {}
func (*fakePeerManager) GetOptimizedPeers() []peer.ID { return nil }
func (*fakePeerManager) RecordPeerRequests([]peer.ID, []cid.Cid) {}
func (*fakePeerManager) RecordPeerResponse(peer.ID, cid.Cid) {}
var nextInterestedIn bool
func sessionFactory(ctx context.Context, id uint64, pm bssession.PeerManager) Session {
return &fakeSession{
interested: nextInterestedIn,
receivedBlock: false,
id: id,
pm: pm.(*fakePeerManager),
}
}
func peerManagerFactory(ctx context.Context, id uint64) bssession.PeerManager {
return &fakePeerManager{id}
}
func TestAddingSessions(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = true
currentID := sm.GetNextSessionID()
firstSession := sm.NewSession(ctx).(*fakeSession)
if firstSession.id != firstSession.pm.id ||
firstSession.id != currentID+1 {
t.Fatal("session does not have correct id set")
}
secondSession := sm.NewSession(ctx).(*fakeSession)
if secondSession.id != secondSession.pm.id ||
secondSession.id != firstSession.id+1 {
t.Fatal("session does not have correct id set")
}
sm.GetNextSessionID()
thirdSession := sm.NewSession(ctx).(*fakeSession)
if thirdSession.id != thirdSession.pm.id ||
thirdSession.id != secondSession.id+2 {
t.Fatal("session does not have correct id set")
}
sm.ReceiveBlockFrom(p, block)
if !firstSession.receivedBlock ||
!secondSession.receivedBlock ||
!thirdSession.receivedBlock {
t.Fatal("should have received blocks but didn't")
}
}
func TestReceivingBlocksWhenNotInterested(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
// for this test, only the second session will be interested in the block
nextInterestedIn = false
firstSession := sm.NewSession(ctx).(*fakeSession)
nextInterestedIn = true
secondSession := sm.NewSession(ctx).(*fakeSession)
nextInterestedIn = false
thirdSession := sm.NewSession(ctx).(*fakeSession)
sm.ReceiveBlockFrom(p, block)
if firstSession.receivedBlock ||
!secondSession.receivedBlock ||
thirdSession.receivedBlock {
t.Fatal("did not receive blocks only for interested sessions")
}
}
func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
sm := New(ctx, sessionFactory, peerManagerFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = true
firstSession := sm.NewSession(ctx).(*fakeSession)
secondSession := sm.NewSession(ctx).(*fakeSession)
thirdSession := sm.NewSession(ctx).(*fakeSession)
cancel()
// wait for sessions to get removed
time.Sleep(10 * time.Millisecond)
sm.ReceiveBlockFrom(p, block)
if firstSession.receivedBlock ||
secondSession.receivedBlock ||
thirdSession.receivedBlock {
t.Fatal("received blocks for sessions after manager is shutdown")
}
}
func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sm := New(ctx, sessionFactory, peerManagerFactory)
p := peer.ID(123)
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = true
firstSession := sm.NewSession(ctx).(*fakeSession)
sessionCtx, sessionCancel := context.WithCancel(ctx)
secondSession := sm.NewSession(sessionCtx).(*fakeSession)
thirdSession := sm.NewSession(ctx).(*fakeSession)
sessionCancel()
// wait for sessions to get removed
time.Sleep(10 * time.Millisecond)
sm.ReceiveBlockFrom(p, block)
if !firstSession.receivedBlock ||
secondSession.receivedBlock ||
!thirdSession.receivedBlock {
t.Fatal("received blocks for sessions that are canceled")
}
}
package sessionpeermanager
import (
"context"
"fmt"
cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer"
)
// PeerNetwork is an interface for finding providers and managing connections
type PeerNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}
// SessionPeerManager tracks and manages peers for a session, and provides
// the best ones to the session
type SessionPeerManager struct {
ctx context.Context
network PeerNetwork
tag string
newPeers chan peer.ID
peerReqs chan chan []peer.ID
// do not touch outside of run loop
activePeers map[peer.ID]struct{}
activePeersArr []peer.ID
}
// New creates a new SessionPeerManager
func New(ctx context.Context, id uint64, network PeerNetwork) *SessionPeerManager {
spm := &SessionPeerManager{
ctx: ctx,
network: network,
newPeers: make(chan peer.ID, 16),
peerReqs: make(chan chan []peer.ID),
activePeers: make(map[peer.ID]struct{}),
}
spm.tag = fmt.Sprint("bs-ses-", id)
go spm.run(ctx)
return spm
}
// RecordPeerResponse records that a block was received from the given peer,
// and adds the peer to the list of session peers if it isn't already present
func (spm *SessionPeerManager) RecordPeerResponse(p peer.ID, k cid.Cid) {
// at the moment, we're just adding peers here
// in the future, we'll actually use this to record metrics
select {
case spm.newPeers <- p:
case <-spm.ctx.Done():
}
}
// RecordPeerRequests records that requests for the given cids were sent to the given peers
func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
// at the moment, we're not doing anything here
// soon we'll use this to track latency by peer
}
// GetOptimizedPeers returns the best peers available for a session
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// right now this just returns all peers, but soon we might return peers
// ordered by optimization, or only a subset
resp := make(chan []peer.ID)
select {
case spm.peerReqs <- resp:
case <-spm.ctx.Done():
return nil
}
select {
case peers := <-resp:
return peers
case <-spm.ctx.Done():
return nil
}
}
// FindMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
go func(k cid.Cid) {
// TODO: have a task queue setup for this to:
// - rate limit
// - manage timeouts
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
spm.newPeers <- p
}
}(c)
}
func (spm *SessionPeerManager) run(ctx context.Context) {
for {
select {
case p := <-spm.newPeers:
spm.addActivePeer(p)
case resp := <-spm.peerReqs:
resp <- spm.activePeersArr
case <-ctx.Done():
spm.handleShutdown()
return
}
}
}
func (spm *SessionPeerManager) addActivePeer(p peer.ID) {
if _, ok := spm.activePeers[p]; !ok {
spm.activePeers[p] = struct{}{}
spm.activePeersArr = append(spm.activePeersArr, p)
cmgr := spm.network.ConnectionManager()
cmgr.TagPeer(p, spm.tag, 10)
}
}
func (spm *SessionPeerManager) handleShutdown() {
cmgr := spm.network.ConnectionManager()
for _, p := range spm.activePeersArr {
cmgr.UntagPeer(p, spm.tag)
}
}
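Lifecycle sketch (editorial, not part of the commit): one SessionPeerManager is created per session; peers discovered through responses or provider searches flow through the newPeers channel into the run loop, which tags them via the connection manager, and cancelling the context untags them on shutdown.

	spm := New(ctx, id, network)     // starts run(ctx) in a goroutine
	spm.RecordPeerResponse(p, c)     // records that peer p responded with block c
	spm.FindMorePeers(ctx, c)        // async provider search also feeds newPeers
	peers := spm.GetOptimizedPeers() // snapshot of active peers, in discovery order
	// cancelling ctx stops the run loop and untags all active peers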
package sessionpeermanager
import (
"context"
"testing"
"time"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
)
type fakePeerNetwork struct {
peers []peer.ID
connManager ifconnmgr.ConnManager
}
func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager {
return fpn.connManager
}
func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID {
peerCh := make(chan peer.ID)
go func() {
defer close(peerCh)
for _, p := range fpn.peers {
select {
case peerCh <- p:
case <-ctx.Done():
return
}
}
}()
return peerCh
}
type fakeConnManager struct {
taggedPeers []peer.ID
}
func (fcm *fakeConnManager) TagPeer(p peer.ID, tag string, n int) {
fcm.taggedPeers = append(fcm.taggedPeers, p)
}
func (fcm *fakeConnManager) UntagPeer(p peer.ID, tag string) {
for i := 0; i < len(fcm.taggedPeers); i++ {
if fcm.taggedPeers[i] == p {
fcm.taggedPeers[i] = fcm.taggedPeers[len(fcm.taggedPeers)-1]
fcm.taggedPeers = fcm.taggedPeers[:len(fcm.taggedPeers)-1]
return
}
}
}
func (*fakeConnManager) GetTagInfo(p peer.ID) *ifconnmgr.TagInfo { return nil }
func (*fakeConnManager) TrimOpenConns(ctx context.Context) {}
func (*fakeConnManager) Notifee() inet.Notifiee { return nil }
func TestFindingMorePeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
peers := testutil.GeneratePeers(5)
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)
findCtx, findCancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer findCancel()
sessionPeerManager.FindMorePeers(ctx, c)
<-findCtx.Done()
sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != len(peers) {
t.Fatal("incorrect number of peers found")
}
for _, p := range sessionPeers {
if !testutil.ContainsPeer(peers, p) {
t.Fatal("incorrect peer found through finding providers")
}
}
if len(fcm.taggedPeers) != len(peers) {
t.Fatal("Peers were not tagged!")
}
}
func TestRecordingReceivedBlocks(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
p := testutil.GeneratePeers(1)[0]
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{nil, fcm}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)
sessionPeerManager.RecordPeerResponse(p, c)
time.Sleep(10 * time.Millisecond)
sessionPeers := sessionPeerManager.GetOptimizedPeers()
if len(sessionPeers) != 1 {
t.Fatal("did not add peer on receive")
}
if sessionPeers[0] != p {
t.Fatal("incorrect peer added on receive")
}
if len(fcm.taggedPeers) != 1 {
t.Fatal("Peers was not tagged!")
}
}
func TestUntaggingPeers(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
defer cancel()
peers := testutil.GeneratePeers(5)
fcm := &fakeConnManager{}
fpn := &fakePeerNetwork{peers, fcm}
c := testutil.GenerateCids(1)[0]
id := testutil.GenerateSessionID()
sessionPeerManager := New(ctx, id, fpn)
sessionPeerManager.FindMorePeers(ctx, c)
time.Sleep(5 * time.Millisecond)
if len(fcm.taggedPeers) != len(peers) {
t.Fatal("Peers were not tagged!")
}
<-ctx.Done()
time.Sleep(5 * time.Millisecond)
if len(fcm.taggedPeers) != 0 {
t.Fatal("Peers were not untagged!")
}
}
......@@ -3,6 +3,7 @@ package testutil
import (
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/wantlist"
"github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
peer "github.com/libp2p/go-libp2p-peer"
......@@ -76,3 +77,18 @@ func ContainsPeer(peers []peer.ID, p peer.ID) bool {
}
return false
}
// IndexOf returns the index of the given cid in a slice of blocks, or -1 if it is not found
func IndexOf(blks []blocks.Block, c cid.Cid) int {
for i, n := range blks {
if n.Cid() == c {
return i
}
}
return -1
}
// ContainsBlock returns true if a block is found in a list of blocks
func ContainsBlock(blks []blocks.Block, block blocks.Block) bool {
return IndexOf(blks, block.Cid()) != -1
}