Commit 235a9ec5 authored by Jeromy's avatar Jeromy

Fix dht queries

Queries previously would sometimes only query three (alpha value) peers
before halting the operation. This PR changes the number of peers
grabbed from the routing table to start a query to K.

Dht nodes would also not respond with enough peers, as per the kademlia
paper, this has been changed to from 4 to 'K'.

The query mechanism itself also was flawed in that it would pull all the
peers it had yet to query out of the queue and 'start' the query for
them. The concurrency rate limiting was done inside the 'queryPeer'
method after the goroutine was spawned. This did not allow for peers
receiver from query replies to be properly queried in order of distance.

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
parent cfcc3d6a
...@@ -312,11 +312,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [ ...@@ -312,11 +312,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
continue continue
} }
// must all be closer than self filtered = append(filtered, clp)
key := key.Key(pmes.GetKey())
if !kb.Closer(dht.self, clp, key) {
filtered = append(filtered, clp)
}
} }
// ok seems like closer nodes // ok seems like closer nodes
......
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
) )
// The number of closer peers to send on requests. // The number of closer peers to send on requests.
var CloserPeerCount = 4 var CloserPeerCount = KValue
// dhthandler specifies the signature of functions that handle DHT messages. // dhthandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error) type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
......
...@@ -23,7 +23,7 @@ func pointerizePeerInfos(pis []peer.PeerInfo) []*peer.PeerInfo { ...@@ -23,7 +23,7 @@ func pointerizePeerInfos(pis []peer.PeerInfo) []*peer.PeerInfo {
// to the given key // to the given key
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key key.Key) (<-chan peer.ID, error) { func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key key.Key) (<-chan peer.ID, error) {
e := log.EventBegin(ctx, "getClosestPeers", &key) e := log.EventBegin(ctx, "getClosestPeers", &key)
tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
if len(tablepeers) == 0 { if len(tablepeers) == 0 {
return nil, kb.ErrLookupFailure return nil, kb.ErrLookupFailure
} }
......
...@@ -184,29 +184,28 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) { ...@@ -184,29 +184,28 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
case <-r.proc.Closing(): case <-r.proc.Closing():
return return
case p, more := <-r.peersToQuery.DeqChan: case <-r.rateLimit:
if !more { select {
return // channel closed. case p, more := <-r.peersToQuery.DeqChan:
if !more {
return // channel closed.
}
// do it as a child func to make sure Run exits
// ONLY AFTER spawn workers has exited.
proc.Go(func(proc process.Process) {
r.queryPeer(proc, p)
})
case <-r.proc.Closing():
return
case <-r.peersRemaining.Done():
return
} }
// do it as a child func to make sure Run exits
// ONLY AFTER spawn workers has exited.
proc.Go(func(proc process.Process) {
r.queryPeer(proc, p)
})
} }
} }
} }
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// make sure we rate limit concurrency.
select {
case <-r.rateLimit:
case <-proc.Closing():
r.peersRemaining.Decrement(1)
return
}
// ok let's do this! // ok let's do this!
// create a context from our proc. // create a context from our proc.
......
...@@ -145,7 +145,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]ro ...@@ -145,7 +145,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]ro
} }
// get closest peers in the routing table // get closest peers in the routing table
rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
log.Debugf("peers in rt: %s", len(rtp), rtp) log.Debugf("peers in rt: %s", len(rtp), rtp)
if len(rtp) == 0 { if len(rtp) == 0 {
log.Warning("No peers from routing table!") log.Warning("No peers from routing table!")
...@@ -322,7 +322,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key, ...@@ -322,7 +322,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key key.Key,
return &dhtQueryResult{closerPeers: clpeers}, nil return &dhtQueryResult{closerPeers: clpeers}, nil
}) })
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
_, err := query.Run(ctx, peers) _, err := query.Run(ctx, peers)
if err != nil { if err != nil {
log.Debugf("Query error: %s", err) log.Debugf("Query error: %s", err)
...@@ -342,7 +342,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er ...@@ -342,7 +342,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
return pi, nil return pi, nil
} }
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
if len(peers) == 0 { if len(peers) == 0 {
return peer.PeerInfo{}, kb.ErrLookupFailure return peer.PeerInfo{}, kb.ErrLookupFailure
} }
...@@ -409,7 +409,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< ...@@ -409,7 +409,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
peerchan := make(chan peer.PeerInfo, asyncQueryBuffer) peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
peersSeen := peer.Set{} peersSeen := peer.Set{}
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue) peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), KValue)
if len(peers) == 0 { if len(peers) == 0 {
return nil, kb.ErrLookupFailure return nil, kb.ErrLookupFailure
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment