Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
86089ee1
Unverified
Commit
86089ee1
authored
Jun 03, 2019
by
Hannah Howard
Committed by
GitHub
Jun 03, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #133 from ipfs/feat/improve-provider-requests
feat(sessions): add rebroadcasting, search backoff
parents
10e93ab6
92a82791
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
284 additions
and
84 deletions
+284
-84
bitswap.go
bitswap.go
+45
-19
bitswap_test.go
bitswap_test.go
+1
-3
bitswap_with_sessions_test.go
bitswap_with_sessions_test.go
+2
-2
session/session.go
session/session.go
+82
-37
session/session_test.go
session/session_test.go
+128
-7
sessionmanager/sessionmanager.go
sessionmanager/sessionmanager.go
+7
-3
sessionmanager/sessionmanager_test.go
sessionmanager/sessionmanager_test.go
+19
-13
No files found.
bitswap.go
View file @
86089ee1
...
...
@@ -9,6 +9,7 @@ import (
"time"
bssrs
"github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay
"github.com/ipfs/go-ipfs-delay"
decision
"github.com/ipfs/go-bitswap/decision"
bsgetter
"github.com/ipfs/go-bitswap/getter"
...
...
@@ -38,7 +39,8 @@ var _ exchange.SessionExchange = (*Bitswap)(nil)
const
(
// these requests take at _least_ two minutes at the moment.
provideTimeout
=
time
.
Minute
*
3
provideTimeout
=
time
.
Minute
*
3
defaultProvSearchDelay
=
time
.
Second
)
var
(
...
...
@@ -65,6 +67,20 @@ func ProvideEnabled(enabled bool) Option {
}
}
// ProviderSearchDelay overwrites the global provider search delay
func
ProviderSearchDelay
(
newProvSearchDelay
time
.
Duration
)
Option
{
return
func
(
bs
*
Bitswap
)
{
bs
.
provSearchDelay
=
newProvSearchDelay
}
}
// RebroadcastDelay overwrites the global provider rebroadcast delay
func
RebroadcastDelay
(
newRebroadcastDelay
delay
.
D
)
Option
{
return
func
(
bs
*
Bitswap
)
{
bs
.
rebroadcastDelay
=
newRebroadcastDelay
}
}
// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
...
...
@@ -99,8 +115,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
wm
:=
bswm
.
New
(
ctx
,
bspm
.
New
(
ctx
,
peerQueueFactory
))
pqm
:=
bspqm
.
New
(
ctx
,
network
)
sessionFactory
:=
func
(
ctx
context
.
Context
,
id
uint64
,
pm
bssession
.
PeerManager
,
srs
bssession
.
RequestSplitter
)
bssm
.
Session
{
return
bssession
.
New
(
ctx
,
id
,
wm
,
pm
,
srs
)
sessionFactory
:=
func
(
ctx
context
.
Context
,
id
uint64
,
pm
bssession
.
PeerManager
,
srs
bssession
.
RequestSplitter
,
provSearchDelay
time
.
Duration
,
rebroadcastDelay
delay
.
D
)
bssm
.
Session
{
return
bssession
.
New
(
ctx
,
id
,
wm
,
pm
,
srs
,
provSearchDelay
,
rebroadcastDelay
)
}
sessionPeerManagerFactory
:=
func
(
ctx
context
.
Context
,
id
uint64
)
bssession
.
PeerManager
{
return
bsspm
.
New
(
ctx
,
id
,
network
.
ConnectionManager
(),
pqm
)
...
...
@@ -110,20 +128,22 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}
bs
:=
&
Bitswap
{
blockstore
:
bstore
,
engine
:
decision
.
NewEngine
(
ctx
,
bstore
,
network
.
ConnectionManager
()),
// TODO close the engine with Close() method
network
:
network
,
process
:
px
,
newBlocks
:
make
(
chan
cid
.
Cid
,
HasBlockBufferSize
),
provideKeys
:
make
(
chan
cid
.
Cid
,
provideKeysBufferSize
),
wm
:
wm
,
pqm
:
pqm
,
sm
:
bssm
.
New
(
ctx
,
sessionFactory
,
sessionPeerManagerFactory
,
sessionRequestSplitterFactory
),
counters
:
new
(
counters
),
dupMetric
:
dupHist
,
allMetric
:
allHist
,
sentHistogram
:
sentHistogram
,
provideEnabled
:
true
,
blockstore
:
bstore
,
engine
:
decision
.
NewEngine
(
ctx
,
bstore
,
network
.
ConnectionManager
()),
// TODO close the engine with Close() method
network
:
network
,
process
:
px
,
newBlocks
:
make
(
chan
cid
.
Cid
,
HasBlockBufferSize
),
provideKeys
:
make
(
chan
cid
.
Cid
,
provideKeysBufferSize
),
wm
:
wm
,
pqm
:
pqm
,
sm
:
bssm
.
New
(
ctx
,
sessionFactory
,
sessionPeerManagerFactory
,
sessionRequestSplitterFactory
),
counters
:
new
(
counters
),
dupMetric
:
dupHist
,
allMetric
:
allHist
,
sentHistogram
:
sentHistogram
,
provideEnabled
:
true
,
provSearchDelay
:
defaultProvSearchDelay
,
rebroadcastDelay
:
delay
.
Fixed
(
time
.
Minute
),
}
// apply functional options before starting and running bitswap
...
...
@@ -190,6 +210,12 @@ type Bitswap struct {
// whether or not to make provide announcements
provideEnabled
bool
// how long to wait before looking for providers in a session
provSearchDelay
time
.
Duration
// how often to rebroadcast providing requests to find more optimized providers
rebroadcastDelay
delay
.
D
}
type
counters
struct
{
...
...
@@ -232,7 +258,7 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func
(
bs
*
Bitswap
)
GetBlocks
(
ctx
context
.
Context
,
keys
[]
cid
.
Cid
)
(
<-
chan
blocks
.
Block
,
error
)
{
session
:=
bs
.
sm
.
NewSession
(
ctx
)
session
:=
bs
.
sm
.
NewSession
(
ctx
,
bs
.
provSearchDelay
,
bs
.
rebroadcastDelay
)
return
session
.
GetBlocks
(
ctx
,
keys
)
}
...
...
@@ -398,5 +424,5 @@ func (bs *Bitswap) IsOnline() bool {
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func
(
bs
*
Bitswap
)
NewSession
(
ctx
context
.
Context
)
exchange
.
Fetcher
{
return
bs
.
sm
.
NewSession
(
ctx
)
return
bs
.
sm
.
NewSession
(
ctx
,
bs
.
provSearchDelay
,
bs
.
rebroadcastDelay
)
}
bitswap_test.go
View file @
86089ee1
...
...
@@ -102,11 +102,9 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
}
func
TestDoesNotProvideWhenConfiguredNotTo
(
t
*
testing
.
T
)
{
bssession
.
SetProviderSearchDelay
(
50
*
time
.
Millisecond
)
defer
bssession
.
SetProviderSearchDelay
(
time
.
Second
)
net
:=
tn
.
VirtualNetwork
(
mockrouting
.
NewServer
(),
delay
.
Fixed
(
kNetworkDelay
))
block
:=
blocks
.
NewBlock
([]
byte
(
"block"
))
ig
:=
testinstance
.
NewTestInstanceGenerator
(
net
,
bitswap
.
ProvideEnabled
(
false
))
ig
:=
testinstance
.
NewTestInstanceGenerator
(
net
,
bitswap
.
ProvideEnabled
(
false
)
,
bitswap
.
ProviderSearchDelay
(
50
*
time
.
Millisecond
)
)
defer
ig
.
Close
()
hasBlock
:=
ig
.
Next
()
...
...
bitswap_with_sessions_test.go
View file @
86089ee1
...
...
@@ -6,6 +6,7 @@ import (
"testing"
"time"
bitswap
"github.com/ipfs/go-bitswap"
bssession
"github.com/ipfs/go-bitswap/session"
testinstance
"github.com/ipfs/go-bitswap/testinstance"
blocks
"github.com/ipfs/go-block-format"
...
...
@@ -161,9 +162,8 @@ func TestFetchNotConnected(t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
2
*
time
.
Second
)
defer
cancel
()
bssession
.
SetProviderSearchDelay
(
10
*
time
.
Millisecond
)
vnet
:=
getVirtualNetwork
()
ig
:=
testinstance
.
NewTestInstanceGenerator
(
vnet
)
ig
:=
testinstance
.
NewTestInstanceGenerator
(
vnet
,
bitswap
.
ProviderSearchDelay
(
10
*
time
.
Millisecond
)
)
defer
ig
.
Close
()
bgen
:=
blocksutil
.
NewBlockGenerator
()
...
...
session/session.go
View file @
86089ee1
...
...
@@ -2,6 +2,7 @@ package session
import
(
"context"
"math/rand"
"time"
lru
"github.com/hashicorp/golang-lru"
...
...
@@ -9,6 +10,7 @@ import (
notifications
"github.com/ipfs/go-bitswap/notifications"
blocks
"github.com/ipfs/go-block-format"
cid
"github.com/ipfs/go-cid"
delay
"github.com/ipfs/go-ipfs-delay"
logging
"github.com/ipfs/go-log"
peer
"github.com/libp2p/go-libp2p-core/peer"
loggables
"github.com/libp2p/go-libp2p-loggables"
...
...
@@ -75,14 +77,18 @@ type Session struct {
tickDelayReqs
chan
time
.
Duration
// do not touch outside run loop
tofetch
*
cidQueue
interest
*
lru
.
Cache
pastWants
*
cidQueue
liveWants
map
[
cid
.
Cid
]
time
.
Time
tick
*
time
.
Timer
baseTickDelay
time
.
Duration
latTotal
time
.
Duration
fetchcnt
int
tofetch
*
cidQueue
interest
*
lru
.
Cache
pastWants
*
cidQueue
liveWants
map
[
cid
.
Cid
]
time
.
Time
tick
*
time
.
Timer
rebroadcast
*
time
.
Timer
baseTickDelay
time
.
Duration
latTotal
time
.
Duration
fetchcnt
int
consecutiveTicks
int
provSearchDelay
time
.
Duration
rebroadcastDelay
delay
.
D
// identifiers
notif
notifications
.
PubSub
uuid
logging
.
Loggable
...
...
@@ -91,25 +97,33 @@ type Session struct {
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func
New
(
ctx
context
.
Context
,
id
uint64
,
wm
WantManager
,
pm
PeerManager
,
srs
RequestSplitter
)
*
Session
{
func
New
(
ctx
context
.
Context
,
id
uint64
,
wm
WantManager
,
pm
PeerManager
,
srs
RequestSplitter
,
provSearchDelay
time
.
Duration
,
rebroadcastDelay
delay
.
D
)
*
Session
{
s
:=
&
Session
{
liveWants
:
make
(
map
[
cid
.
Cid
]
time
.
Time
),
newReqs
:
make
(
chan
[]
cid
.
Cid
),
cancelKeys
:
make
(
chan
[]
cid
.
Cid
),
tofetch
:
newCidQueue
(),
pastWants
:
newCidQueue
(),
interestReqs
:
make
(
chan
interestReq
),
latencyReqs
:
make
(
chan
chan
time
.
Duration
),
tickDelayReqs
:
make
(
chan
time
.
Duration
),
ctx
:
ctx
,
wm
:
wm
,
pm
:
pm
,
srs
:
srs
,
incoming
:
make
(
chan
blkRecv
),
notif
:
notifications
.
New
(),
uuid
:
loggables
.
Uuid
(
"GetBlockRequest"
),
baseTickDelay
:
time
.
Millisecond
*
500
,
id
:
id
,
liveWants
:
make
(
map
[
cid
.
Cid
]
time
.
Time
),
newReqs
:
make
(
chan
[]
cid
.
Cid
),
cancelKeys
:
make
(
chan
[]
cid
.
Cid
),
tofetch
:
newCidQueue
(),
pastWants
:
newCidQueue
(),
interestReqs
:
make
(
chan
interestReq
),
latencyReqs
:
make
(
chan
chan
time
.
Duration
),
tickDelayReqs
:
make
(
chan
time
.
Duration
),
ctx
:
ctx
,
wm
:
wm
,
pm
:
pm
,
srs
:
srs
,
incoming
:
make
(
chan
blkRecv
),
notif
:
notifications
.
New
(),
uuid
:
loggables
.
Uuid
(
"GetBlockRequest"
),
baseTickDelay
:
time
.
Millisecond
*
500
,
id
:
id
,
provSearchDelay
:
provSearchDelay
,
rebroadcastDelay
:
rebroadcastDelay
,
}
cache
,
_
:=
lru
.
New
(
2048
)
...
...
@@ -222,17 +236,11 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
}
}
var
provSearchDelay
=
time
.
Second
// SetProviderSearchDelay overwrites the global provider search delay
func
SetProviderSearchDelay
(
newProvSearchDelay
time
.
Duration
)
{
provSearchDelay
=
newProvSearchDelay
}
// Session run loop -- everything function below here should not be called
// of this loop
func
(
s
*
Session
)
run
(
ctx
context
.
Context
)
{
s
.
tick
=
time
.
NewTimer
(
provSearchDelay
)
s
.
tick
=
time
.
NewTimer
(
s
.
provSearchDelay
)
s
.
rebroadcast
=
time
.
NewTimer
(
s
.
rebroadcastDelay
.
Get
())
for
{
select
{
case
blk
:=
<-
s
.
incoming
:
...
...
@@ -247,6 +255,8 @@ func (s *Session) run(ctx context.Context) {
s
.
handleCancel
(
keys
)
case
<-
s
.
tick
.
C
:
s
.
handleTick
(
ctx
)
case
<-
s
.
rebroadcast
.
C
:
s
.
handleRebroadcast
(
ctx
)
case
lwchk
:=
<-
s
.
interestReqs
:
lwchk
.
resp
<-
s
.
cidIsWanted
(
lwchk
.
c
)
case
resp
:=
<-
s
.
latencyReqs
:
...
...
@@ -310,12 +320,42 @@ func (s *Session) handleTick(ctx context.Context) {
s
.
pm
.
RecordPeerRequests
(
nil
,
live
)
s
.
wm
.
WantBlocks
(
ctx
,
live
,
nil
,
s
.
id
)
if
len
(
live
)
>
0
{
// do no find providers on consecutive ticks
// -- just rely on periodic rebroadcast
if
len
(
live
)
>
0
&&
(
s
.
consecutiveTicks
==
0
)
{
s
.
pm
.
FindMorePeers
(
ctx
,
live
[
0
])
}
s
.
resetTick
()
if
len
(
s
.
liveWants
)
>
0
{
s
.
consecutiveTicks
++
}
}
func
(
s
*
Session
)
handleRebroadcast
(
ctx
context
.
Context
)
{
if
len
(
s
.
liveWants
)
==
0
{
return
}
// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
s
.
pm
.
FindMorePeers
(
ctx
,
s
.
randomLiveWant
())
s
.
rebroadcast
.
Reset
(
s
.
rebroadcastDelay
.
Get
())
}
func
(
s
*
Session
)
randomLiveWant
()
cid
.
Cid
{
i
:=
rand
.
Intn
(
len
(
s
.
liveWants
))
// picking a random live want
for
k
:=
range
s
.
liveWants
{
if
i
==
0
{
return
k
}
i
--
}
return
cid
.
Cid
{}
}
func
(
s
*
Session
)
handleShutdown
()
{
s
.
tick
.
Stop
()
s
.
notif
.
Shutdown
()
...
...
@@ -347,6 +387,8 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
s
.
tofetch
.
Remove
(
c
)
}
s
.
fetchcnt
++
// we've received new wanted blocks, so future ticks are not consecutive
s
.
consecutiveTicks
=
0
s
.
notif
.
Publish
(
blk
)
toAdd
:=
s
.
wantBudget
()
...
...
@@ -395,12 +437,15 @@ func (s *Session) averageLatency() time.Duration {
}
func
(
s
*
Session
)
resetTick
()
{
var
tickDelay
time
.
Duration
if
s
.
latTotal
==
0
{
s
.
tick
.
Reset
(
provSearchDelay
)
tick
Delay
=
s
.
provSearchDelay
}
else
{
avLat
:=
s
.
averageLatency
()
s
.
tick
.
Reset
(
s
.
baseTickDelay
+
(
3
*
avLat
)
)
tick
Delay
=
s
.
baseTickDelay
+
(
3
*
avLat
)
}
tickDelay
=
tickDelay
*
time
.
Duration
(
1
+
s
.
consecutiveTicks
)
s
.
tick
.
Reset
(
tickDelay
)
}
func
(
s
*
Session
)
wantBudget
()
int
{
...
...
session/session_test.go
View file @
86089ee1
...
...
@@ -6,12 +6,12 @@ import (
"testing"
"time"
"github.com/ipfs/go-block-format"
bssrs
"github.com/ipfs/go-bitswap/sessionrequestsplitter"
"github.com/ipfs/go-bitswap/testutil"
blocks
"github.com/ipfs/go-block-format"
cid
"github.com/ipfs/go-cid"
blocksutil
"github.com/ipfs/go-ipfs-blocksutil"
delay
"github.com/ipfs/go-ipfs-delay"
peer
"github.com/libp2p/go-libp2p-core/peer"
)
...
...
@@ -42,12 +42,12 @@ func (fwm *fakeWantManager) CancelWants(ctx context.Context, cids []cid.Cid, pee
type
fakePeerManager
struct
{
lk
sync
.
RWMutex
peers
[]
peer
.
ID
findMorePeersRequested
chan
struct
{}
findMorePeersRequested
chan
cid
.
Cid
}
func
(
fpm
*
fakePeerManager
)
FindMorePeers
(
ctx
context
.
Context
,
k
cid
.
Cid
)
{
select
{
case
fpm
.
findMorePeersRequested
<-
struct
{}{}
:
case
fpm
.
findMorePeersRequested
<-
k
:
case
<-
ctx
.
Done
()
:
}
}
...
...
@@ -84,7 +84,7 @@ func TestSessionGetBlocks(t *testing.T) {
fpm
:=
&
fakePeerManager
{}
frs
:=
&
fakeRequestSplitter
{}
id
:=
testutil
.
GenerateSessionID
()
session
:=
New
(
ctx
,
id
,
fwm
,
fpm
,
frs
)
session
:=
New
(
ctx
,
id
,
fwm
,
fpm
,
frs
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
blockGenerator
:=
blocksutil
.
NewBlockGenerator
()
blks
:=
blockGenerator
.
Blocks
(
broadcastLiveWantsLimit
*
2
)
var
cids
[]
cid
.
Cid
...
...
@@ -193,10 +193,10 @@ func TestSessionFindMorePeers(t *testing.T) {
wantReqs
:=
make
(
chan
wantReq
,
1
)
cancelReqs
:=
make
(
chan
wantReq
,
1
)
fwm
:=
&
fakeWantManager
{
wantReqs
,
cancelReqs
}
fpm
:=
&
fakePeerManager
{
findMorePeersRequested
:
make
(
chan
struct
{}
,
1
)}
fpm
:=
&
fakePeerManager
{
findMorePeersRequested
:
make
(
chan
cid
.
Cid
,
1
)}
frs
:=
&
fakeRequestSplitter
{}
id
:=
testutil
.
GenerateSessionID
()
session
:=
New
(
ctx
,
id
,
fwm
,
fpm
,
frs
)
session
:=
New
(
ctx
,
id
,
fwm
,
fpm
,
frs
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
session
.
SetBaseTickDelay
(
200
*
time
.
Microsecond
)
blockGenerator
:=
blocksutil
.
NewBlockGenerator
()
blks
:=
blockGenerator
.
Blocks
(
broadcastLiveWantsLimit
*
2
)
...
...
@@ -258,3 +258,124 @@ func TestSessionFindMorePeers(t *testing.T) {
t
.
Fatal
(
"Did not find more peers"
)
}
}
func
TestSessionFailingToGetFirstBlock
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
2
*
time
.
Second
)
defer
cancel
()
wantReqs
:=
make
(
chan
wantReq
,
1
)
cancelReqs
:=
make
(
chan
wantReq
,
1
)
fwm
:=
&
fakeWantManager
{
wantReqs
,
cancelReqs
}
fpm
:=
&
fakePeerManager
{
findMorePeersRequested
:
make
(
chan
cid
.
Cid
,
1
)}
frs
:=
&
fakeRequestSplitter
{}
id
:=
testutil
.
GenerateSessionID
()
session
:=
New
(
ctx
,
id
,
fwm
,
fpm
,
frs
,
10
*
time
.
Millisecond
,
delay
.
Fixed
(
100
*
time
.
Millisecond
))
blockGenerator
:=
blocksutil
.
NewBlockGenerator
()
blks
:=
blockGenerator
.
Blocks
(
4
)
var
cids
[]
cid
.
Cid
for
_
,
block
:=
range
blks
{
cids
=
append
(
cids
,
block
.
Cid
())
}
startTick
:=
time
.
Now
()
_
,
err
:=
session
.
GetBlocks
(
ctx
,
cids
)
if
err
!=
nil
{
t
.
Fatal
(
"error getting blocks"
)
}
// clear the initial block of wants
select
{
case
<-
wantReqs
:
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
"Did not make first want request "
)
}
// verify a broadcast is made
select
{
case
receivedWantReq
:=
<-
wantReqs
:
if
len
(
receivedWantReq
.
cids
)
<
len
(
cids
)
{
t
.
Fatal
(
"did not rebroadcast whole live list"
)
}
if
receivedWantReq
.
peers
!=
nil
{
t
.
Fatal
(
"did not make a broadcast"
)
}
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
"Never rebroadcast want list"
)
}
// wait for a request to get more peers to occur
select
{
case
k
:=
<-
fpm
.
findMorePeersRequested
:
if
testutil
.
IndexOf
(
blks
,
k
)
==
-
1
{
t
.
Fatal
(
"did not rebroadcast an active want"
)
}
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
"Did not find more peers"
)
}
firstTickLength
:=
time
.
Since
(
startTick
)
// wait for another broadcast to occur
select
{
case
receivedWantReq
:=
<-
wantReqs
:
if
len
(
receivedWantReq
.
cids
)
<
len
(
cids
)
{
t
.
Fatal
(
"did not rebroadcast whole live list"
)
}
if
receivedWantReq
.
peers
!=
nil
{
t
.
Fatal
(
"did not make a broadcast"
)
}
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
"Never rebroadcast want list"
)
}
startTick
=
time
.
Now
()
// wait for another broadcast to occur
select
{
case
receivedWantReq
:=
<-
wantReqs
:
if
len
(
receivedWantReq
.
cids
)
<
len
(
cids
)
{
t
.
Fatal
(
"did not rebroadcast whole live list"
)
}
if
receivedWantReq
.
peers
!=
nil
{
t
.
Fatal
(
"did not make a broadcast"
)
}
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
"Never rebroadcast want list"
)
}
consecutiveTickLength
:=
time
.
Since
(
startTick
)
// tick should take longer
if
firstTickLength
>
consecutiveTickLength
{
t
.
Fatal
(
"Should have increased tick length after first consecutive tick"
)
}
startTick
=
time
.
Now
()
// wait for another broadcast to occur
select
{
case
receivedWantReq
:=
<-
wantReqs
:
if
len
(
receivedWantReq
.
cids
)
<
len
(
cids
)
{
t
.
Fatal
(
"did not rebroadcast whole live list"
)
}
if
receivedWantReq
.
peers
!=
nil
{
t
.
Fatal
(
"did not make a broadcast"
)
}
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
"Never rebroadcast want list"
)
}
secondConsecutiveTickLength
:=
time
.
Since
(
startTick
)
// tick should take longer
if
consecutiveTickLength
>
secondConsecutiveTickLength
{
t
.
Fatal
(
"Should have increased tick length after first consecutive tick"
)
}
// should not have looked for peers on consecutive ticks
select
{
case
<-
fpm
.
findMorePeersRequested
:
t
.
Fatal
(
"Should not have looked for peers on consecutive tick"
)
default
:
}
// wait for rebroadcast to occur
select
{
case
k
:=
<-
fpm
.
findMorePeersRequested
:
if
testutil
.
IndexOf
(
blks
,
k
)
==
-
1
{
t
.
Fatal
(
"did not rebroadcast an active want"
)
}
case
<-
ctx
.
Done
()
:
t
.
Fatal
(
"Did not rebroadcast to find more peers"
)
}
}
sessionmanager/sessionmanager.go
View file @
86089ee1
...
...
@@ -3,9 +3,11 @@ package sessionmanager
import
(
"context"
"sync"
"time"
blocks
"github.com/ipfs/go-block-format"
cid
"github.com/ipfs/go-cid"
delay
"github.com/ipfs/go-ipfs-delay"
bssession
"github.com/ipfs/go-bitswap/session"
exchange
"github.com/ipfs/go-ipfs-exchange-interface"
...
...
@@ -27,7 +29,7 @@ type sesTrk struct {
}
// SessionFactory generates a new session for the SessionManager to track.
type
SessionFactory
func
(
ctx
context
.
Context
,
id
uint64
,
pm
bssession
.
PeerManager
,
srs
bssession
.
RequestSplitter
)
Session
type
SessionFactory
func
(
ctx
context
.
Context
,
id
uint64
,
pm
bssession
.
PeerManager
,
srs
bssession
.
RequestSplitter
,
provSearchDelay
time
.
Duration
,
rebroadcastDelay
delay
.
D
)
Session
// RequestSplitterFactory generates a new request splitter for a session.
type
RequestSplitterFactory
func
(
ctx
context
.
Context
)
bssession
.
RequestSplitter
...
...
@@ -64,13 +66,15 @@ func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory
// NewSession initializes a session with the given context, and adds to the
// session manager.
func
(
sm
*
SessionManager
)
NewSession
(
ctx
context
.
Context
)
exchange
.
Fetcher
{
func
(
sm
*
SessionManager
)
NewSession
(
ctx
context
.
Context
,
provSearchDelay
time
.
Duration
,
rebroadcastDelay
delay
.
D
)
exchange
.
Fetcher
{
id
:=
sm
.
GetNextSessionID
()
sessionctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
pm
:=
sm
.
peerManagerFactory
(
sessionctx
,
id
)
srs
:=
sm
.
requestSplitterFactory
(
sessionctx
)
session
:=
sm
.
sessionFactory
(
sessionctx
,
id
,
pm
,
srs
)
session
:=
sm
.
sessionFactory
(
sessionctx
,
id
,
pm
,
srs
,
provSearchDelay
,
rebroadcastDelay
)
tracked
:=
sesTrk
{
session
,
pm
,
srs
}
sm
.
sessLk
.
Lock
()
sm
.
sessions
=
append
(
sm
.
sessions
,
tracked
)
...
...
sessionmanager/sessionmanager_test.go
View file @
86089ee1
...
...
@@ -6,6 +6,7 @@ import (
"time"
bssrs
"github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay
"github.com/ipfs/go-ipfs-delay"
bssession
"github.com/ipfs/go-bitswap/session"
...
...
@@ -53,7 +54,12 @@ func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
var
nextInterestedIn
bool
func
sessionFactory
(
ctx
context
.
Context
,
id
uint64
,
pm
bssession
.
PeerManager
,
srs
bssession
.
RequestSplitter
)
Session
{
func
sessionFactory
(
ctx
context
.
Context
,
id
uint64
,
pm
bssession
.
PeerManager
,
srs
bssession
.
RequestSplitter
,
provSearchDelay
time
.
Duration
,
rebroadcastDelay
delay
.
D
)
Session
{
return
&
fakeSession
{
interested
:
nextInterestedIn
,
receivedBlock
:
false
,
...
...
@@ -83,18 +89,18 @@ func TestAddingSessions(t *testing.T) {
nextInterestedIn
=
true
currentID
:=
sm
.
GetNextSessionID
()
firstSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
if
firstSession
.
id
!=
firstSession
.
pm
.
id
||
firstSession
.
id
!=
currentID
+
1
{
t
.
Fatal
(
"session does not have correct id set"
)
}
secondSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
secondSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
if
secondSession
.
id
!=
secondSession
.
pm
.
id
||
secondSession
.
id
!=
firstSession
.
id
+
1
{
t
.
Fatal
(
"session does not have correct id set"
)
}
sm
.
GetNextSessionID
()
thirdSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
thirdSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
if
thirdSession
.
id
!=
thirdSession
.
pm
.
id
||
thirdSession
.
id
!=
secondSession
.
id
+
2
{
t
.
Fatal
(
"session does not have correct id set"
)
...
...
@@ -117,11 +123,11 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
block
:=
blocks
.
NewBlock
([]
byte
(
"block"
))
// we'll be interested in all blocks for this test
nextInterestedIn
=
false
firstSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
nextInterestedIn
=
true
secondSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
secondSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
nextInterestedIn
=
false
thirdSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
thirdSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
sm
.
ReceiveBlockFrom
(
p
,
block
)
if
firstSession
.
receivedBlock
||
...
...
@@ -140,9 +146,9 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
block
:=
blocks
.
NewBlock
([]
byte
(
"block"
))
// we'll be interested in all blocks for this test
nextInterestedIn
=
true
firstSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
secondSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
thirdSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
secondSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
thirdSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
cancel
()
// wait for sessions to get removed
...
...
@@ -165,10 +171,10 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
block
:=
blocks
.
NewBlock
([]
byte
(
"block"
))
// we'll be interested in all blocks for this test
nextInterestedIn
=
true
firstSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
sessionCtx
,
sessionCancel
:=
context
.
WithCancel
(
ctx
)
secondSession
:=
sm
.
NewSession
(
sessionCtx
)
.
(
*
fakeSession
)
thirdSession
:=
sm
.
NewSession
(
ctx
)
.
(
*
fakeSession
)
secondSession
:=
sm
.
NewSession
(
sessionCtx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
thirdSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
)
)
.
(
*
fakeSession
)
sessionCancel
()
// wait for sessions to get removed
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment