Commit 92a82791 authored by hannahhoward's avatar hannahhoward

feat(session): allow configuring delays per instance

Re-setup provider search delay and rebroadcast delay on a per bitswap instance basis
parent e2e33435
......@@ -9,6 +9,7 @@ import (
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"
decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
......@@ -39,6 +40,7 @@ var _ exchange.SessionExchange = (*Bitswap)(nil)
const (
// these requests take at _least_ two minutes at the moment.
provideTimeout = time.Minute * 3
defaultProvSearchDelay = time.Second
)
var (
......@@ -65,6 +67,20 @@ func ProvideEnabled(enabled bool) Option {
}
}
// ProviderSearchDelay overwrites the global provider search delay
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
return func(bs *Bitswap) {
bs.provSearchDelay = newProvSearchDelay
}
}
// RebroadcastDelay overwrites the global provider rebroadcast delay
func RebroadcastDelay(newRebroadcastDelay delay.D) Option {
return func(bs *Bitswap) {
bs.rebroadcastDelay = newRebroadcastDelay
}
}
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
......@@ -99,8 +115,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
pqm := bspqm.New(ctx, network)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs, provSearchDelay, rebroadcastDelay)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
......@@ -124,6 +142,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
allMetric: allHist,
sentHistogram: sentHistogram,
provideEnabled: true,
provSearchDelay: defaultProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
}
// apply functional options before starting and running bitswap
......@@ -190,6 +210,12 @@ type Bitswap struct {
// whether or not to make provide announcements
provideEnabled bool
// how long to wait before looking for providers in a session
provSearchDelay time.Duration
// how often to rebroadcast providing requests to find more optimized providers
rebroadcastDelay delay.D
}
type counters struct {
......@@ -232,7 +258,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
session := bs.sm.NewSession(ctx)
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}
......@@ -398,5 +424,5 @@ func (bs *Bitswap) IsOnline() bool {
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
return bs.sm.NewSession(ctx)
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
......@@ -102,11 +102,9 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
bssession.SetProviderSearchDelay(50 * time.Millisecond)
defer bssession.SetProviderSearchDelay(time.Second)
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false))
ig := testinstance.NewTestInstanceGenerator(net, bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50*time.Millisecond))
defer ig.Close()
hasBlock := ig.Next()
......
......@@ -6,6 +6,7 @@ import (
"testing"
"time"
bitswap "github.com/ipfs/go-bitswap"
bssession "github.com/ipfs/go-bitswap/session"
testinstance "github.com/ipfs/go-bitswap/testinstance"
blocks "github.com/ipfs/go-block-format"
......@@ -161,9 +162,8 @@ func TestFetchNotConnected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
bssession.SetProviderSearchDelay(10 * time.Millisecond)
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet)
ig := testinstance.NewTestInstanceGenerator(vnet, bitswap.ProviderSearchDelay(10*time.Millisecond))
defer ig.Close()
bgen := blocksutil.NewBlockGenerator()
......
......@@ -87,6 +87,8 @@ type Session struct {
latTotal time.Duration
fetchcnt int
consecutiveTicks int
provSearchDelay time.Duration
rebroadcastDelay delay.D
// identifiers
notif notifications.PubSub
uuid logging.Loggable
......@@ -95,7 +97,13 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
func New(ctx context.Context,
id uint64,
wm WantManager,
pm PeerManager,
srs RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
......@@ -114,6 +122,8 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs Req
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
provSearchDelay: provSearchDelay,
rebroadcastDelay: rebroadcastDelay,
}
cache, _ := lru.New(2048)
......@@ -226,24 +236,11 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
}
}
var provSearchDelay = time.Second
var rebroadcastDelay = delay.Fixed(time.Minute)
// SetProviderSearchDelay overwrites the global provider search delay
func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
provSearchDelay = newProvSearchDelay
}
// SetRebroadcastDelay overwrites the global provider rebroadcast delay
func SetRebroadcastDelay(newRebroadcastDelay delay.D) {
rebroadcastDelay = newRebroadcastDelay
}
// Session run loop -- everything function below here should not be called
// of this loop
func (s *Session) run(ctx context.Context) {
s.tick = time.NewTimer(provSearchDelay)
s.rebroadcast = time.NewTimer(rebroadcastDelay.Get())
s.tick = time.NewTimer(s.provSearchDelay)
s.rebroadcast = time.NewTimer(s.rebroadcastDelay.Get())
for {
select {
case blk := <-s.incoming:
......@@ -345,7 +342,7 @@ func (s *Session) handleRebroadcast(ctx context.Context) {
// for new providers for blocks.
s.pm.FindMorePeers(ctx, s.randomLiveWant())
s.rebroadcast.Reset(rebroadcastDelay.Get())
s.rebroadcast.Reset(s.rebroadcastDelay.Get())
}
func (s *Session) randomLiveWant() cid.Cid {
......@@ -442,7 +439,7 @@ func (s *Session) averageLatency() time.Duration {
func (s *Session) resetTick() {
var tickDelay time.Duration
if s.latTotal == 0 {
tickDelay = provSearchDelay
tickDelay = s.provSearchDelay
} else {
avLat := s.averageLatency()
tickDelay = s.baseTickDelay + (3 * avLat)
......
......@@ -84,7 +84,7 @@ func TestSessionGetBlocks(t *testing.T) {
fpm := &fakePeerManager{}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, frs)
session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute))
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
......@@ -196,7 +196,7 @@ func TestSessionFindMorePeers(t *testing.T) {
fpm := &fakePeerManager{findMorePeersRequested: make(chan cid.Cid, 1)}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, frs)
session := New(ctx, id, fwm, fpm, frs, time.Second, delay.Fixed(time.Minute))
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
......@@ -260,11 +260,6 @@ func TestSessionFindMorePeers(t *testing.T) {
}
func TestSessionFailingToGetFirstBlock(t *testing.T) {
SetProviderSearchDelay(10 * time.Millisecond)
defer SetProviderSearchDelay(1 * time.Second)
SetRebroadcastDelay(delay.Fixed(100 * time.Millisecond))
defer SetRebroadcastDelay(delay.Fixed(1 * time.Minute))
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
wantReqs := make(chan wantReq, 1)
......@@ -274,7 +269,7 @@ func TestSessionFailingToGetFirstBlock(t *testing.T) {
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm, frs)
session := New(ctx, id, fwm, fpm, frs, 10*time.Millisecond, delay.Fixed(100*time.Millisecond))
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(4)
var cids []cid.Cid
......
......@@ -3,9 +3,11 @@ package sessionmanager
import (
"context"
"sync"
"time"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
bssession "github.com/ipfs/go-bitswap/session"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
......@@ -27,7 +29,7 @@ type sesTrk struct {
}
// SessionFactory generates a new session for the SessionManager to track.
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter, provSearchDelay time.Duration, rebroadcastDelay delay.D) Session
// RequestSplitterFactory generates a new request splitter for a session.
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
......@@ -64,13 +66,15 @@ func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory
// NewSession initializes a session with the given context, and adds to the
// session manager.
func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
func (sm *SessionManager) NewSession(ctx context.Context,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) exchange.Fetcher {
id := sm.GetNextSessionID()
sessionctx, cancel := context.WithCancel(ctx)
pm := sm.peerManagerFactory(sessionctx, id)
srs := sm.requestSplitterFactory(sessionctx)
session := sm.sessionFactory(sessionctx, id, pm, srs)
session := sm.sessionFactory(sessionctx, id, pm, srs, provSearchDelay, rebroadcastDelay)
tracked := sesTrk{session, pm, srs}
sm.sessLk.Lock()
sm.sessions = append(sm.sessions, tracked)
......
......@@ -6,6 +6,7 @@ import (
"time"
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"
bssession "github.com/ipfs/go-bitswap/session"
......@@ -53,7 +54,12 @@ func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
var nextInterestedIn bool
func sessionFactory(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session {
func sessionFactory(ctx context.Context,
id uint64,
pm bssession.PeerManager,
srs bssession.RequestSplitter,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) Session {
return &fakeSession{
interested: nextInterestedIn,
receivedBlock: false,
......@@ -83,18 +89,18 @@ func TestAddingSessions(t *testing.T) {
nextInterestedIn = true
currentID := sm.GetNextSessionID()
firstSession := sm.NewSession(ctx).(*fakeSession)
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
if firstSession.id != firstSession.pm.id ||
firstSession.id != currentID+1 {
t.Fatal("session does not have correct id set")
}
secondSession := sm.NewSession(ctx).(*fakeSession)
secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
if secondSession.id != secondSession.pm.id ||
secondSession.id != firstSession.id+1 {
t.Fatal("session does not have correct id set")
}
sm.GetNextSessionID()
thirdSession := sm.NewSession(ctx).(*fakeSession)
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
if thirdSession.id != thirdSession.pm.id ||
thirdSession.id != secondSession.id+2 {
t.Fatal("session does not have correct id set")
......@@ -117,11 +123,11 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = false
firstSession := sm.NewSession(ctx).(*fakeSession)
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextInterestedIn = true
secondSession := sm.NewSession(ctx).(*fakeSession)
secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextInterestedIn = false
thirdSession := sm.NewSession(ctx).(*fakeSession)
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
sm.ReceiveBlockFrom(p, block)
if firstSession.receivedBlock ||
......@@ -140,9 +146,9 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = true
firstSession := sm.NewSession(ctx).(*fakeSession)
secondSession := sm.NewSession(ctx).(*fakeSession)
thirdSession := sm.NewSession(ctx).(*fakeSession)
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
cancel()
// wait for sessions to get removed
......@@ -165,10 +171,10 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
block := blocks.NewBlock([]byte("block"))
// we'll be interested in all blocks for this test
nextInterestedIn = true
firstSession := sm.NewSession(ctx).(*fakeSession)
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
sessionCtx, sessionCancel := context.WithCancel(ctx)
secondSession := sm.NewSession(sessionCtx).(*fakeSession)
thirdSession := sm.NewSession(ctx).(*fakeSession)
secondSession := sm.NewSession(sessionCtx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
sessionCancel()
// wait for sessions to get removed
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment