Commit 229cacc6 authored by Aarsh Shah, committed by Steven Allen

Cypress Disjoint Query & Query Termination code cleanup (#489)

* kpeerset refactoring
* query code cleanup
parent c17965fa
package kpeerset
import (
"github.com/libp2p/go-libp2p-core/peer"
"math/big"
)
type IPeerMetric interface {
Peer() peer.ID
Metric() *big.Int
}
// peerMetric tracks a peer and its distance to something else.
type peerMetric struct {
// the peer
peer peer.ID
// big.Int for XOR metric
metric *big.Int
}
func (pm peerMetric) Peer() peer.ID { return pm.peer }
func (pm peerMetric) Metric() *big.Int { return pm.metric }
type peerMetricHeapItem struct {
IPeerMetric
// The index of the item in the heap
index int
}
// peerMetricHeap implements a heap of peer metrics.
// The heap orders furthest-first if direction = 1 and closest-first if direction = -1.
type peerMetricHeap struct {
data []*peerMetricHeapItem
direction int
}
func (ph *peerMetricHeap) Len() int {
return len(ph.data)
}
func (ph *peerMetricHeap) Less(i, j int) bool {
h := ph.data
// Cmp returns 1 when h[i]'s metric is larger than h[j]'s and -1 when it is smaller,
// so comparing the result against direction yields furthest-first ordering for
// direction = 1 and closest-first ordering for direction = -1.
return ph.direction == h[i].Metric().Cmp(h[j].Metric())
}
func (ph *peerMetricHeap) Swap(i, j int) {
h := ph.data
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (ph *peerMetricHeap) Push(x interface{}) {
n := len(ph.data)
item := x.(*peerMetricHeapItem)
item.index = n
ph.data = append(ph.data, item)
}
func (ph *peerMetricHeap) Pop() interface{} {
old := ph.data
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
ph.data = old[0 : n-1]
return item
}
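As an illustration of the direction convention above, here is a minimal in-package sketch (not part of this commit; it assumes a "container/heap" import, and the helper name furthestFirst is hypothetical):

// furthestFirst drains a direction = 1 heap, yielding peers furthest from
// the target first; direction = -1 would yield them closest-first instead.
func furthestFirst(metrics []peerMetric) []peer.ID {
	ph := &peerMetricHeap{direction: 1}
	for i := range metrics {
		heap.Push(ph, &peerMetricHeapItem{IPeerMetric: metrics[i]})
	}
	out := make([]peer.ID, 0, ph.Len())
	for ph.Len() > 0 {
		// heap.Pop always removes the element that Less ranks first
		out = append(out, heap.Pop(ph).(*peerMetricHeapItem).Peer())
	}
	return out
}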
package kpeerset
import (
"math/big"
"sort"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
)
type peerLatencyMetric struct {
peerMetric
connectedness network.Connectedness
latency time.Duration
}
type peerLatencyMetricList []peerLatencyMetric
func (p peerLatencyMetricList) Len() int { return len(p) }
func (p peerLatencyMetricList) Less(i, j int) bool {
pm1, pm2 := p[i], p[j]
return calculationLess(pm1, pm2)
}
func (p peerLatencyMetricList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p peerLatencyMetricList) GetPeerID(i int) peer.ID { return p[i].peer }
func calculationLess(pm1, pm2 peerLatencyMetric) bool {
return calc(pm1).Cmp(calc(pm2)) == -1
}
func calc(pm peerLatencyMetric) *big.Int {
var c int64
switch pm.connectedness {
case network.Connected:
c = 1
case network.CanConnect:
c = 5
case network.CannotConnect:
c = 10000
default:
c = 20
}
l := int64(pm.latency)
if l <= 0 {
l = int64(time.Second) * 10
}
res := big.NewInt(c)
tmp := big.NewInt(l)
res.Mul(res, tmp)
res.Mul(res, pm.metric)
return res
}
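To make the weighting concrete, here is an illustrative in-package sketch (not from this commit; it assumes an "fmt" import). With equal XOR distance, a connected peer with a 50ms latency EWMA scores 1 x 5e7 = 5e7, while a never-seen peer with no latency sample scores 20 x 1e10 = 2e11, so the connected peer sorts first:

func exampleScores() {
	d := big.NewInt(1) // give both peers the same XOR distance

	connected := calc(peerLatencyMetric{
		peerMetric:    peerMetric{metric: d},
		connectedness: network.Connected,     // c = 1
		latency:       50 * time.Millisecond, // l = 5e7 ns
	})

	unknown := calc(peerLatencyMetric{
		peerMetric:    peerMetric{metric: d},
		connectedness: network.NotConnected, // default case: c = 20
		latency:       0,                    // no sample: treated as 10s
	})

	fmt.Println(connected.Cmp(unknown) == -1) // true: lower score sorts first
}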
var _ SortablePeers = (*peerLatencyMetricList)(nil)
func PeersSortedByLatency(peers []IPeerMetric, net network.Network, metrics peerstore.Metrics) SortablePeers {
lst := make(peerLatencyMetricList, len(peers))
for i := range lst {
p := peers[i].Peer()
lst[i] = peerLatencyMetric{
peerMetric: peerMetric{peer: p, metric: peers[i].Metric()},
connectedness: net.Connectedness(p),
latency: metrics.LatencyEWMA(p),
}
}
sort.Sort(lst)
return lst
}
package peerheap
import (
"github.com/libp2p/go-libp2p-core/peer"
)
// Comparator is the type of a function that compares two peer Heap items to determine the ordering between them.
// It returns true if i1 is "less" than i2 and false otherwise.
type Comparator func(i1 Item, i2 Item) bool
// Item is one "item" in the Heap.
// It contains the Id of the peer, an arbitrary value associated with the peer
// and the index of the "item" in the Heap.
type Item struct {
Peer peer.ID
Value interface{}
Index int
}
// Heap implements a heap of peer Items.
// It uses the "compare" member function to compare two peers to determine the order between them.
// If isMaxHeap is set to true, this Heap is a maxHeap, otherwise it's a minHeap.
//
// Note: It is the responsibility of the caller to enforce locking & synchronization.
type Heap struct {
items []*Item
compare Comparator
isMaxHeap bool
}
// New creates & returns a peer Heap.
func New(isMaxHeap bool, compare Comparator) *Heap {
return &Heap{isMaxHeap: isMaxHeap, compare: compare}
}
// PeekTop returns a copy of the top/first Item in the heap.
// This would be the "maximum" or the "minimum" peer depending on whether
// the heap is a maxHeap or a minHeap.
//
// A call to PeekTop will panic if the Heap is empty.
func (ph *Heap) PeekTop() Item {
return *ph.items[0]
}
// FilterItems returns copies of ALL the Items in the Heap that satisfy the given predicate.
func (ph *Heap) FilterItems(p func(i Item) bool) []Item {
var items []Item
for _, i := range ph.items {
ih := *i
if p(ih) {
items = append(items, ih)
}
}
return items
}
// Peers returns all the peers currently in the heap
func (ph *Heap) Peers() []peer.ID {
peers := make([]peer.ID, 0, len(ph.items))
for _, i := range ph.items {
peers = append(peers, i.Peer)
}
return peers
}
// Note: The functions below make the Heap satisfy the "heap.Interface" as required by the "heap" package in the
// standard library. Please refer to the docs for "heap.Interface" in the standard library for more details.
func (ph *Heap) Len() int {
return len(ph.items)
}
func (ph *Heap) Less(i, j int) bool {
h := ph.items
isLess := ph.compare(*h[i], *h[j])
// because the "compare" function returns true if item1 is less than item2,
// we need to reverse its result if the Heap is a maxHeap.
if ph.isMaxHeap {
return !isLess
}
return isLess
}
func (ph *Heap) Swap(i, j int) {
h := ph.items
h[i], h[j] = h[j], h[i]
h[i].Index = i
h[j].Index = j
}
func (ph *Heap) Push(x interface{}) {
n := len(ph.items)
item := x.(*Item)
item.Index = n
ph.items = append(ph.items, item)
}
func (ph *Heap) Pop() interface{} {
old := ph.items
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.Index = -1 // for safety
ph.items = old[0 : n-1]
return item
}
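A quick usage sketch (not part of this commit; it assumes "container/heap" and "fmt" imports): since Heap only satisfies heap.Interface, it must be driven through the package-level heap functions, because calling ph.Push or ph.Pop directly would bypass the sift-up/sift-down. The tests below exercise this more fully.

func exampleMinHeap() {
	byValue := func(i1, i2 Item) bool { return i1.Value.(int) < i2.Value.(int) }
	ph := New(false, byValue) // minHeap on Value

	heap.Push(ph, &Item{Peer: "a", Value: 3})
	heap.Push(ph, &Item{Peer: "b", Value: 1})
	heap.Push(ph, &Item{Peer: "c", Value: 2})

	fmt.Println(ph.PeekTop().Peer)         // "b": smallest Value sits on top
	fmt.Println(heap.Pop(ph).(*Item).Peer) // "b", then "c", then "a" on later pops
}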
package peerheap
import (
"container/heap"
"testing"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
)
// a comparator that compares peer IDs based on their length
var cmp = func(i1 Item, i2 Item) bool {
return len(i1.Peer) < len(i2.Peer)
}
var (
peer1 = peer.ID("22")
peer2 = peer.ID("1")
peer3 = peer.ID("333")
)
func TestMinHeap(t *testing.T) {
// create new
ph := New(false, cmp)
require.Zero(t, ph.Len())
// push the element
heap.Push(ph, &Item{Peer: peer1})
// assertions
require.True(t, ph.Len() == 1)
require.Equal(t, peer1, ph.PeekTop().Peer)
// push another element
heap.Push(ph, &Item{Peer: peer2})
// assertions
require.True(t, ph.Len() == 2)
require.Equal(t, peer2, ph.PeekTop().Peer)
// push another element
heap.Push(ph, &Item{Peer: peer3})
// assertions
require.True(t, ph.Len() == 3)
require.Equal(t, peer2, ph.PeekTop().Peer)
// remove & add again
heap.Remove(ph, 1)
require.True(t, ph.Len() == 2)
heap.Remove(ph, 0)
require.True(t, ph.Len() == 1)
heap.Push(ph, &Item{Peer: peer1})
heap.Push(ph, &Item{Peer: peer2})
// test filter peers
filtered := ph.FilterItems(func(i Item) bool {
return len(i.Peer) != 2
})
require.Len(t, filtered, 2)
require.Contains(t, itemsToPeers(filtered), peer2)
require.Contains(t, itemsToPeers(filtered), peer3)
// Assert Min Heap Order
require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer)
require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer)
require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer)
}
func itemsToPeers(is []Item) []peer.ID {
peers := make([]peer.ID, 0, len(is))
for _, i := range is {
peers = append(peers, i.Peer)
}
return peers
}
func TestMaxHeap(t *testing.T) {
// create new
ph := New(true, cmp)
require.Zero(t, ph.Len())
// push all three peers
heap.Push(ph, &Item{Peer: peer1})
heap.Push(ph, &Item{Peer: peer3})
heap.Push(ph, &Item{Peer: peer2})
// Assert Max Heap Order
require.Equal(t, peer3, heap.Pop(ph).(*Item).Peer)
require.Equal(t, peer1, heap.Pop(ph).(*Item).Peer)
require.Equal(t, peer2, heap.Pop(ph).(*Item).Peer)
}
@@ -2,128 +2,183 @@ package kpeerset
import (
"container/heap"
"math/big"
"sync"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
ks "github.com/whyrusleeping/go-keyspace"
)
// SortedPeerset is a data-structure that maintains the queried & unqueried peers for a query
// based on their distance from the key.
// Its main use is to allow peer addition, removal & retrieval for the query as per the
// semantics described in the Kad DHT paper.
type SortedPeerset struct {
// the key being searched for
key ks.Key
// the K parameter in the Kad DHT paper
kvalue int
// a maxHeap maintaining the K closest (Kademlia XOR distance) peers to the key.
// the topmost peer will be the peer furthest from the key in this heap.
heapKClosestPeers *peerheap.Heap
// a minHeap for the rest of the peers ordered by their distance from the key.
// the topmost peer will be the peer closest to the key in this heap.
heapRestOfPeers *peerheap.Heap
// pointer to the item in the heap of K closest peers.
kClosestPeers map[peer.ID]*peerheap.Item
// pointer to the item in the heap of the rest of peers.
restOfPeers map[peer.ID]*peerheap.Item
// peers that have already been queried.
queried map[peer.ID]struct{}
// the closest peer to the key that we have heard about
closestKnownPeer peer.ID
// the distance of the closest known peer from the key
dClosestKnownPeer *big.Int
lock sync.Mutex
}
// NewSortedPeerset creates and returns a new SortedPeerset.
func NewSortedPeerset(kvalue int, key string) *SortedPeerset {
compare := func(i1 peerheap.Item, i2 peerheap.Item) bool {
// distance of the first peer from the key
d1 := i1.Value.(*big.Int)
// distance of the second peer from the key
d2 := i2.Value.(*big.Int)
// is the first peer closer to the key than the second peer?
return d1.Cmp(d2) == -1
}
return &SortedPeerset{
key: ks.XORKeySpace.Key([]byte(key)),
kvalue: kvalue,
heapKClosestPeers: peerheap.New(true, compare),
heapRestOfPeers: peerheap.New(false, compare),
kClosestPeers: make(map[peer.ID]*peerheap.Item),
restOfPeers: make(map[peer.ID]*peerheap.Item),
queried: make(map[peer.ID]struct{}),
}
}
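For context on the comparator above, a sketch (not from this commit): go-keyspace's XORKeySpace hashes its input with SHA-256 and measures distance as the XOR of the two hashes read as a big integer, so "closer" means a numerically smaller XOR distance. A hypothetical in-package helper:

// closerToKey reports whether p1 is closer to key than p2 in XOR keyspace.
func closerToKey(p1, p2 peer.ID, key string) bool {
	k := ks.XORKeySpace.Key([]byte(key))
	d1 := ks.XORKeySpace.Key([]byte(p1)).Distance(k)
	d2 := ks.XORKeySpace.Key([]byte(p2)).Distance(k)
	return d1.Cmp(d2) == -1
}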
// Add adds the peer to the SortedPeerset.
//
// If there are fewer than K peers in the K closest peers, we add the peer to
// the K closest peers.
//
// Otherwise, we do one of the following:
// 1. If this peer is closer to the key than the peer furthest from the key in the
// K closest peers, we move that furthest peer to the rest of peers and then
// add this peer to the K closest peers.
// 2. If this peer is further from the key than the peer furthest from the key in the
// K closest peers, we add it to the rest of peers.
//
// It returns true if the peer is closer to the key than the closest peer we've heard about.
func (ps *SortedPeerset) Add(p peer.ID) bool {
ps.lock.Lock()
defer ps.lock.Unlock()
// we've already added the peer
if ps.kClosestPeers[p] != nil || ps.restOfPeers[p] != nil {
return false
}
// calculate the distance of the given peer from the key
distancePeer := ks.XORKeySpace.Key([]byte(p)).Distance(ps.key)
item := &peerheap.Item{Peer: p, Value: distancePeer}
if ps.heapKClosestPeers.Len() < ps.kvalue {
// add the peer to the K closest peers if we have space
heap.Push(ps.heapKClosestPeers, item)
ps.kClosestPeers[p] = item
} else if top := ps.heapKClosestPeers.PeekTop(); distancePeer.Cmp(top.Value.(*big.Int)) == -1 {
// the peer is closer to the key than the top peer in the heap of K closest peers,
// which is the peer furthest from the key because the K closest peers
// are stored in a maxHeap ordered by distance from the key.
// remove the top peer from the K closest peers & add it to the rest of peers.
bumpedPeer := heap.Pop(ps.heapKClosestPeers).(*peerheap.Item)
delete(ps.kClosestPeers, bumpedPeer.Peer)
heap.Push(ps.heapRestOfPeers, bumpedPeer)
ps.restOfPeers[bumpedPeer.Peer] = bumpedPeer
// add the peer p to the K closest peers
heap.Push(ps.heapKClosestPeers, item)
ps.kClosestPeers[p] = item
} else {
// add the peer to the rest of peers.
heap.Push(ps.heapRestOfPeers, item)
ps.restOfPeers[p] = item
}
if ps.closestKnownPeer == "" || distancePeer.Cmp(ps.dClosestKnownPeer) == -1 {
// the given peer is closer to the key than the current closest known peer,
// so update the closest known peer
ps.closestKnownPeer = p
ps.dClosestKnownPeer = distancePeer
return true
}
return false
}
// UnqueriedFromKClosest returns the unqueried peers among the K closest peers AFTER
// sorting them in ascending order with the given comparator.
// It uses the `getValue` function to get the value with which to compare the peers for sorting
// and the `sortWith` function to compare two peerheap Items to determine the ordering between them.
func (ps *SortedPeerset) UnqueriedFromKClosest(getValue func(id peer.ID, distance *big.Int) interface{},
sortWith peerheap.Comparator) []peer.ID {
ps.lock.Lock()
defer ps.lock.Unlock()
unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried)
// create a min-heap to sort the unqueried peer Items using the given comparator
ph := peerheap.New(false, sortWith)
for _, i := range unqueriedPeerItems {
p := i.Peer
d := i.Value.(*big.Int)
heap.Push(ph, &peerheap.Item{Peer: p, Value: getValue(p, d)})
}
// now pop so we get them in sorted order
peers := make([]peer.ID, 0, ph.Len())
for ph.Len() != 0 {
popped := heap.Pop(ph).(*peerheap.Item)
peers = append(peers, popped.Peer)
}
return peers
}
// LenUnqueriedFromKClosest returns the number of unqueried peers among
// the K closest peers.
func (ps *SortedPeerset) LenUnqueriedFromKClosest() int {
ps.lock.Lock()
defer ps.lock.Unlock()
unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried)
return len(unqueriedPeerItems)
}
// isPeerItemQueried is the filter predicate used above; note that, despite the name,
// it returns true only if the peer in the given Item has NOT been queried yet.
// The caller is responsible for the locking.
func (ps *SortedPeerset) isPeerItemQueried(i peerheap.Item) bool {
_, ok := ps.queried[i.Peer]
return !ok
}
// MarkQueried marks the peer as queried.
// It should be called when we have successfully dialed to and gotten a response from the peer.
func (ps *SortedPeerset) MarkQueried(p peer.ID) {
ps.lock.Lock()
defer ps.lock.Unlock()
@@ -131,25 +186,51 @@ func (ps *SortedPeerset) MarkQueried(p peer.ID) {
ps.queried[p] = struct{}{}
}
// Remove removes the peer from the SortedPeerset.
//
// If the removed peer was among the K closest peers, we pop a peer from the heap of rest of peers
// and add it to the K closest peers to replace the removed peer. The peer added to the K closest peers in this way
// would be the peer that was closest to the key among the rest of peers since the rest of peers are in a
// minHeap ordered on the distance from the key.
func (ps *SortedPeerset) Remove(p peer.ID) {
ps.lock.Lock()
defer ps.lock.Unlock()
delete(ps.queried, p)
if item, ok := ps.kClosestPeers[p]; ok {
// peer is among the K closest peers
// remove it from the K closest peers
heap.Remove(ps.heapKClosestPeers, item.Index)
delete(ps.kClosestPeers, p)
// if this peer was the closest peer we knew, we need to find the new closest peer.
if ps.closestKnownPeer == p {
var minDistance *big.Int
var closest peer.ID
for _, i := range ps.kClosestPeers {
d := i.Value.(*big.Int)
if minDistance == nil || (d.Cmp(minDistance) == -1) {
minDistance = d
closest = i.Peer
}
}
ps.closestKnownPeer = closest
ps.dClosestKnownPeer = minDistance
}
// we now need to add a peer to the K closest peers from the rest of peers
// to make up for the peer that was just removed
if ps.heapRestOfPeers.Len() > 0 {
// pop a peer from the rest of peers & add it to the K closest peers
upgrade := heap.Pop(ps.heapRestOfPeers).(*peerheap.Item)
delete(ps.restOfPeers, upgrade.Peer)
heap.Push(ps.heapKClosestPeers, upgrade)
ps.kClosestPeers[upgrade.Peer] = upgrade
}
} else if item, ok := ps.restOfPeers[p]; ok {
// peer is not among the K closest, so remove it from the rest of peers.
heap.Remove(ps.heapRestOfPeers, item.Index)
delete(ps.restOfPeers, p)
}
}
package kpeerset
import (
"math/big"
"testing"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/stretchr/testify/require"
)
var noopCompare = func(i1 peerheap.Item, i2 peerheap.Item) bool {
return true
}
var noopGetValue = func(p peer.ID, d *big.Int) interface{} {
return d
}
func TestSortedPeerset(t *testing.T) {
key := "test"
sp := NewSortedPeerset(2, key)
require.Empty(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare))
// -----------------Ordering between peers for the Test -----
// KEY < peer0 < peer3 < peer1 < peer4 < peer2 < peer5
// ----------------------------------------------------------
peer2 := test.RandPeerIDFatal(t)
// add peer 2 & assert
require.True(t, sp.Add(peer2))
require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1)
require.True(t, sp.LenUnqueriedFromKClosest() == 1)
require.Equal(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare)[0], peer2)
assertClosestKnownPeer(t, sp, peer2)
// add peer4 & assert
var peer4 peer.ID
for {
peer4 = test.RandPeerIDFatal(t)
if kb.Closer(peer4, peer2, key) {
break
}
}
require.True(t, sp.Add(peer4))
require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2)
require.True(t, sp.LenUnqueriedFromKClosest() == 2)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer2)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4)
assertClosestKnownPeer(t, sp, peer4)
// add peer1 which will displace peer2 in the kClosest
var peer1 peer.ID
for {
peer1 = test.RandPeerIDFatal(t)
if kb.Closer(peer1, peer4, key) {
break
}
}
require.True(t, sp.Add(peer1))
require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer1)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4)
assertClosestKnownPeer(t, sp, peer1)
// add peer 3 which will displace peer4 in the kClosest
var peer3 peer.ID
for {
peer3 = test.RandPeerIDFatal(t)
if kb.Closer(peer3, peer1, key) {
break
}
}
require.True(t, sp.Add(peer3))
require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer1)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3)
assertClosestKnownPeer(t, sp, peer3)
// removing peer1 moves peer4 to the KClosest
sp.Remove(peer1)
require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 2)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer4)
sp.lock.Lock()
require.True(t, sp.heapRestOfPeers.Len() == 1)
require.Contains(t, sp.heapRestOfPeers.Peers(), peer2)
sp.lock.Unlock()
// mark a peer as queried so it's not returned as unqueried
sp.MarkQueried(peer4)
require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer3)
// removing peer3 moves peer2 to the kClosest & updates the closest known peer to peer4
sp.Remove(peer3)
require.Len(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), 1)
require.Contains(t, sp.UnqueriedFromKClosest(noopGetValue, noopCompare), peer2)
sp.lock.Lock()
require.Empty(t, sp.heapRestOfPeers.Peers())
sp.lock.Unlock()
assertClosestKnownPeer(t, sp, peer4)
// adding peer5 does not change the closest known peer
var peer5 peer.ID
for {
peer5 = test.RandPeerIDFatal(t)
if kb.Closer(peer2, peer5, key) {
break
}
}
require.False(t, sp.Add(peer5))
assertClosestKnownPeer(t, sp, peer4)
// adding peer0 changes the closest known peer
var peer0 peer.ID
for {
peer0 = test.RandPeerIDFatal(t)
if kb.Closer(peer0, peer3, key) {
break
}
}
require.True(t, sp.Add(peer0))
assertClosestKnownPeer(t, sp, peer0)
}
func TestSortingUnqueriedFromKClosest(t *testing.T) {
p1 := peer.ID("1")
p2 := peer.ID("22")
p3 := peer.ID("333")
key := "test"
sp := NewSortedPeerset(3, key)
sp.Add(p1)
sp.Add(p3)
sp.Add(p2)
ps := sp.UnqueriedFromKClosest(noopGetValue, func(i1 peerheap.Item, i2 peerheap.Item) bool {
return len(i1.Peer) > len(i2.Peer)
})
require.Len(t, ps, 3)
require.Equal(t, p3, ps[0])
require.Equal(t, p2, ps[1])
require.Equal(t, p1, ps[2])
// mark one as queried
scoref := func(p peer.ID, d *big.Int) interface{} {
return len(p)
}
sp.MarkQueried(p3)
ps = sp.UnqueriedFromKClosest(scoref, func(i1 peerheap.Item, i2 peerheap.Item) bool {
return i1.Value.(int) > i2.Value.(int)
})
require.Len(t, ps, 2)
require.Equal(t, p2, ps[0])
require.Equal(t, p1, ps[1])
}
func assertClosestKnownPeer(t *testing.T, sp *SortedPeerset, p peer.ID) {
sp.lock.Lock()
defer sp.lock.Unlock()
require.Equal(t, sp.closestKnownPeer, p)
}
@@ -5,11 +5,14 @@ import (
"errors"
"math/big"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
kb "github.com/libp2p/go-libp2p-kbucket"
)
// ErrNoPeersQueried is returned when we failed to connect to any peers.
@@ -18,16 +21,26 @@ var ErrNoPeersQueried = errors.New("failed to query any peers")
type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
type stopFn func(*kpeerset.SortedPeerset) bool
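To illustrate the stopFn contract, a hypothetical sketch (not from this diff; dht and targetID are assumed to be in scope): a FindPeer-style lookup could stop the whole disjoint query once it is connected to the peer it is looking for.

stopWhenConnected := func(peers *kpeerset.SortedPeerset) bool {
	// end the query as soon as we are connected to the target peer
	return dht.host.Network().Connectedness(targetID) == network.Connected
}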
// query represents a single disjoint query.
type query struct {
// the query context.
ctx context.Context
// the cancellation function for the query context.
cancel context.CancelFunc
dht *IpfsDHT
// localPeers is the set of peers that need to be queried or have already been queried for this query.
localPeers *kpeerset.SortedPeerset
// globallyQueriedPeers is the combined set of peers queried across ALL the disjoint queries.
globallyQueriedPeers *peer.Set
// the function that will be used to query a single peer.
queryFn queryFn
// stopFn is used to determine if we should stop the WHOLE disjoint query.
stopFn stopFn
}
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
@@ -36,6 +49,7 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
numQueriesComplete := 0
queryDone := make(chan struct{}, d)
// pick the K closest peers to the key in our Routing table and shuffle them.
seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize)
if len(seedPeers) == 0 {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -49,15 +63,15 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
})
// create "d" disjoint queries
queries := make([]*query, d)
peersQueried := peer.NewSet()
for i := 0; i < d; i++ {
query := &query{
ctx: queryCtx,
cancel: cancelQuery,
dht: dht,
localPeers: kpeerset.NewSortedPeerset(dht.bucketSize, target),
globallyQueriedPeers: peersQueried,
queryFn: queryFn,
stopFn: stopFn,
@@ -66,10 +80,12 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
queries[i] = query
}
// distribute the shuffled K closest peers as seeds among the "d" disjoint queries
for i := 0; i < len(seedPeers); i++ {
queries[i%d].localPeers.Add(seedPeers[i])
}
// start the "d" disjoint queries
for i := 0; i < d; i++ {
query := queries[i]
go func() {
@@ -79,6 +95,7 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
}
loop:
// wait for all the "d" disjoint queries to complete before we return
for {
select {
case <-queryDone:
@@ -94,47 +111,86 @@ loop:
return queries, nil
}
// TODO: This function should be owned by the DHT as it doesn't really belong to "a query".
// scorePeerByDistanceAndLatency scores a peer using metrics such as the peer's connectedness,
// its distance from the key and its currently known latency.
func (q query) scorePeerByDistanceAndLatency(p peer.ID, distanceFromKey *big.Int) interface{} {
connectedness := q.dht.host.Network().Connectedness(p)
latency := q.dht.host.Peerstore().LatencyEWMA(p)
var c int64
switch connectedness {
case network.Connected:
c = 1
case network.CanConnect:
c = 5
case network.CannotConnect:
c = 10000
default:
c = 20
}
l := int64(latency)
if l <= 0 {
l = int64(time.Second) * 10
}
res := big.NewInt(c)
res.Mul(res, big.NewInt(l))
res.Mul(res, distanceFromKey)
return res
}
// strictParallelismQuery concurrently sends the query RPC to all eligible peers
// and waits for ALL the RPC's to complete before starting the next round of RPC's.
func strictParallelismQuery(q *query) {
foundCloser := false
for {
// get the unqueried peers from among the K closest peers to the key, sorted in ascending order
// of their `distance-latency` score.
// We sort peers like this so that "better" peers are chosen to be in the α peers
// which get queried from among the unqueried K closest.
peersToQuery := q.localPeers.UnqueriedFromKClosest(q.scorePeerByDistanceAndLatency,
func(i1 peerheap.Item, i2 peerheap.Item) bool {
return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1
})
// The lookup terminates when the initiator has queried and gotten responses from the k
// closest nodes it has heard about.
if len(peersToQuery) == 0 {
return
}
// Of the k nodes the initiator has heard of closest to the target,
// it picks α that it has not yet queried and resends the FIND NODE RPC to them.
numQuery := q.dht.alpha
// However, if a round of RPC's fails to return a node any closer than the closest already heard about,
// the initiator resends the RPC's to all of the k closest nodes it has
// not already queried.
if !foundCloser {
numQuery = len(peersToQuery)
} else if pqLen := len(peersToQuery); pqLen < numQuery {
// if we don't have α peers, pick whatever number we have.
numQuery = pqLen
}
// reset foundCloser to false for the next round of RPC's
foundCloser = false
queryResCh := make(chan *queryResult, numQuery)
resultsReceived := 0
// send RPC's to all the chosen peers concurrently
for _, p := range peersToQuery[:numQuery] {
go func(p peer.ID) {
queryResCh <- q.queryPeer(q.ctx, p)
queryResCh <- q.queryPeer(p)
}(p)
}
loop:
// wait for all outstanding RPC's to complete before we start the next round.
for {
select {
case res := <-queryResCh:
@@ -150,153 +206,50 @@ func strictParallelismQuery(q *query) {
}
}
func simpleQuery(q *query) {
/*
start with K closest peers (some queried already some not)
take best alpha (sorted by some metric)
query those alpha
- if a query fails then take the next one
once they complete:
if the alpha requests did not add any new peers to top K, repeat with unqueried top K
else repeat
*/
var lastPeers []peer.ID
for {
peersToQuery := q.localPeers.KUnqueried()
if len(peersToQuery) == 0 {
return
}
numQuery := q.dht.alpha
if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
numQuery = len(peersToQuery)
} else if pqLen := len(peersToQuery); pqLen < numQuery {
numQuery = pqLen
}
peersToQueryCh := make(chan peer.ID, numQuery)
for _, p := range peersToQuery[:numQuery] {
peersToQueryCh <- p
}
queryResCh := make(chan *queryResult, numQuery)
queriesSucceeded, queriesSent := 0, numQuery
dialPeers:
for {
select {
case p := <-peersToQueryCh:
go func() {
queryResCh <- q.queryPeer(q.ctx, p)
}()
case res := <-queryResCh:
if res.success {
queriesSucceeded++
if queriesSucceeded == numQuery {
break dialPeers
}
} else {
queriesSent++
if queriesSent >= len(peersToQuery) {
break dialPeers
}
peersToQueryCh <- peersToQuery[queriesSent]
}
case <-q.ctx.Done():
return
}
}
}
}
func boundedDialQuery(q *query) {
/*
start with K closest peers (some queried already some not)
take best alpha (sorted by some metric)
query those alpha
-- if queried peer falls out of top K we've heard of + top alpha we've received responses from
+ others like percentage of way through the timeout, their reputation, etc.
1) Cancel dial 2) Cancel query but not dial 3) Continue with query
*/
var lastPeers []peer.ID
for {
peersToQuery := q.localPeers.KUnqueried()
if len(peersToQuery) == 0 {
return
}
numQuery := q.dht.alpha
if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
numQuery = len(peersToQuery)
}
peersToQueryCh := make(chan peer.ID, numQuery)
for _, p := range peersToQuery[:numQuery] {
peersToQueryCh <- p
}
queryResCh := make(chan *queryResult, numQuery)
queriesSucceeded, queriesSent := 0, 0
for {
select {
case p := <-peersToQueryCh:
go func() {
queryResCh <- q.queryPeer(q.ctx, p)
}()
case res := <-queryResCh:
if res.success {
queriesSucceeded++
} else {
queriesSent++
if queriesSent >= len(peersToQuery) {
return
}
peersToQueryCh <- peersToQuery[queriesSent]
}
case <-q.ctx.Done():
return
}
}
}
}
type queryResult struct {
success bool
// foundCloserPeer is true if the peer we're querying returns a peer
// closer than the closest we've already heard about
foundCloserPeer bool
}
// queryPeer queries a single peer.
func (q *query) queryPeer(p peer.ID) *queryResult {
dialCtx, queryCtx := q.ctx, q.ctx
// dial the peer
if err := q.dht.dialPeer(dialCtx, p); err != nil {
q.localPeers.Remove(p)
return &queryResult{}
}
// add the peer to the global set of queried peers since the dial was successful
// so that no other disjoint query tries sending an RPC to the same peer
if !q.globallyQueriedPeers.TryAdd(p) {
q.localPeers.Remove(p)
return &queryResult{}
}
// did the dial fulfill the stop condition?
if q.stopFn(q.localPeers) {
q.cancel()
return &queryResult{}
}
// send query RPC to the remote peer
newPeers, err := q.queryFn(queryCtx, p)
if err != nil {
q.localPeers.Remove(p)
return &queryResult{}
}
// mark the peer as queried.
q.localPeers.MarkQueried(p)
if len(newPeers) == 0 {
logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
}
foundCloserPeer := false
for _, next := range newPeers {
if next.ID == q.dht.self { // don't add self.
logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
@@ -305,19 +258,16 @@ func (q *query) queryPeer(ctx context.Context, p peer.ID) *queryResult {
// add their addresses to the dialer's peerstore
q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
closer := q.localPeers.Add(next.ID)
foundCloserPeer = foundCloserPeer || closer
}
// did the successful query RPC fulfill the query stop condition?
if q.stopFn(q.localPeers) {
q.cancel()
}
return &queryResult{
success: true,
foundCloserPeer: foundCloserPeer,
}
}
@@ -348,17 +298,3 @@ func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
logger.Debugf("connected. dial success.")
return nil
}
// Equal tells whether a and b contain the same elements.
// A nil argument is equivalent to an empty slice.
func peerSlicesEqual(a, b []peer.ID) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
@@ -335,7 +335,9 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
}
go func() {
defer close(valCh)
defer close(queriesCh)
queries, err := dht.runDisjointQueries(ctx, dht.d, key,
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -394,28 +396,34 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
},
)
if err != nil {
return
}
queriesCh <- queries
if ctx.Err() == nil {
// refresh the cpl for this key as the query was successful
dht.refreshRTIfNoShortcut(kb.ConvertKey(key), queries)
}
}()
return valCh, queriesCh
}
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) {
shortcutTaken := false
for _, q := range queries {
if q.localPeers.LenUnqueriedFromKClosest() > 0 {
shortcutTaken = true
break
}
}
if !shortcutTaken {
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
}
}
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
@@ -570,7 +578,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
}
}
queries, err := dht.runDisjointQueries(ctx, dht.d, string(key),
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -626,9 +634,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
},
)
if err == nil && ctx.Err() == nil {
// refresh the cpl for this key as the query was successful
dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), queries)
}
}
@@ -680,30 +687,16 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return peer.AddrInfo{}, err
}
// refresh the cpl for this key if we discovered the peer because of the query
if ctx.Err() == nil && queries[0].globallyQueriedPeers.Contains(id) {
kadID := kb.ConvertPeerID(id)
dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
}
// TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar
if dht.host.Network().Connectedness(id) == network.Connected {
return dht.peerstore.PeerInfo(id), nil
}
return peer.AddrInfo{}, routing.ErrNotFound
}