Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-bitswap
Commits
565aa560
Unverified
Commit
565aa560
authored
Aug 27, 2019
by
Steven Allen
Committed by
GitHub
Aug 27, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #188 from ipfs/fix/optimize
reduce session contention
parents
7944a99c
8454ba00
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
43 additions
and
32 deletions
+43
-32
session/session.go
session/session.go
+37
-27
sessionmanager/sessionmanager.go
sessionmanager/sessionmanager.go
+6
-5
No files found.
session/session.go
View file @
565aa560
...
...
@@ -45,9 +45,18 @@ type RequestSplitter interface {
RecordUniqueBlock
()
}
type
rcvFrom
struct
{
type
opType
int
const
(
opReceive
opType
=
iota
opWant
opCancel
)
type
op
struct
{
op
opType
from
peer
.
ID
k
s
[]
cid
.
Cid
k
eys
[]
cid
.
Cid
}
// Session holds state for an individual bitswap transfer operation.
...
...
@@ -63,9 +72,7 @@ type Session struct {
sw
sessionWants
// channels
incoming
chan
rcvFrom
newReqs
chan
[]
cid
.
Cid
cancelKeys
chan
[]
cid
.
Cid
incoming
chan
op
latencyReqs
chan
chan
time
.
Duration
tickDelayReqs
chan
time
.
Duration
...
...
@@ -100,15 +107,13 @@ func New(ctx context.Context,
liveWants
:
make
(
map
[
cid
.
Cid
]
time
.
Time
),
pastWants
:
cid
.
NewSet
(),
},
newReqs
:
make
(
chan
[]
cid
.
Cid
),
cancelKeys
:
make
(
chan
[]
cid
.
Cid
),
latencyReqs
:
make
(
chan
chan
time
.
Duration
),
tickDelayReqs
:
make
(
chan
time
.
Duration
),
ctx
:
ctx
,
wm
:
wm
,
pm
:
pm
,
srs
:
srs
,
incoming
:
make
(
chan
rcvFrom
),
incoming
:
make
(
chan
op
,
16
),
notif
:
notif
,
uuid
:
loggables
.
Uuid
(
"GetBlockRequest"
),
baseTickDelay
:
time
.
Millisecond
*
500
,
...
...
@@ -130,7 +135,7 @@ func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid) {
}
select
{
case
s
.
incoming
<-
rcvFrom
{
from
:
from
,
ks
:
interested
}
:
case
s
.
incoming
<-
op
{
op
:
opReceive
,
from
:
from
,
k
ey
s
:
interested
}
:
case
<-
s
.
ctx
.
Done
()
:
}
}
...
...
@@ -154,14 +159,14 @@ func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
return
bsgetter
.
AsyncGetBlocks
(
ctx
,
s
.
ctx
,
keys
,
s
.
notif
,
func
(
ctx
context
.
Context
,
keys
[]
cid
.
Cid
)
{
select
{
case
s
.
newReqs
<-
keys
:
case
s
.
incoming
<-
op
{
op
:
opWant
,
keys
:
keys
}
:
case
<-
ctx
.
Done
()
:
case
<-
s
.
ctx
.
Done
()
:
}
},
func
(
keys
[]
cid
.
Cid
)
{
select
{
case
s
.
cancelKeys
<-
keys
:
case
s
.
incoming
<-
op
{
op
:
opCancel
,
keys
:
keys
}
:
case
<-
s
.
ctx
.
Done
()
:
}
},
...
...
@@ -200,12 +205,17 @@ func (s *Session) run(ctx context.Context) {
s
.
periodicSearchTimer
=
time
.
NewTimer
(
s
.
periodicSearchDelay
.
NextWaitTime
())
for
{
select
{
case
rcv
:=
<-
s
.
incoming
:
s
.
handleIncoming
(
ctx
,
rcv
)
case
keys
:=
<-
s
.
newReqs
:
s
.
wantBlocks
(
ctx
,
keys
)
case
keys
:=
<-
s
.
cancelKeys
:
s
.
sw
.
CancelPending
(
keys
)
case
oper
:=
<-
s
.
incoming
:
switch
oper
.
op
{
case
opReceive
:
s
.
handleReceive
(
ctx
,
oper
.
from
,
oper
.
keys
)
case
opWant
:
s
.
wantBlocks
(
ctx
,
oper
.
keys
)
case
opCancel
:
s
.
sw
.
CancelPending
(
oper
.
keys
)
default
:
panic
(
"unhandled operation"
)
}
case
<-
s
.
idleTick
.
C
:
s
.
handleIdleTick
(
ctx
)
case
<-
s
.
periodicSearchTimer
.
C
:
...
...
@@ -261,15 +271,15 @@ func (s *Session) handleShutdown() {
s
.
wm
.
CancelWants
(
s
.
ctx
,
live
,
nil
,
s
.
id
)
}
func
(
s
*
Session
)
handle
Incoming
(
ctx
context
.
Context
,
rcv
rcvFrom
)
{
func
(
s
*
Session
)
handle
Receive
(
ctx
context
.
Context
,
from
peer
.
ID
,
keys
[]
cid
.
Cid
)
{
// Record statistics only if the blocks came from the network
// (blocks can also be received from the local node)
if
rcv
.
from
!=
""
{
s
.
updateReceiveCounters
(
ctx
,
rcv
)
if
from
!=
""
{
s
.
updateReceiveCounters
(
ctx
,
from
,
keys
)
}
// Update the want list
wanted
,
totalLatency
:=
s
.
sw
.
BlocksReceived
(
rcv
.
k
s
)
wanted
,
totalLatency
:=
s
.
sw
.
BlocksReceived
(
key
s
)
if
len
(
wanted
)
==
0
{
return
}
...
...
@@ -280,18 +290,18 @@ func (s *Session) handleIncoming(ctx context.Context, rcv rcvFrom) {
s
.
idleTick
.
Stop
()
// Process the received blocks
s
.
process
Incoming
(
ctx
,
wanted
,
totalLatency
)
s
.
process
Receive
(
ctx
,
wanted
,
totalLatency
)
s
.
resetIdleTick
()
}
func
(
s
*
Session
)
updateReceiveCounters
(
ctx
context
.
Context
,
rcv
rcvFrom
)
{
func
(
s
*
Session
)
updateReceiveCounters
(
ctx
context
.
Context
,
from
peer
.
ID
,
keys
[]
cid
.
Cid
)
{
// Record unique vs duplicate blocks
s
.
sw
.
ForEachUniqDup
(
rcv
.
k
s
,
s
.
srs
.
RecordUniqueBlock
,
s
.
srs
.
RecordDuplicateBlock
)
s
.
sw
.
ForEachUniqDup
(
key
s
,
s
.
srs
.
RecordUniqueBlock
,
s
.
srs
.
RecordDuplicateBlock
)
// Record response (to be able to time latency)
if
len
(
rcv
.
k
s
)
>
0
{
s
.
pm
.
RecordPeerResponse
(
rcv
.
from
,
rcv
.
k
s
)
if
len
(
key
s
)
>
0
{
s
.
pm
.
RecordPeerResponse
(
from
,
key
s
)
}
}
...
...
@@ -300,7 +310,7 @@ func (s *Session) cancelIncoming(ctx context.Context, ks []cid.Cid) {
s
.
wm
.
CancelWants
(
s
.
ctx
,
ks
,
nil
,
s
.
id
)
}
func
(
s
*
Session
)
process
Incoming
(
ctx
context
.
Context
,
ks
[]
cid
.
Cid
,
totalLatency
time
.
Duration
)
{
func
(
s
*
Session
)
process
Receive
(
ctx
context
.
Context
,
ks
[]
cid
.
Cid
,
totalLatency
time
.
Duration
)
{
// Keep track of the total number of blocks received and total latency
s
.
fetchcnt
+=
len
(
ks
)
s
.
latTotal
+=
totalLatency
...
...
sessionmanager/sessionmanager.go
View file @
565aa560
...
...
@@ -46,7 +46,7 @@ type SessionManager struct {
notif
notifications
.
PubSub
// Sessions
sessLk
sync
.
Mutex
sessLk
sync
.
RW
Mutex
sessions
[]
sesTrk
// Session Index
...
...
@@ -100,6 +100,7 @@ func (sm *SessionManager) removeSession(session sesTrk) {
for
i
:=
0
;
i
<
len
(
sm
.
sessions
);
i
++
{
if
sm
.
sessions
[
i
]
==
session
{
sm
.
sessions
[
i
]
=
sm
.
sessions
[
len
(
sm
.
sessions
)
-
1
]
sm
.
sessions
[
len
(
sm
.
sessions
)
-
1
]
=
sesTrk
{}
// free memory.
sm
.
sessions
=
sm
.
sessions
[
:
len
(
sm
.
sessions
)
-
1
]
return
}
...
...
@@ -116,8 +117,8 @@ func (sm *SessionManager) GetNextSessionID() uint64 {
// ReceiveFrom receives block CIDs from a peer and dispatches to sessions.
func
(
sm
*
SessionManager
)
ReceiveFrom
(
from
peer
.
ID
,
ks
[]
cid
.
Cid
)
{
sm
.
sessLk
.
Lock
()
defer
sm
.
sessLk
.
Unlock
()
sm
.
sessLk
.
R
Lock
()
defer
sm
.
sessLk
.
R
Unlock
()
for
_
,
s
:=
range
sm
.
sessions
{
s
.
session
.
ReceiveFrom
(
from
,
ks
)
...
...
@@ -127,8 +128,8 @@ func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
// IsWanted indicates whether any of the sessions are waiting to receive
// the block with the given CID.
func
(
sm
*
SessionManager
)
IsWanted
(
cid
cid
.
Cid
)
bool
{
sm
.
sessLk
.
Lock
()
defer
sm
.
sessLk
.
Unlock
()
sm
.
sessLk
.
R
Lock
()
defer
sm
.
sessLk
.
R
Unlock
()
for
_
,
s
:=
range
sm
.
sessions
{
if
s
.
session
.
IsWanted
(
cid
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment