Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
p2p
go-p2p-kad-dht
Commits
3fc048d4
Unverified
Commit
3fc048d4
authored
Jan 20, 2018
by
Steven Allen
Committed by
GitHub
Jan 20, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #111 from libp2p/fix/99
fix connection tracking race
parents
ceab7886
395fb26a
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
54 additions
and
74 deletions
+54
-74
dht.go
dht.go
+1
-3
notif.go
notif.go
+53
-71
No files found.
dht.go
View file @
3fc048d4
...
...
@@ -63,8 +63,7 @@ type IpfsDHT struct {
strmap
map
[
peer
.
ID
]
*
messageSender
smlk
sync
.
Mutex
plk
sync
.
Mutex
peers
map
[
peer
.
ID
]
*
peerTracker
plk
sync
.
Mutex
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
...
...
@@ -119,7 +118,6 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers
:
providers
.
NewProviderManager
(
ctx
,
h
.
ID
(),
dstore
),
birth
:
time
.
Now
(),
routingTable
:
rt
,
peers
:
make
(
map
[
peer
.
ID
]
*
peerTracker
),
Validator
:
make
(
record
.
Validator
),
Selector
:
make
(
record
.
Selector
),
...
...
notif.go
View file @
3fc048d4
package
dht
import
(
"context"
"io"
inet
"github.com/libp2p/go-libp2p-net"
ma
"github.com/multiformats/go-multiaddr"
mstream
"github.com/multiformats/go-multistream"
...
...
@@ -12,15 +9,12 @@ import (
// netNotifiee defines methods to be used with the IpfsDHT
type
netNotifiee
IpfsDHT
var
dhtProtocols
=
[]
string
{
string
(
ProtocolDHT
),
string
(
ProtocolDHTOld
)}
func
(
nn
*
netNotifiee
)
DHT
()
*
IpfsDHT
{
return
(
*
IpfsDHT
)(
nn
)
}
type
peerTracker
struct
{
refcount
int
cancel
func
()
}
func
(
nn
*
netNotifiee
)
Connected
(
n
inet
.
Network
,
v
inet
.
Conn
)
{
dht
:=
nn
.
DHT
()
select
{
...
...
@@ -29,61 +23,56 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
default
:
}
dht
.
plk
.
Lock
()
defer
dht
.
plk
.
Unlock
()
conn
,
ok
:=
nn
.
peers
[
v
.
RemotePeer
()]
if
ok
{
conn
.
refcount
++
p
:=
v
.
RemotePeer
()
protos
,
err
:=
dht
.
peerstore
.
SupportsProtocols
(
p
,
dhtProtocols
...
)
if
err
==
nil
&&
len
(
protos
)
!=
0
{
// We lock here for consistency with the lock in testConnection.
// This probably isn't necessary because (dis)connect
// notifications are serialized but it's nice to be consistent.
dht
.
plk
.
Lock
()
defer
dht
.
plk
.
Unlock
()
if
dht
.
host
.
Network
()
.
Connectedness
(
p
)
==
inet
.
Connected
{
dht
.
Update
(
dht
.
Context
(),
p
)
}
return
}
ctx
,
cancel
:=
context
.
WithCancel
(
dht
.
Context
())
nn
.
peers
[
v
.
RemotePeer
()]
=
&
peerTracker
{
refcount
:
1
,
cancel
:
cancel
,
}
// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but its not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
go
nn
.
testConnection
(
ctx
,
v
)
// Note: Unfortunately, the peerstore may not yet now that this peer is
// a DHT server. So, if it didn't return a positive response above, test
// manually.
go
nn
.
testConnection
(
v
)
}
func
(
nn
*
netNotifiee
)
testConnection
(
ctx
context
.
Context
,
v
inet
.
Conn
)
{
func
(
nn
*
netNotifiee
)
testConnection
(
v
inet
.
Conn
)
{
dht
:=
nn
.
DHT
()
for
{
s
,
err
:=
dht
.
host
.
NewStream
(
ctx
,
v
.
RemotePeer
(),
ProtocolDHT
,
ProtocolDHTOld
)
switch
err
{
case
nil
:
s
.
Close
()
dht
.
plk
.
Lock
()
// Check if canceled under the lock.
if
ctx
.
Err
()
==
nil
{
dht
.
Update
(
dht
.
Context
(),
v
.
RemotePeer
())
}
dht
.
plk
.
Unlock
()
case
io
.
EOF
:
if
ctx
.
Err
()
==
nil
{
// Connection died but we may still have *an* open connection (context not canceled) so try again.
continue
}
case
context
.
Canceled
:
// Context canceled while connecting.
case
mstream
.
ErrNotSupported
:
// Client mode only, don't bother adding them to our routing table
default
:
// real error? thats odd
log
.
Warningf
(
"checking dht client type: %s"
,
err
)
}
p
:=
v
.
RemotePeer
()
// Forcibly use *this* connection. Otherwise, if we have two connections, we could:
// 1. Test it twice.
// 2. Have it closed from under us leaving the second (open) connection untested.
s
,
err
:=
v
.
NewStream
()
if
err
!=
nil
{
// Connection error
return
}
defer
s
.
Close
()
selected
,
err
:=
mstream
.
SelectOneOf
(
dhtProtocols
,
s
)
if
err
!=
nil
{
// Doesn't support the protocol
return
}
// Remember this choice (makes subsequent negotiations faster)
dht
.
peerstore
.
AddProtocols
(
p
,
selected
)
// We lock here as we race with disconnect. If we didn't lock, we could
// finish processing a connect after handling the associated disconnect
// event and add the peer to the routing table after removing it.
dht
.
plk
.
Lock
()
defer
dht
.
plk
.
Unlock
()
if
dht
.
host
.
Network
()
.
Connectedness
(
p
)
==
inet
.
Connected
{
dht
.
Update
(
dht
.
Context
(),
p
)
}
}
func
(
nn
*
netNotifiee
)
Disconnected
(
n
inet
.
Network
,
v
inet
.
Conn
)
{
...
...
@@ -96,23 +85,16 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
p
:=
v
.
RemotePeer
()
func
()
{
dht
.
plk
.
Lock
()
defer
dht
.
plk
.
Unlock
()
// Lock and check to see if we're still connected. We lock to make sure
// we don't concurrently process a connect event.
dht
.
plk
.
Lock
()
defer
dht
.
plk
.
Unlock
()
if
dht
.
host
.
Network
()
.
Connectedness
(
p
)
==
inet
.
Connected
{
// We're still connected.
return
}
conn
,
ok
:=
nn
.
peers
[
p
]
if
!
ok
{
// Unmatched disconnects are fine. It just means that we were
// already connected when we registered the listener.
return
}
conn
.
refcount
-=
1
if
conn
.
refcount
==
0
{
delete
(
nn
.
peers
,
p
)
conn
.
cancel
()
dht
.
routingTable
.
Remove
(
p
)
}
}()
dht
.
routingTable
.
Remove
(
p
)
dht
.
smlk
.
Lock
()
defer
dht
.
smlk
.
Unlock
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment