Commit 00d7b498 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet

routing/dht: adjust routing table on peer conn/disc

parent 4ae01e7a
...@@ -65,9 +65,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip ...@@ -65,9 +65,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.datastore = dstore dht.datastore = dstore
dht.self = h.ID() dht.self = h.ID()
dht.peerstore = h.Peerstore() dht.peerstore = h.Peerstore()
dht.ContextGroup = ctxgroup.WithContext(ctx)
dht.host = h dht.host = h
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
// sanity check. this should **never** happen // sanity check. this should **never** happen
if len(dht.peerstore.Addresses(dht.self)) < 1 { if len(dht.peerstore.Addresses(dht.self)) < 1 {
panic("attempt to initialize dht without addresses for self") panic("attempt to initialize dht without addresses for self")
......
package dht
import (
inet "github.com/jbenet/go-ipfs/p2p/net"
)
// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IpfsDHT
func (nn *netNotifiee) DHT() *IpfsDHT {
return (*IpfsDHT)(nn)
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
return
}
dht.Update(dht.Context(), v.RemotePeer())
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
return
}
dht.routingTable.Remove(v.RemotePeer())
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
...@@ -30,6 +30,16 @@ func (b *Bucket) find(id peer.ID) *list.Element { ...@@ -30,6 +30,16 @@ func (b *Bucket) find(id peer.ID) *list.Element {
return nil return nil
} }
func (b *Bucket) remove(id peer.ID) {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(peer.ID) == id {
b.list.Remove(e)
}
}
}
func (b *Bucket) moveToFront(e *list.Element) { func (b *Bucket) moveToFront(e *list.Element) {
b.lk.Lock() b.lk.Lock()
b.list.MoveToFront(e) b.list.MoveToFront(e)
......
...@@ -87,6 +87,23 @@ func (rt *RoutingTable) Update(p peer.ID) peer.ID { ...@@ -87,6 +87,23 @@ func (rt *RoutingTable) Update(p peer.ID) peer.ID {
return "" return ""
} }
// Remove deletes a peer from the routing table. This is to be used
// when we are sure a node has disconnected completely.
func (rt *RoutingTable) Remove(p peer.ID) {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
peerID := ConvertPeerID(p)
cpl := commonPrefixLen(peerID, rt.local)
bucketID := cpl
if bucketID >= len(rt.Buckets) {
bucketID = len(rt.Buckets) - 1
}
bucket := rt.Buckets[bucketID]
bucket.remove(p)
}
func (rt *RoutingTable) nextBucket() peer.ID { func (rt *RoutingTable) nextBucket() peer.ID {
bucket := rt.Buckets[len(rt.Buckets)-1] bucket := rt.Buckets[len(rt.Buckets)-1]
newBucket := bucket.Split(len(rt.Buckets)-1, rt.local) newBucket := bucket.Split(len(rt.Buckets)-1, rt.local)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment