Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
e9661edc
Commit
e9661edc
authored
Aug 22, 2019
by
Dirk McCormick
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactor: session want management
parent
7458eb8f
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
390 additions
and
195 deletions
+390
-195
session/session.go
session/session.go
+18
-153
session/session_test.go
session/session_test.go
+0
-6
session/sessionwants.go
session/sessionwants.go
+190
-0
session/sessionwants_test.go
session/sessionwants_test.go
+152
-0
sessionmanager/sessionmanager.go
sessionmanager/sessionmanager.go
+8
-11
sessionmanager/sessionmanager_test.go
sessionmanager/sessionmanager_test.go
+22
-25
No files found.
session/session.go
View file @
e9661edc
...
...
@@ -2,8 +2,6 @@ package session
import
(
"context"
"math/rand"
"sync"
"time"
bsgetter
"github.com/ipfs/go-bitswap/getter"
...
...
@@ -52,13 +50,6 @@ type rcvFrom struct {
ks
[]
cid
.
Cid
}
type
sessionWants
struct
{
sync
.
RWMutex
toFetch
*
cidQueue
liveWants
map
[
cid
.
Cid
]
time
.
Time
pastWants
*
cid
.
Set
}
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from.
...
...
@@ -133,26 +124,20 @@ func New(ctx context.Context,
// ReceiveFrom receives incoming blocks from the given peer.
func
(
s
*
Session
)
ReceiveFrom
(
from
peer
.
ID
,
ks
[]
cid
.
Cid
)
{
interested
:=
s
.
sw
.
FilterInteresting
(
ks
)
if
len
(
interested
)
==
0
{
return
}
select
{
case
s
.
incoming
<-
rcvFrom
{
from
:
from
,
ks
:
ks
}
:
case
s
.
incoming
<-
rcvFrom
{
from
:
from
,
ks
:
interested
}
:
case
<-
s
.
ctx
.
Done
()
:
}
}
// IsWanted returns true if this session is waiting to receive the given Cid.
func
(
s
*
Session
)
IsWanted
(
c
cid
.
Cid
)
bool
{
s
.
sw
.
RLock
()
defer
s
.
sw
.
RUnlock
()
return
s
.
unlockedIsWanted
(
c
)
}
// InterestedIn returns true if this session has ever requested the given Cid.
func
(
s
*
Session
)
InterestedIn
(
c
cid
.
Cid
)
bool
{
s
.
sw
.
RLock
()
defer
s
.
sw
.
RUnlock
()
return
s
.
unlockedIsWanted
(
c
)
||
s
.
sw
.
pastWants
.
Has
(
c
)
return
s
.
sw
.
IsWanted
(
c
)
}
// GetBlock fetches a single block.
...
...
@@ -220,7 +205,7 @@ func (s *Session) run(ctx context.Context) {
case
keys
:=
<-
s
.
newReqs
:
s
.
wantBlocks
(
ctx
,
keys
)
case
keys
:=
<-
s
.
cancelKeys
:
s
.
handleCancel
(
keys
)
s
.
sw
.
CancelPending
(
keys
)
case
<-
s
.
idleTick
.
C
:
s
.
handleIdleTick
(
ctx
)
case
<-
s
.
periodicSearchTimer
.
C
:
...
...
@@ -236,17 +221,8 @@ func (s *Session) run(ctx context.Context) {
}
}
func
(
s
*
Session
)
handleCancel
(
keys
[]
cid
.
Cid
)
{
s
.
sw
.
Lock
()
defer
s
.
sw
.
Unlock
()
for
_
,
k
:=
range
keys
{
s
.
sw
.
toFetch
.
Remove
(
k
)
}
}
func
(
s
*
Session
)
handleIdleTick
(
ctx
context
.
Context
)
{
live
:=
s
.
p
repareBroadcast
()
live
:=
s
.
sw
.
P
repareBroadcast
()
// Broadcast these keys to everyone we're connected to
s
.
pm
.
RecordPeerRequests
(
nil
,
live
)
...
...
@@ -259,29 +235,13 @@ func (s *Session) handleIdleTick(ctx context.Context) {
}
s
.
resetIdleTick
()
s
.
sw
.
RLock
()
defer
s
.
sw
.
RUnlock
()
if
len
(
s
.
sw
.
liveWants
)
>
0
{
if
s
.
sw
.
HasLiveWants
()
{
s
.
consecutiveTicks
++
}
}
func
(
s
*
Session
)
prepareBroadcast
()
[]
cid
.
Cid
{
s
.
sw
.
Lock
()
defer
s
.
sw
.
Unlock
()
live
:=
make
([]
cid
.
Cid
,
0
,
len
(
s
.
sw
.
liveWants
))
now
:=
time
.
Now
()
for
c
:=
range
s
.
sw
.
liveWants
{
live
=
append
(
live
,
c
)
s
.
sw
.
liveWants
[
c
]
=
now
}
return
live
}
func
(
s
*
Session
)
handlePeriodicSearch
(
ctx
context
.
Context
)
{
randomWant
:=
s
.
r
andomLiveWant
()
randomWant
:=
s
.
sw
.
R
andomLiveWant
()
if
!
randomWant
.
Defined
()
{
return
}
...
...
@@ -294,50 +254,13 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
s
.
periodicSearchTimer
.
Reset
(
s
.
periodicSearchDelay
.
NextWaitTime
())
}
func
(
s
*
Session
)
randomLiveWant
()
cid
.
Cid
{
s
.
sw
.
RLock
()
defer
s
.
sw
.
RUnlock
()
if
len
(
s
.
sw
.
liveWants
)
==
0
{
return
cid
.
Cid
{}
}
i
:=
rand
.
Intn
(
len
(
s
.
sw
.
liveWants
))
// picking a random live want
for
k
:=
range
s
.
sw
.
liveWants
{
if
i
==
0
{
return
k
}
i
--
}
return
cid
.
Cid
{}
}
func
(
s
*
Session
)
handleShutdown
()
{
s
.
idleTick
.
Stop
()
live
:=
s
.
l
iveWants
()
live
:=
s
.
sw
.
L
iveWants
()
s
.
wm
.
CancelWants
(
s
.
ctx
,
live
,
nil
,
s
.
id
)
}
func
(
s
*
Session
)
liveWants
()
[]
cid
.
Cid
{
s
.
sw
.
RLock
()
defer
s
.
sw
.
RUnlock
()
live
:=
make
([]
cid
.
Cid
,
0
,
len
(
s
.
sw
.
liveWants
))
for
c
:=
range
s
.
sw
.
liveWants
{
live
=
append
(
live
,
c
)
}
return
live
}
func
(
s
*
Session
)
unlockedIsWanted
(
c
cid
.
Cid
)
bool
{
_
,
ok
:=
s
.
sw
.
liveWants
[
c
]
if
!
ok
{
ok
=
s
.
sw
.
toFetch
.
Has
(
c
)
}
return
ok
}
func
(
s
*
Session
)
handleIncoming
(
ctx
context
.
Context
,
rcv
rcvFrom
)
{
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
...
...
@@ -346,7 +269,7 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
}
// Update the want list
wanted
,
totalLatency
:=
s
.
b
locksReceived
(
rcv
.
ks
)
wanted
,
totalLatency
:=
s
.
sw
.
B
locksReceived
(
rcv
.
ks
)
if
len
(
wanted
)
==
0
{
return
}
...
...
@@ -363,17 +286,8 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
}
func
(
s
*
Session
)
updateReceiveCounters
(
ctx
context
.
Context
,
rcv
rcvFrom
)
{
s
.
sw
.
RLock
()
for
_
,
c
:=
range
rcv
.
ks
{
if
s
.
unlockedIsWanted
(
c
)
{
s
.
srs
.
RecordUniqueBlock
()
}
else
if
s
.
sw
.
pastWants
.
Has
(
c
)
{
s
.
srs
.
RecordDuplicateBlock
()
}
}
s
.
sw
.
RUnlock
()
// Record unique vs duplicate blocks
s
.
sw
.
ForEachUniqDup
(
rcv
.
ks
,
s
.
srs
.
RecordUniqueBlock
,
s
.
srs
.
RecordDuplicateBlock
)
// Record response (to be able to time latency)
if
len
(
rcv
.
ks
)
>
0
{
...
...
@@ -381,34 +295,6 @@ func (s *Session) updateReceiveCounters(ctx context.Context, rcv rcvFrom) {
}
}
func
(
s
*
Session
)
blocksReceived
(
cids
[]
cid
.
Cid
)
([]
cid
.
Cid
,
time
.
Duration
)
{
s
.
sw
.
Lock
()
defer
s
.
sw
.
Unlock
()
totalLatency
:=
time
.
Duration
(
0
)
wanted
:=
make
([]
cid
.
Cid
,
0
,
len
(
cids
))
for
_
,
c
:=
range
cids
{
if
s
.
unlockedIsWanted
(
c
)
{
wanted
=
append
(
wanted
,
c
)
// If the block CID was in the live wants queue, remove it
tval
,
ok
:=
s
.
sw
.
liveWants
[
c
]
if
ok
{
totalLatency
+=
time
.
Since
(
tval
)
delete
(
s
.
sw
.
liveWants
,
c
)
}
else
{
// Otherwise remove it from the toFetch queue, if it was there
s
.
sw
.
toFetch
.
Remove
(
c
)
}
// Keep track of CIDs we've successfully fetched
s
.
sw
.
pastWants
.
Add
(
c
)
}
}
return
wanted
,
totalLatency
}
func
(
s
*
Session
)
cancelIncoming
(
ctx
context
.
Context
,
ks
[]
cid
.
Cid
)
{
s
.
pm
.
RecordCancels
(
ks
)
s
.
wm
.
CancelWants
(
s
.
ctx
,
ks
,
nil
,
s
.
id
)
...
...
@@ -427,7 +313,9 @@ func (s *Session) processIncoming(ctx context.Context, ks []cid.Cid, totalLatenc
}
func
(
s
*
Session
)
wantBlocks
(
ctx
context
.
Context
,
newks
[]
cid
.
Cid
)
{
ks
:=
s
.
getNextWants
(
s
.
wantLimit
(),
newks
)
// Given the want limit and any newly received blocks, get as many wants as
// we can to send out
ks
:=
s
.
sw
.
GetNextWants
(
s
.
wantLimit
(),
newks
)
if
len
(
ks
)
==
0
{
return
}
...
...
@@ -445,29 +333,6 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
}
}
func
(
s
*
Session
)
getNextWants
(
limit
int
,
newWants
[]
cid
.
Cid
)
[]
cid
.
Cid
{
s
.
sw
.
Lock
()
defer
s
.
sw
.
Unlock
()
now
:=
time
.
Now
()
for
_
,
k
:=
range
newWants
{
s
.
sw
.
toFetch
.
Push
(
k
)
}
currentLiveCount
:=
len
(
s
.
sw
.
liveWants
)
toAdd
:=
limit
-
currentLiveCount
var
live
[]
cid
.
Cid
for
;
toAdd
>
0
&&
s
.
sw
.
toFetch
.
Len
()
>
0
;
toAdd
--
{
c
:=
s
.
sw
.
toFetch
.
Pop
()
live
=
append
(
live
,
c
)
s
.
sw
.
liveWants
[
c
]
=
now
}
return
live
}
func
(
s
*
Session
)
averageLatency
()
time
.
Duration
{
return
s
.
latTotal
/
time
.
Duration
(
s
.
fetchcnt
)
}
...
...
session/session_test.go
View file @
e9661edc
...
...
@@ -122,9 +122,6 @@ func TestSessionGetBlocks(t *testing.T) {
if
!
session
.
IsWanted
(
c
)
{
t
.
Fatal
(
"expected session to want cids"
)
}
if
!
session
.
InterestedIn
(
c
)
{
t
.
Fatal
(
"expected session to be interested in cids"
)
}
}
// now receive the first set of blocks
...
...
@@ -223,9 +220,6 @@ func TestSessionGetBlocks(t *testing.T) {
if
session
.
IsWanted
(
c
)
{
t
.
Fatal
(
"expected session NOT to want cids"
)
}
if
!
session
.
InterestedIn
(
c
)
{
t
.
Fatal
(
"expected session to still be interested in cids"
)
}
}
}
...
...
session/sessionwants.go
0 → 100644
View file @
e9661edc
package
session
import
(
"math/rand"
"sync"
"time"
cid
"github.com/ipfs/go-cid"
)
type
sessionWants
struct
{
sync
.
RWMutex
toFetch
*
cidQueue
liveWants
map
[
cid
.
Cid
]
time
.
Time
pastWants
*
cid
.
Set
}
// BlocksReceived moves received block CIDs from live to past wants and
// measures latency. It returns the CIDs of blocks that were actually wanted
// (as opposed to duplicates) and the total latency for all incoming blocks.
func
(
sw
*
sessionWants
)
BlocksReceived
(
cids
[]
cid
.
Cid
)
([]
cid
.
Cid
,
time
.
Duration
)
{
sw
.
Lock
()
defer
sw
.
Unlock
()
totalLatency
:=
time
.
Duration
(
0
)
wanted
:=
make
([]
cid
.
Cid
,
0
,
len
(
cids
))
for
_
,
c
:=
range
cids
{
if
sw
.
unlockedIsWanted
(
c
)
{
wanted
=
append
(
wanted
,
c
)
// If the block CID was in the live wants queue, remove it
tval
,
ok
:=
sw
.
liveWants
[
c
]
if
ok
{
totalLatency
+=
time
.
Since
(
tval
)
delete
(
sw
.
liveWants
,
c
)
}
else
{
// Otherwise remove it from the toFetch queue, if it was there
sw
.
toFetch
.
Remove
(
c
)
}
// Keep track of CIDs we've successfully fetched
sw
.
pastWants
.
Add
(
c
)
}
}
return
wanted
,
totalLatency
}
// GetNextWants adds any new wants to the list of CIDs to fetch, then moves as
// many CIDs from the fetch queue to the live wants list as possible (given the
// limit). Returns the newly live wants.
func
(
sw
*
sessionWants
)
GetNextWants
(
limit
int
,
newWants
[]
cid
.
Cid
)
[]
cid
.
Cid
{
now
:=
time
.
Now
()
sw
.
Lock
()
defer
sw
.
Unlock
()
// Add new wants to the fetch queue
for
_
,
k
:=
range
newWants
{
sw
.
toFetch
.
Push
(
k
)
}
// Move CIDs from fetch queue to the live wants queue (up to the limit)
currentLiveCount
:=
len
(
sw
.
liveWants
)
toAdd
:=
limit
-
currentLiveCount
var
live
[]
cid
.
Cid
for
;
toAdd
>
0
&&
sw
.
toFetch
.
Len
()
>
0
;
toAdd
--
{
c
:=
sw
.
toFetch
.
Pop
()
live
=
append
(
live
,
c
)
sw
.
liveWants
[
c
]
=
now
}
return
live
}
// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
func
(
sw
*
sessionWants
)
PrepareBroadcast
()
[]
cid
.
Cid
{
now
:=
time
.
Now
()
sw
.
Lock
()
defer
sw
.
Unlock
()
live
:=
make
([]
cid
.
Cid
,
0
,
len
(
sw
.
liveWants
))
for
c
:=
range
sw
.
liveWants
{
live
=
append
(
live
,
c
)
sw
.
liveWants
[
c
]
=
now
}
return
live
}
// CancelPending removes the given CIDs from the fetch queue.
func
(
sw
*
sessionWants
)
CancelPending
(
keys
[]
cid
.
Cid
)
{
sw
.
Lock
()
defer
sw
.
Unlock
()
for
_
,
k
:=
range
keys
{
sw
.
toFetch
.
Remove
(
k
)
}
}
// ForEachUniqDup iterates over each of the given CIDs and calls isUniqFn
// if the session is expecting a block for the CID, or isDupFn if the session
// has already received the block.
func
(
sw
*
sessionWants
)
ForEachUniqDup
(
ks
[]
cid
.
Cid
,
isUniqFn
,
isDupFn
func
())
{
sw
.
RLock
()
for
_
,
k
:=
range
ks
{
if
sw
.
unlockedIsWanted
(
k
)
{
isUniqFn
()
}
else
if
sw
.
pastWants
.
Has
(
k
)
{
isDupFn
()
}
}
sw
.
RUnlock
()
}
// LiveWants returns a list of live wants
func
(
sw
*
sessionWants
)
LiveWants
()
[]
cid
.
Cid
{
sw
.
RLock
()
defer
sw
.
RUnlock
()
live
:=
make
([]
cid
.
Cid
,
0
,
len
(
sw
.
liveWants
))
for
c
:=
range
sw
.
liveWants
{
live
=
append
(
live
,
c
)
}
return
live
}
// RandomLiveWant returns a randomly selected live want
func
(
sw
*
sessionWants
)
RandomLiveWant
()
cid
.
Cid
{
sw
.
RLock
()
defer
sw
.
RUnlock
()
if
len
(
sw
.
liveWants
)
==
0
{
return
cid
.
Cid
{}
}
i
:=
rand
.
Intn
(
len
(
sw
.
liveWants
))
// picking a random live want
for
k
:=
range
sw
.
liveWants
{
if
i
==
0
{
return
k
}
i
--
}
return
cid
.
Cid
{}
}
// Has live wants indicates if there are any live wants
func
(
sw
*
sessionWants
)
HasLiveWants
()
bool
{
sw
.
RLock
()
defer
sw
.
RUnlock
()
return
len
(
sw
.
liveWants
)
>
0
}
// IsWanted indicates if the session is expecting to receive the block with the
// given CID
func
(
sw
*
sessionWants
)
IsWanted
(
c
cid
.
Cid
)
bool
{
sw
.
RLock
()
defer
sw
.
RUnlock
()
return
sw
.
unlockedIsWanted
(
c
)
}
// FilterInteresting filters the list so that it only contains keys for
// blocks that the session is waiting to receive or has received in the past
func
(
sw
*
sessionWants
)
FilterInteresting
(
ks
[]
cid
.
Cid
)
[]
cid
.
Cid
{
sw
.
RLock
()
defer
sw
.
RUnlock
()
interested
:=
make
([]
cid
.
Cid
,
0
,
len
(
ks
))
for
_
,
k
:=
range
ks
{
if
sw
.
unlockedIsWanted
(
k
)
||
sw
.
pastWants
.
Has
(
k
)
{
interested
=
append
(
interested
,
k
)
}
}
return
interested
}
func
(
sw
*
sessionWants
)
unlockedIsWanted
(
c
cid
.
Cid
)
bool
{
_
,
ok
:=
sw
.
liveWants
[
c
]
if
!
ok
{
ok
=
sw
.
toFetch
.
Has
(
c
)
}
return
ok
}
session/sessionwants_test.go
0 → 100644
View file @
e9661edc
package
session
import
(
"testing"
"time"
"github.com/ipfs/go-bitswap/testutil"
cid
"github.com/ipfs/go-cid"
)
func
TestSessionWants
(
t
*
testing
.
T
)
{
sw
:=
sessionWants
{
toFetch
:
newCidQueue
(),
liveWants
:
make
(
map
[
cid
.
Cid
]
time
.
Time
),
pastWants
:
cid
.
NewSet
(),
}
cids
:=
testutil
.
GenerateCids
(
10
)
others
:=
testutil
.
GenerateCids
(
1
)
// Expect these functions to return nothing on a new sessionWants
lws
:=
sw
.
PrepareBroadcast
()
if
len
(
lws
)
>
0
{
t
.
Fatal
(
"expected no broadcast wants"
)
}
lws
=
sw
.
LiveWants
()
if
len
(
lws
)
>
0
{
t
.
Fatal
(
"expected no live wants"
)
}
if
sw
.
HasLiveWants
()
{
t
.
Fatal
(
"expected not to have live wants"
)
}
rw
:=
sw
.
RandomLiveWant
()
if
rw
.
Defined
()
{
t
.
Fatal
(
"expected no random want"
)
}
if
sw
.
IsWanted
(
cids
[
0
])
{
t
.
Fatal
(
"expected cid to not be wanted"
)
}
if
len
(
sw
.
FilterInteresting
(
cids
))
>
0
{
t
.
Fatal
(
"expected no interesting wants"
)
}
// Add 10 new wants with a limit of 5
// The first 5 cids should go into the toFetch queue
// The other 5 cids should go into the live want queue
// toFetch Live Past
// 98765 43210
nextw
:=
sw
.
GetNextWants
(
5
,
cids
)
if
len
(
nextw
)
!=
5
{
t
.
Fatal
(
"expected 5 next wants"
)
}
lws
=
sw
.
PrepareBroadcast
()
if
len
(
lws
)
!=
5
{
t
.
Fatal
(
"expected 5 broadcast wants"
)
}
lws
=
sw
.
LiveWants
()
if
len
(
lws
)
!=
5
{
t
.
Fatal
(
"expected 5 live wants"
)
}
if
!
sw
.
HasLiveWants
()
{
t
.
Fatal
(
"expected to have live wants"
)
}
rw
=
sw
.
RandomLiveWant
()
if
!
rw
.
Defined
()
{
t
.
Fatal
(
"expected random want"
)
}
if
!
sw
.
IsWanted
(
cids
[
0
])
{
t
.
Fatal
(
"expected cid to be wanted"
)
}
if
!
sw
.
IsWanted
(
cids
[
9
])
{
t
.
Fatal
(
"expected cid to be wanted"
)
}
if
len
(
sw
.
FilterInteresting
([]
cid
.
Cid
{
cids
[
0
],
cids
[
9
],
others
[
0
]}))
!=
2
{
t
.
Fatal
(
"expected 2 interesting wants"
)
}
// Two wanted blocks and one other block are received.
// The wanted blocks should be moved from the live wants queue
// to the past wants set (the other block CID should be ignored)
// toFetch Live Past
// 98765 432__ 10
recvdCids
:=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
1
],
others
[
0
]}
uniq
:=
0
dup
:=
0
sw
.
ForEachUniqDup
(
recvdCids
,
func
()
{
uniq
++
},
func
()
{
dup
++
})
if
uniq
!=
2
||
dup
!=
0
{
t
.
Fatal
(
"expected 2 uniqs / 0 dups"
,
uniq
,
dup
)
}
sw
.
BlocksReceived
(
recvdCids
)
lws
=
sw
.
LiveWants
()
if
len
(
lws
)
!=
3
{
t
.
Fatal
(
"expected 3 live wants"
)
}
if
sw
.
IsWanted
(
cids
[
0
])
{
t
.
Fatal
(
"expected cid to no longer be wanted"
)
}
if
!
sw
.
IsWanted
(
cids
[
9
])
{
t
.
Fatal
(
"expected cid to be wanted"
)
}
if
len
(
sw
.
FilterInteresting
([]
cid
.
Cid
{
cids
[
0
],
cids
[
9
],
others
[
0
]}))
!=
2
{
t
.
Fatal
(
"expected 2 interesting wants"
)
}
// Ask for next wants with a limit of 5
// Should move 2 wants from toFetch queue to live wants
// toFetch Live Past
// 987__ 65432 10
nextw
=
sw
.
GetNextWants
(
5
,
nil
)
if
len
(
nextw
)
!=
2
{
t
.
Fatal
(
"expected 2 next wants"
)
}
lws
=
sw
.
LiveWants
()
if
len
(
lws
)
!=
5
{
t
.
Fatal
(
"expected 5 live wants"
)
}
if
!
sw
.
IsWanted
(
cids
[
5
])
{
t
.
Fatal
(
"expected cid to be wanted"
)
}
// One wanted block and one dup block are received.
// The wanted block should be moved from the live wants queue
// to the past wants set
// toFetch Live Past
// 987 654_2 310
recvdCids
=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
3
]}
uniq
=
0
dup
=
0
sw
.
ForEachUniqDup
(
recvdCids
,
func
()
{
uniq
++
},
func
()
{
dup
++
})
if
uniq
!=
1
||
dup
!=
1
{
t
.
Fatal
(
"expected 1 uniq / 1 dup"
,
uniq
,
dup
)
}
sw
.
BlocksReceived
(
recvdCids
)
lws
=
sw
.
LiveWants
()
if
len
(
lws
)
!=
4
{
t
.
Fatal
(
"expected 4 live wants"
)
}
// One block in the toFetch queue should be cancelled
// toFetch Live Past
// 9_7 654_2 310
sw
.
CancelPending
([]
cid
.
Cid
{
cids
[
8
]})
lws
=
sw
.
LiveWants
()
if
len
(
lws
)
!=
4
{
t
.
Fatal
(
"expected 4 live wants"
)
}
if
sw
.
IsWanted
(
cids
[
8
])
{
t
.
Fatal
(
"expected cid to no longer be wanted"
)
}
if
len
(
sw
.
FilterInteresting
([]
cid
.
Cid
{
cids
[
0
],
cids
[
8
]}))
!=
1
{
t
.
Fatal
(
"expected 1 interesting wants"
)
}
}
sessionmanager/sessionmanager.go
View file @
e9661edc
...
...
@@ -17,7 +17,6 @@ import (
// Session is a session that is managed by the session manager
type
Session
interface
{
exchange
.
Fetcher
InterestedIn
(
cid
.
Cid
)
bool
ReceiveFrom
(
peer
.
ID
,
[]
cid
.
Cid
)
IsWanted
(
cid
.
Cid
)
bool
}
...
...
@@ -115,22 +114,20 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
return
sm
.
sessID
}
// ReceiveFrom receives blocks from a peer and dispatches to interested
// sessions.
// ReceiveFrom receives block CIDs from a peer and dispatches to sessions.
func
(
sm
*
SessionManager
)
ReceiveFrom
(
from
peer
.
ID
,
ks
[]
cid
.
Cid
)
{
sm
.
sessLk
.
Lock
()
defer
sm
.
sessLk
.
Unlock
()
// Only give each session the blocks / dups that it is interested in
var
wg
sync
.
WaitGroup
for
_
,
s
:=
range
sm
.
sessions
{
sessKs
:=
make
([]
cid
.
Cid
,
0
,
len
(
ks
))
for
_
,
k
:=
range
ks
{
if
s
.
session
.
InterestedIn
(
k
)
{
sessKs
=
append
(
sessKs
,
k
)
}
}
s
.
session
.
ReceiveFrom
(
from
,
sessKs
)
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
s
.
session
.
ReceiveFrom
(
from
,
ks
)
}()
}
wg
.
Wait
()
}
// IsWanted indicates whether any of the sessions are waiting to receive
...
...
sessionmanager/sessionmanager_test.go
View file @
e9661edc
...
...
@@ -18,12 +18,12 @@ import (
)
type
fakeSession
struct
{
interes
ted
[]
cid
.
Cid
ks
[]
cid
.
Cid
id
uint64
pm
*
fakePeerManager
srs
*
fakeRequestSplitter
notif
notifications
.
PubSub
wan
ted
[]
cid
.
Cid
ks
[]
cid
.
Cid
id
uint64
pm
*
fakePeerManager
srs
*
fakeRequestSplitter
notif
notifications
.
PubSub
}
func
(
*
fakeSession
)
GetBlock
(
context
.
Context
,
cid
.
Cid
)
(
blocks
.
Block
,
error
)
{
...
...
@@ -32,17 +32,14 @@ func (*fakeSession) GetBlock(context.Context, cid.Cid) (blocks.Block, error) {
func
(
*
fakeSession
)
GetBlocks
(
context
.
Context
,
[]
cid
.
Cid
)
(
<-
chan
blocks
.
Block
,
error
)
{
return
nil
,
nil
}
func
(
fs
*
fakeSession
)
I
nteres
ted
In
(
c
cid
.
Cid
)
bool
{
for
_
,
ic
:=
range
fs
.
interes
ted
{
func
(
fs
*
fakeSession
)
I
sWan
ted
(
c
cid
.
Cid
)
bool
{
for
_
,
ic
:=
range
fs
.
wan
ted
{
if
c
==
ic
{
return
true
}
}
return
false
}
func
(
fs
*
fakeSession
)
IsWanted
(
c
cid
.
Cid
)
bool
{
return
fs
.
InterestedIn
(
c
)
}
func
(
fs
*
fakeSession
)
ReceiveFrom
(
p
peer
.
ID
,
ks
[]
cid
.
Cid
)
{
fs
.
ks
=
append
(
fs
.
ks
,
ks
...
)
}
...
...
@@ -66,7 +63,7 @@ func (frs *fakeRequestSplitter) SplitRequest(optimizedPeers []bssd.OptimizedPeer
func
(
frs
*
fakeRequestSplitter
)
RecordDuplicateBlock
()
{}
func
(
frs
*
fakeRequestSplitter
)
RecordUniqueBlock
()
{}
var
next
I
nte
restedIn
[]
cid
.
Cid
var
next
Wa
nte
d
[]
cid
.
Cid
func
sessionFactory
(
ctx
context
.
Context
,
id
uint64
,
...
...
@@ -76,11 +73,11 @@ func sessionFactory(ctx context.Context,
provSearchDelay
time
.
Duration
,
rebroadcastDelay
delay
.
D
)
Session
{
return
&
fakeSession
{
interes
ted
:
next
I
nte
restedIn
,
id
:
id
,
pm
:
pm
.
(
*
fakePeerManager
),
srs
:
srs
.
(
*
fakeRequestSplitter
),
notif
:
notif
,
wan
ted
:
next
Wa
nte
d
,
id
:
id
,
pm
:
pm
.
(
*
fakePeerManager
),
srs
:
srs
.
(
*
fakeRequestSplitter
),
notif
:
notif
,
}
}
...
...
@@ -121,7 +118,7 @@ func TestAddingSessions(t *testing.T) {
p
:=
peer
.
ID
(
123
)
block
:=
blocks
.
NewBlock
([]
byte
(
"block"
))
// we'll be interested in all blocks for this test
next
I
nte
restedIn
=
[]
cid
.
Cid
{
block
.
Cid
()}
next
Wa
nte
d
=
[]
cid
.
Cid
{
block
.
Cid
()}
currentID
:=
sm
.
GetNextSessionID
()
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
...
...
@@ -163,11 +160,11 @@ func TestReceivingBlocksWhenNotInterested(t *testing.T) {
cids
=
append
(
cids
,
b
.
Cid
())
}
next
I
nte
restedIn
=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
1
]}
next
Wa
nte
d
=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
1
]}
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
next
I
nte
restedIn
=
[]
cid
.
Cid
{
cids
[
0
]}
next
Wa
nte
d
=
[]
cid
.
Cid
{
cids
[
0
]}
secondSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
next
I
nte
restedIn
=
[]
cid
.
Cid
{}
next
Wa
nte
d
=
[]
cid
.
Cid
{}
thirdSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
sm
.
ReceiveFrom
(
p
,
[]
cid
.
Cid
{
blks
[
0
]
.
Cid
(),
blks
[
1
]
.
Cid
()})
...
...
@@ -193,9 +190,9 @@ func TestIsWanted(t *testing.T) {
cids
=
append
(
cids
,
b
.
Cid
())
}
next
I
nte
restedIn
=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
1
]}
next
Wa
nte
d
=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
1
]}
_
=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
next
I
nte
restedIn
=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
2
]}
next
Wa
nte
d
=
[]
cid
.
Cid
{
cids
[
0
],
cids
[
2
]}
_
=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
if
!
sm
.
IsWanted
(
cids
[
0
])
||
...
...
@@ -218,7 +215,7 @@ func TestRemovingPeersWhenManagerContextCancelled(t *testing.T) {
p
:=
peer
.
ID
(
123
)
block
:=
blocks
.
NewBlock
([]
byte
(
"block"
))
// we'll be interested in all blocks for this test
next
I
nte
restedIn
=
[]
cid
.
Cid
{
block
.
Cid
()}
next
Wa
nte
d
=
[]
cid
.
Cid
{
block
.
Cid
()}
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
secondSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
thirdSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
...
...
@@ -245,7 +242,7 @@ func TestRemovingPeersWhenSessionContextCancelled(t *testing.T) {
p
:=
peer
.
ID
(
123
)
block
:=
blocks
.
NewBlock
([]
byte
(
"block"
))
// we'll be interested in all blocks for this test
next
I
nte
restedIn
=
[]
cid
.
Cid
{
block
.
Cid
()}
next
Wa
nte
d
=
[]
cid
.
Cid
{
block
.
Cid
()}
firstSession
:=
sm
.
NewSession
(
ctx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
sessionCtx
,
sessionCancel
:=
context
.
WithCancel
(
ctx
)
secondSession
:=
sm
.
NewSession
(
sessionCtx
,
time
.
Second
,
delay
.
Fixed
(
time
.
Minute
))
.
(
*
fakeSession
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment