Commit 27c68619 authored by Steven Allen, committed by GitHub

Merge pull request #74 from libp2p/fix/70

Properly track connections to peers in the DHT.
parents 9b87e924 1d553110
2.5.5: QmTHyAbD9KzGrseLNzmEoNkVxA8F2h7LQG2iV6uhBqs6kX
2.5.6: QmRKEzkaiwud2LnwJ9CgBrKw122ddKPTMtLizV3DNimVRD
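The fix below keeps one refcounted tracker per remote peer: extra connections to a peer only bump the count, and the routing-table entry plus the background protocol check are torn down only when the last connection closes. A minimal standalone sketch of that pattern, using illustrative names (peerSet, tracker, connected, disconnected) rather than the DHT's own types:

// Sketch only: peerSet, tracker, connected and disconnected are
// illustrative names, not the DHT's own types.
package main

import (
	"context"
	"fmt"
	"sync"
)

// tracker mirrors the role of the commit's peerTracker: a per-peer
// connection refcount plus a cancel func for the background check.
type tracker struct {
	refcount int
	cancel   context.CancelFunc
}

type peerSet struct {
	mu    sync.Mutex
	peers map[string]*tracker
}

// connected bumps the refcount; only the first connection to a peer
// starts the (cancelable) background task.
func (ps *peerSet) connected(id string, check func(ctx context.Context)) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	if t, ok := ps.peers[id]; ok {
		t.refcount++
		return
	}
	ctx, cancel := context.WithCancel(context.Background())
	ps.peers[id] = &tracker{refcount: 1, cancel: cancel}
	go check(ctx)
}

// disconnected decrements the refcount and cleans up (cancels the check,
// forgets the peer) only when the last connection closes.
func (ps *peerSet) disconnected(id string, onGone func()) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	t, ok := ps.peers[id]
	if !ok {
		return // unmatched disconnect: connected before we started listening
	}
	t.refcount--
	if t.refcount == 0 {
		delete(ps.peers, id)
		t.cancel()
		onGone()
	}
}

func main() {
	ps := &peerSet{peers: make(map[string]*tracker)}
	wait := func(ctx context.Context) { <-ctx.Done() }
	ps.connected("peerA", wait)
	ps.connected("peerA", wait) // second connection: refcount only
	ps.disconnected("peerA", func() { fmt.Println("too early") })
	ps.disconnected("peerA", func() { fmt.Println("last conn closed: drop from routing table") })
}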
@@ -62,6 +62,9 @@ type IpfsDHT struct {
strmap map[peer.ID]*messageSender
smlk sync.Mutex
plk sync.Mutex
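// peers tracks the open connections to each peer; guarded by plk.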
peers map[peer.ID]*peerTracker
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
@@ -106,6 +109,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
providers: providers.NewProviderManager(ctx, h.ID(), dstore),
birth: time.Now(),
routingTable: kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()),
peers: make(map[peer.ID]*peerTracker),
Validator: make(record.Validator),
Selector: make(record.Selector),
@@ -3,21 +3,12 @@ package dht
import (
"context"
"io"
"time"
inet "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr"
mstream "github.com/multiformats/go-multistream"
)
// TODO: There is a race condition here where we could process notifications
// out-of-order and incorrectly mark some peers as DHT nodes (or not DHT nodes).
// The correct fix for this is nasty so I'm not really sure it's worth it.
// Incorrectly recording or failing to record a DHT node in the routing table
// isn't a big issue.
const dhtCheckTimeout = 10 * time.Second
// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IpfsDHT
@@ -25,6 +16,11 @@ func (nn *netNotifiee) DHT() *IpfsDHT {
return (*IpfsDHT)(nn)
}
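// peerTracker holds the number of open connections to a peer and the cancel
// func for the background check that probes whether the peer speaks the DHT protocol.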
type peerTracker struct {
refcount int
cancel func()
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
@@ -33,36 +29,59 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
default:
}
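// Track this connection under plk: repeat connections to the same peer only bump the refcount.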
dht.plk.Lock()
defer dht.plk.Unlock()
conn, ok := nn.peers[v.RemotePeer()]
if ok {
conn.refcount++
return
}
ctx, cancel := context.WithCancel(dht.Context())
nn.peers[v.RemotePeer()] = &peerTracker{
refcount: 1,
cancel: cancel,
}
// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but it's not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
go nn.testConnection(ctx, v)
// TODO: There's a race condition here where the connection may
// not be open (and we may sit here trying to connect). I've
// added a timeout but that's not really the correct fix.
}
func (nn *netNotifiee) testConnection(ctx context.Context, v inet.Conn) {
dht := nn.DHT()
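// Retry on EOF (one stream may die while another connection stays up) until we
// get a definitive answer or the tracking context is canceled because the last
// connection closed.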
for {
s, err := dht.host.NewStream(ctx, v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)
switch err {
case nil:
s.Close()
dht.plk.Lock()
// Check if canceled under the lock.
if ctx.Err() == nil {
dht.Update(dht.Context(), v.RemotePeer())
}
dht.plk.Unlock()
case io.EOF:
if ctx.Err() == nil {
// Connection died but we may still have *an* open connection (context not canceled) so try again.
continue
}
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
default:
// real error? that's odd
log.Errorf("checking dht client type: %s", err)
}
return
}
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
@@ -72,7 +91,22 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
return
default:
}
go dht.routingTable.Remove(v.RemotePeer())
dht.plk.Lock()
defer dht.plk.Unlock()
conn, ok := nn.peers[v.RemotePeer()]
if !ok {
// Unmatched disconnects are fine. It just means that we were
// already connected when we registered the listener.
return
}
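// Only when the last connection to the peer closes do we cancel its check
// and drop it from the routing table.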
conn.refcount -= 1
if conn.refcount == 0 {
delete(nn.peers, v.RemotePeer())
conn.cancel()
dht.routingTable.Remove(v.RemotePeer())
}
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
package dht
import (
"context"
"testing"
)
func TestNotifieeMultipleConn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d1 := setupDHT(ctx, t, false)
d2 := setupDHT(ctx, t, false)
nn1 := (*netNotifiee)(d1)
nn2 := (*netNotifiee)(d2)
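// Call the notifiee methods directly to simulate duplicate connect/disconnect
// notifications for the same underlying connections.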
connect(t, ctx, d1, d2)
c12 := d1.host.Network().ConnsToPeer(d2.self)[0]
c21 := d2.host.Network().ConnsToPeer(d1.self)[0]
// Pretend to reestablish/re-kill connection
nn1.Connected(d1.host.Network(), c12)
nn2.Connected(d2.host.Network(), c21)
if !checkRoutingTable(d1, d2) {
t.Fatal("no routes")
}
nn1.Disconnected(d1.host.Network(), c12)
nn2.Disconnected(d2.host.Network(), c21)
if !checkRoutingTable(d1, d2) {
t.Fatal("no routes")
}
for _, conn := range d1.host.Network().ConnsToPeer(d2.self) {
conn.Close()
}
for _, conn := range d2.host.Network().ConnsToPeer(d1.self) {
conn.Close()
}
if checkRoutingTable(d1, d2) {
t.Fatal("routes")
}
}
func TestNotifieeFuzz(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d1 := setupDHT(ctx, t, false)
d2 := setupDHT(ctx, t, false)
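// Churn connections rapidly; the refcounted tracking should leave no stale
// routing table entries once everything is closed.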
for i := 0; i < 100; i++ {
connectNoSync(t, ctx, d1, d2)
for _, conn := range d1.host.Network().ConnsToPeer(d2.self) {
conn.Close()
}
}
if checkRoutingTable(d1, d2) {
t.Fatal("should not have routes")
}
connect(t, ctx, d1, d2)
}
func checkRoutingTable(a, b *IpfsDHT) bool {
// Under high load the connection notification may not have been processed
// yet, so a caller may need to retry before both sides see each other.
return a.routingTable.Find(b.self) != "" && b.routingTable.Find(a.self) != ""
}