Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
dms3
go-dms3
Commits
3be5c913
Commit
3be5c913
authored
Jul 07, 2017
by
Jeromy
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix issue with sessions not receiving locally added blocks
License: MIT Signed-off-by:
Jeromy
<
jeromyj@gmail.com
>
parent
124afdba
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
120 additions
and
25 deletions
+120
-25
exchange/bitswap/bitswap.go
exchange/bitswap/bitswap.go
+9
-1
exchange/bitswap/session.go
exchange/bitswap/session.go
+71
-24
exchange/bitswap/session_test.go
exchange/bitswap/session_test.go
+40
-0
No files found.
exchange/bitswap/bitswap.go
View file @
3be5c913
...
@@ -317,6 +317,10 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
...
@@ -317,6 +317,10 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
// it now as it requires more thought and isnt causing immediate problems.
// it now as it requires more thought and isnt causing immediate problems.
bs
.
notifications
.
Publish
(
blk
)
bs
.
notifications
.
Publish
(
blk
)
for
_
,
s
:=
range
bs
.
SessionsForBlock
(
blk
.
Cid
())
{
s
.
receiveBlockFrom
(
""
,
blk
)
}
bs
.
engine
.
AddBlock
(
blk
)
bs
.
engine
.
AddBlock
(
blk
)
select
{
select
{
...
@@ -370,7 +374,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
...
@@ -370,7 +374,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
wg
:=
sync
.
WaitGroup
{}
wg
:=
sync
.
WaitGroup
{}
for
_
,
block
:=
range
iblocks
{
for
_
,
block
:=
range
iblocks
{
wg
.
Add
(
1
)
wg
.
Add
(
1
)
go
func
(
b
blocks
.
Block
)
{
go
func
(
b
blocks
.
Block
)
{
// TODO: this probably doesnt need to be a goroutine...
defer
wg
.
Done
()
defer
wg
.
Done
()
bs
.
updateReceiveCounters
(
b
)
bs
.
updateReceiveCounters
(
b
)
...
@@ -382,7 +386,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
...
@@ -382,7 +386,11 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
ses
.
receiveBlockFrom
(
p
,
b
)
ses
.
receiveBlockFrom
(
p
,
b
)
bs
.
CancelWants
([]
*
cid
.
Cid
{
k
},
ses
.
id
)
bs
.
CancelWants
([]
*
cid
.
Cid
{
k
},
ses
.
id
)
}
}
log
.
Debugf
(
"got block %s from %s"
,
b
,
p
)
log
.
Debugf
(
"got block %s from %s"
,
b
,
p
)
// TODO: rework this to not call 'HasBlock'. 'HasBlock' is really
// designed to be called when blocks are coming in from non-bitswap
// places (like the user manually adding data)
if
err
:=
bs
.
HasBlock
(
b
);
err
!=
nil
{
if
err
:=
bs
.
HasBlock
(
b
);
err
!=
nil
{
log
.
Warningf
(
"ReceiveMessage HasBlock error: %s"
,
err
)
log
.
Warningf
(
"ReceiveMessage HasBlock error: %s"
,
err
)
}
}
...
...
exchange/bitswap/session.go
View file @
3be5c913
...
@@ -21,7 +21,7 @@ const activeWantsLimit = 16
...
@@ -21,7 +21,7 @@ const activeWantsLimit = 16
// info to, and who to request blocks from
// info to, and who to request blocks from
type
Session
struct
{
type
Session
struct
{
ctx
context
.
Context
ctx
context
.
Context
tofetch
[]
*
cid
.
Cid
tofetch
*
cid
Queue
activePeers
map
[
peer
.
ID
]
struct
{}
activePeers
map
[
peer
.
ID
]
struct
{}
activePeersArr
[]
peer
.
ID
activePeersArr
[]
peer
.
ID
...
@@ -55,6 +55,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session {
...
@@ -55,6 +55,7 @@ func (bs *Bitswap) NewSession(ctx context.Context) *Session {
liveWants
:
make
(
map
[
string
]
time
.
Time
),
liveWants
:
make
(
map
[
string
]
time
.
Time
),
newReqs
:
make
(
chan
[]
*
cid
.
Cid
),
newReqs
:
make
(
chan
[]
*
cid
.
Cid
),
cancelKeys
:
make
(
chan
[]
*
cid
.
Cid
),
cancelKeys
:
make
(
chan
[]
*
cid
.
Cid
),
tofetch
:
newCidQueue
(),
interestReqs
:
make
(
chan
interestReq
),
interestReqs
:
make
(
chan
interestReq
),
ctx
:
ctx
,
ctx
:
ctx
,
bs
:
bs
,
bs
:
bs
,
...
@@ -157,7 +158,9 @@ func (s *Session) run(ctx context.Context) {
...
@@ -157,7 +158,9 @@ func (s *Session) run(ctx context.Context) {
s
.
wantBlocks
(
ctx
,
now
)
s
.
wantBlocks
(
ctx
,
now
)
}
}
s
.
tofetch
=
append
(
s
.
tofetch
,
keys
...
)
for
_
,
k
:=
range
keys
{
s
.
tofetch
.
Push
(
k
)
}
case
keys
:=
<-
s
.
cancelKeys
:
case
keys
:=
<-
s
.
cancelKeys
:
s
.
cancel
(
keys
)
s
.
cancel
(
keys
)
...
@@ -188,8 +191,7 @@ func (s *Session) run(ctx context.Context) {
...
@@ -188,8 +191,7 @@ func (s *Session) run(ctx context.Context) {
case
p
:=
<-
newpeers
:
case
p
:=
<-
newpeers
:
s
.
addActivePeer
(
p
)
s
.
addActivePeer
(
p
)
case
lwchk
:=
<-
s
.
interestReqs
:
case
lwchk
:=
<-
s
.
interestReqs
:
_
,
ok
:=
s
.
liveWants
[
lwchk
.
c
.
KeyString
()]
lwchk
.
resp
<-
s
.
cidIsWanted
(
lwchk
.
c
)
lwchk
.
resp
<-
ok
case
<-
ctx
.
Done
()
:
case
<-
ctx
.
Done
()
:
s
.
tick
.
Stop
()
s
.
tick
.
Stop
()
return
return
...
@@ -197,19 +199,31 @@ func (s *Session) run(ctx context.Context) {
...
@@ -197,19 +199,31 @@ func (s *Session) run(ctx context.Context) {
}
}
}
}
func
(
s
*
Session
)
cidIsWanted
(
c
*
cid
.
Cid
)
bool
{
_
,
ok
:=
s
.
liveWants
[
c
.
KeyString
()]
if
!
ok
{
ok
=
s
.
tofetch
.
Has
(
c
)
}
return
ok
}
func
(
s
*
Session
)
receiveBlock
(
ctx
context
.
Context
,
blk
blocks
.
Block
)
{
func
(
s
*
Session
)
receiveBlock
(
ctx
context
.
Context
,
blk
blocks
.
Block
)
{
ks
:=
blk
.
Cid
()
.
KeyString
()
c
:=
blk
.
Cid
()
if
_
,
ok
:=
s
.
liveWants
[
ks
];
ok
{
if
s
.
cidIsWanted
(
c
)
{
tval
:=
s
.
liveWants
[
ks
]
ks
:=
c
.
KeyString
()
s
.
latTotal
+=
time
.
Since
(
tval
)
tval
,
ok
:=
s
.
liveWants
[
ks
]
if
ok
{
s
.
latTotal
+=
time
.
Since
(
tval
)
delete
(
s
.
liveWants
,
ks
)
}
else
{
s
.
tofetch
.
Remove
(
c
)
}
s
.
fetchcnt
++
s
.
fetchcnt
++
delete
(
s
.
liveWants
,
ks
)
s
.
notif
.
Publish
(
blk
)
s
.
notif
.
Publish
(
blk
)
if
len
(
s
.
tofetch
)
>
0
{
if
next
:=
s
.
tofetch
.
Pop
();
next
!=
nil
{
next
:=
s
.
tofetch
[
0
:
1
]
s
.
wantBlocks
(
ctx
,
[]
*
cid
.
Cid
{
next
})
s
.
tofetch
=
s
.
tofetch
[
1
:
]
s
.
wantBlocks
(
ctx
,
next
)
}
}
}
}
}
}
...
@@ -222,19 +236,9 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
...
@@ -222,19 +236,9 @@ func (s *Session) wantBlocks(ctx context.Context, ks []*cid.Cid) {
}
}
func
(
s
*
Session
)
cancel
(
keys
[]
*
cid
.
Cid
)
{
func
(
s
*
Session
)
cancel
(
keys
[]
*
cid
.
Cid
)
{
sset
:=
cid
.
NewSet
()
for
_
,
c
:=
range
keys
{
for
_
,
c
:=
range
keys
{
s
set
.
Add
(
c
)
s
.
tofetch
.
Remove
(
c
)
}
}
var
i
,
j
int
for
;
j
<
len
(
s
.
tofetch
);
j
++
{
if
sset
.
Has
(
s
.
tofetch
[
j
])
{
continue
}
s
.
tofetch
[
i
]
=
s
.
tofetch
[
j
]
i
++
}
s
.
tofetch
=
s
.
tofetch
[
:
i
]
}
}
func
(
s
*
Session
)
cancelWants
(
keys
[]
*
cid
.
Cid
)
{
func
(
s
*
Session
)
cancelWants
(
keys
[]
*
cid
.
Cid
)
{
...
@@ -260,3 +264,46 @@ func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks
...
@@ -260,3 +264,46 @@ func (s *Session) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan blocks
func
(
s
*
Session
)
GetBlock
(
parent
context
.
Context
,
k
*
cid
.
Cid
)
(
blocks
.
Block
,
error
)
{
func
(
s
*
Session
)
GetBlock
(
parent
context
.
Context
,
k
*
cid
.
Cid
)
(
blocks
.
Block
,
error
)
{
return
getBlock
(
parent
,
k
,
s
.
GetBlocks
)
return
getBlock
(
parent
,
k
,
s
.
GetBlocks
)
}
}
type
cidQueue
struct
{
elems
[]
*
cid
.
Cid
eset
*
cid
.
Set
}
func
newCidQueue
()
*
cidQueue
{
return
&
cidQueue
{
eset
:
cid
.
NewSet
()}
}
func
(
cq
*
cidQueue
)
Pop
()
*
cid
.
Cid
{
for
{
if
len
(
cq
.
elems
)
==
0
{
return
nil
}
out
:=
cq
.
elems
[
0
]
cq
.
elems
=
cq
.
elems
[
1
:
]
if
cq
.
eset
.
Has
(
out
)
{
cq
.
eset
.
Remove
(
out
)
return
out
}
}
}
func
(
cq
*
cidQueue
)
Push
(
c
*
cid
.
Cid
)
{
if
cq
.
eset
.
Visit
(
c
)
{
cq
.
elems
=
append
(
cq
.
elems
,
c
)
}
}
func
(
cq
*
cidQueue
)
Remove
(
c
*
cid
.
Cid
)
{
cq
.
eset
.
Remove
(
c
)
}
func
(
cq
*
cidQueue
)
Has
(
c
*
cid
.
Cid
)
bool
{
return
cq
.
eset
.
Has
(
c
)
}
func
(
cq
*
cidQueue
)
Len
()
int
{
return
cq
.
eset
.
Len
()
}
exchange/bitswap/session_test.go
View file @
3be5c913
...
@@ -202,3 +202,43 @@ func TestInterestCacheOverflow(t *testing.T) {
...
@@ -202,3 +202,43 @@ func TestInterestCacheOverflow(t *testing.T) {
t
.
Fatal
(
"timed out waiting for block"
)
t
.
Fatal
(
"timed out waiting for block"
)
}
}
}
}
func
TestPutAfterSessionCacheEvict
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
vnet
:=
getVirtualNetwork
()
sesgen
:=
NewTestSessionGenerator
(
vnet
)
defer
sesgen
.
Close
()
bgen
:=
blocksutil
.
NewBlockGenerator
()
blks
:=
bgen
.
Blocks
(
2500
)
inst
:=
sesgen
.
Instances
(
1
)
a
:=
inst
[
0
]
ses
:=
a
.
Exchange
.
NewSession
(
ctx
)
var
allcids
[]
*
cid
.
Cid
for
_
,
blk
:=
range
blks
[
1
:
]
{
allcids
=
append
(
allcids
,
blk
.
Cid
())
}
blkch
,
err
:=
ses
.
GetBlocks
(
ctx
,
allcids
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
// wait to ensure that all the above cids were added to the sessions cache
time
.
Sleep
(
time
.
Millisecond
*
50
)
if
err
:=
a
.
Exchange
.
HasBlock
(
blks
[
17
]);
err
!=
nil
{
t
.
Fatal
(
err
)
}
select
{
case
<-
blkch
:
case
<-
time
.
After
(
time
.
Millisecond
*
50
)
:
t
.
Fatal
(
"timed out waiting for block"
)
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment