Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
ebd2d69f
Commit
ebd2d69f
authored
Jan 04, 2021
by
Adin Schmahmann
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
rename messageManager to messageSenderImpl and renamed dht.messageMgr to
dht.msgSender
parent
6fca41f4
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
15 additions
and
15 deletions
+15
-15
dht.go
dht.go
+3
-3
dht_test.go
dht_test.go
+4
-4
message_manager.go
message_manager.go
+7
-7
subscriber_notifee.go
subscriber_notifee.go
+1
-1
No files found.
dht.go
View file @
ebd2d69f
...
...
@@ -96,7 +96,7 @@ type IpfsDHT struct {
proc
goprocess
.
Process
protoMessenger
*
pb
.
ProtocolMessenger
m
essageMgr
*
message
Manager
m
sgSender
*
message
SenderImpl
plk
sync
.
Mutex
...
...
@@ -188,12 +188,12 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
dht
.
disableFixLowPeers
=
cfg
.
disableFixLowPeers
dht
.
Validator
=
cfg
.
validator
dht
.
m
essageMg
r
=
&
message
Manager
{
dht
.
m
sgSende
r
=
&
message
SenderImpl
{
host
:
h
,
strmap
:
make
(
map
[
peer
.
ID
]
*
peerMessageSender
),
protocols
:
dht
.
protocols
,
}
dht
.
protoMessenger
,
err
=
pb
.
NewProtocolMessenger
(
dht
.
m
essageMg
r
,
pb
.
WithValidator
(
dht
.
Validator
))
dht
.
protoMessenger
,
err
=
pb
.
NewProtocolMessenger
(
dht
.
m
sgSende
r
,
pb
.
WithValidator
(
dht
.
Validator
))
if
err
!=
nil
{
return
nil
,
err
}
...
...
dht_test.go
View file @
ebd2d69f
...
...
@@ -570,14 +570,14 @@ func TestInvalidMessageSenderTracking(t *testing.T) {
defer
dht
.
Close
()
foo
:=
peer
.
ID
(
"asdasd"
)
_
,
err
:=
dht
.
m
essageMg
r
.
messageSenderForPeer
(
ctx
,
foo
)
_
,
err
:=
dht
.
m
sgSende
r
.
messageSenderForPeer
(
ctx
,
foo
)
if
err
==
nil
{
t
.
Fatal
(
"that shouldnt have succeeded"
)
}
dht
.
m
essageMg
r
.
smlk
.
Lock
()
mscnt
:=
len
(
dht
.
m
essageMg
r
.
strmap
)
dht
.
m
essageMg
r
.
smlk
.
Unlock
()
dht
.
m
sgSende
r
.
smlk
.
Lock
()
mscnt
:=
len
(
dht
.
m
sgSende
r
.
strmap
)
dht
.
m
sgSende
r
.
smlk
.
Unlock
()
if
mscnt
>
0
{
t
.
Fatal
(
"should have no message senders in map"
)
...
...
message_manager.go
View file @
ebd2d69f
...
...
@@ -19,16 +19,16 @@ import (
"go.opencensus.io/tag"
)
// message
Manager
is responsible for sending requests and messages to peers efficiently, including reuse of streams.
// message
SenderImpl
is responsible for sending requests and messages to peers efficiently, including reuse of streams.
// It also tracks metrics for sent requests and messages.
type
message
Manager
struct
{
type
message
SenderImpl
struct
{
host
host
.
Host
// the network services we need
smlk
sync
.
Mutex
strmap
map
[
peer
.
ID
]
*
peerMessageSender
protocols
[]
protocol
.
ID
}
func
(
m
*
message
Manager
)
streamDisconnect
(
ctx
context
.
Context
,
p
peer
.
ID
)
{
func
(
m
*
message
SenderImpl
)
streamDisconnect
(
ctx
context
.
Context
,
p
peer
.
ID
)
{
m
.
smlk
.
Lock
()
defer
m
.
smlk
.
Unlock
()
ms
,
ok
:=
m
.
strmap
[
p
]
...
...
@@ -49,7 +49,7 @@ func (m *messageManager) streamDisconnect(ctx context.Context, p peer.ID) {
// SendRequest sends out a request, but also makes sure to
// measure the RTT for latency measurements.
func
(
m
*
message
Manager
)
SendRequest
(
ctx
context
.
Context
,
p
peer
.
ID
,
pmes
*
pb
.
Message
)
(
*
pb
.
Message
,
error
)
{
func
(
m
*
message
SenderImpl
)
SendRequest
(
ctx
context
.
Context
,
p
peer
.
ID
,
pmes
*
pb
.
Message
)
(
*
pb
.
Message
,
error
)
{
ctx
,
_
=
tag
.
New
(
ctx
,
metrics
.
UpsertMessageType
(
pmes
))
ms
,
err
:=
m
.
messageSenderForPeer
(
ctx
,
p
)
...
...
@@ -84,7 +84,7 @@ func (m *messageManager) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Me
}
// SendMessage sends out a message
func
(
m
*
message
Manager
)
SendMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
pmes
*
pb
.
Message
)
error
{
func
(
m
*
message
SenderImpl
)
SendMessage
(
ctx
context
.
Context
,
p
peer
.
ID
,
pmes
*
pb
.
Message
)
error
{
ctx
,
_
=
tag
.
New
(
ctx
,
metrics
.
UpsertMessageType
(
pmes
))
ms
,
err
:=
m
.
messageSenderForPeer
(
ctx
,
p
)
...
...
@@ -113,7 +113,7 @@ func (m *messageManager) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Me
return
nil
}
func
(
m
*
message
Manager
)
messageSenderForPeer
(
ctx
context
.
Context
,
p
peer
.
ID
)
(
*
peerMessageSender
,
error
)
{
func
(
m
*
message
SenderImpl
)
messageSenderForPeer
(
ctx
context
.
Context
,
p
peer
.
ID
)
(
*
peerMessageSender
,
error
)
{
m
.
smlk
.
Lock
()
ms
,
ok
:=
m
.
strmap
[
p
]
if
ok
{
...
...
@@ -151,7 +151,7 @@ type peerMessageSender struct {
r
msgio
.
ReadCloser
lk
ctxMutex
p
peer
.
ID
m
*
message
Manager
m
*
message
SenderImpl
invalid
bool
singleMes
int
...
...
subscriber_notifee.go
View file @
ebd2d69f
...
...
@@ -173,7 +173,7 @@ func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
return
}
dht
.
m
essageMg
r
.
streamDisconnect
(
dht
.
Context
(),
p
)
dht
.
m
sgSende
r
.
streamDisconnect
(
dht
.
Context
(),
p
)
}
func
(
nn
*
subscriberNotifee
)
Connected
(
network
.
Network
,
network
.
Conn
)
{}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment