Commit 85f88e1e authored by Adin Schmahmann's avatar Adin Schmahmann Committed by Steven Allen

query switches from alpha mode to k mode if no peers closer than one we have...

query switches from alpha mode to k mode if no peers closer than one we have heard about (in a given path) has been found
parent 6dac71ce
......@@ -73,15 +73,16 @@ type SortedPeerset struct {
lock sync.Mutex
}
func (ps *SortedPeerset) Add(p peer.ID) {
// Add adds the peer to the set. It returns true if the peer was newly added to the topK peers.
func (ps *SortedPeerset) Add(p peer.ID) bool {
ps.lock.Lock()
defer ps.lock.Unlock()
if _, ok := ps.topKPeers[p]; ok {
return
return false
}
if _, ok := ps.restOfPeers[p]; ok {
return
return false
}
distance := ks.XORKeySpace.Key([]byte(p)).Distance(ps.from)
......@@ -95,13 +96,14 @@ func (ps *SortedPeerset) Add(p peer.ID) {
if ps.heapTopKPeers.Len() < ps.kvalue {
heap.Push(&ps.heapTopKPeers, pm)
ps.topKPeers[p] = pm
return
return true
}
switch ps.heapTopKPeers.data[0].Metric().Cmp(distance) {
case -1:
heap.Push(&ps.heapRestOfPeers, pm)
ps.restOfPeers[p] = pm
return false
case 1:
bumpedPeer := heap.Pop(&ps.heapTopKPeers).(*peerMetricHeapItem)
delete(ps.topKPeers, bumpedPeer.Peer())
......@@ -111,7 +113,9 @@ func (ps *SortedPeerset) Add(p peer.ID) {
heap.Push(&ps.heapTopKPeers, pm)
ps.topKPeers[p] = pm
return true
default:
return false
}
}
......
......@@ -107,13 +107,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
out := make(chan peer.ID, dht.bucketSize)
defer close(out)
allPeers := make([]peer.ID, 0, dht.bucketSize*dht.d)
for _, q := range queries {
allPeers = append(allPeers, q.localPeers.TopK()...)
}
kadID := kb.ConvertKey(key)
allPeers = kb.SortClosestPeers(allPeers, kadID)
allPeers := kb.SortClosestPeers(queries[0].globallyQueriedPeers.Peers(), kadID)
for i, p := range allPeers {
if i == dht.bucketSize {
break
......
......@@ -7,7 +7,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
......@@ -109,25 +108,24 @@ func strictParallelismQuery(q *qu) {
else repeat
*/
var lastPeers []peer.ID
foundCloser := false
for {
topPeers := q.localPeers.TopK()
peersToQuery := q.localPeers.KUnqueried()
if len(peersToQuery) == 0 {
return
}
// TODO: Is it finding a closer peer if it's closer than one we know about or one we have queried?
numQuery := AlphaValue
// TODO: alternative: Check if we did not get any peers closer than alpha closest try k
if lastPeers != nil && peerSlicesEqual(lastPeers, topPeers) {
if foundCloser {
numQuery = len(peersToQuery)
} else if pqLen := len(peersToQuery); pqLen < numQuery {
numQuery = pqLen
}
lastPeers = topPeers
foundCloser = false
queryResCh := make(chan bool, numQuery)
queryResCh := make(chan *queryResult, numQuery)
resultsReceived := 0
for _, p := range peersToQuery[:numQuery] {
......@@ -139,7 +137,8 @@ func strictParallelismQuery(q *qu) {
loop:
for {
select {
case <-queryResCh:
case res := <-queryResCh:
foundCloser = foundCloser || res.foundCloserPeer
resultsReceived++
if resultsReceived == numQuery {
break loop
......@@ -181,7 +180,7 @@ func simpleQuery(q *qu) {
for _, p := range peersToQuery[:numQuery] {
peersToQueryCh <- p
}
queryResCh := make(chan bool, numQuery)
queryResCh := make(chan *queryResult, numQuery)
queriesSucceeded, queriesSent := 0, numQuery
dialPeers:
......@@ -191,8 +190,8 @@ func simpleQuery(q *qu) {
go func() {
queryResCh <- q.queryPeer(q.ctx, p)
}()
case success := <-queryResCh:
if success {
case res := <-queryResCh:
if res.success {
queriesSucceeded++
if queriesSucceeded == numQuery {
break dialPeers
......@@ -238,7 +237,7 @@ func boundedDialQuery(q *qu) {
for _, p := range peersToQuery[:numQuery] {
peersToQueryCh <- p
}
queryResCh := make(chan bool, numQuery)
queryResCh := make(chan *queryResult, numQuery)
queriesSucceeded, queriesSent := 0, 0
for {
......@@ -247,8 +246,8 @@ func boundedDialQuery(q *qu) {
go func() {
queryResCh <- q.queryPeer(q.ctx, p)
}()
case success := <-queryResCh:
if success {
case res := <-queryResCh:
if res.success {
queriesSucceeded++
} else {
queriesSent++
......@@ -264,22 +263,27 @@ func boundedDialQuery(q *qu) {
}
}
func (q *qu) queryPeer(ctx context.Context, p peer.ID) bool {
type queryResult struct {
success bool
foundCloserPeer bool
}
func (q *qu) queryPeer(ctx context.Context, p peer.ID) *queryResult {
dialCtx, queryCtx := ctx, ctx
if err := q.dht.dialPeer(dialCtx, p); err != nil {
q.localPeers.Remove(p)
return false
return &queryResult{}
}
if !q.globallyQueriedPeers.TryAdd(p) {
q.localPeers.Remove(p)
return false
return &queryResult{}
}
newPeers, err := q.queryFn(queryCtx, p)
if err != nil {
q.localPeers.Remove(p)
return false
return &queryResult{}
}
q.localPeers.MarkQueried(p)
......@@ -298,14 +302,19 @@ func (q *qu) queryPeer(ctx context.Context, p peer.ID) bool {
q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
}
foundCloserPeer := false
for _, np := range newPeers {
q.localPeers.Add(np.ID)
closer := q.localPeers.Add(np.ID)
foundCloserPeer = foundCloserPeer || closer
}
if q.stopFn(q.localPeers) {
q.cancel()
}
return true
return &queryResult{
success: true,
foundCloserPeer: foundCloserPeer,
}
}
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment