Unverified Commit ee7b926e authored by Steven Allen's avatar Steven Allen Committed by GitHub

Merge pull request #69 from libp2p/feat/expose-peerinfo

Get Peer Infos
parents 3d3bf8ce 0779168b
...@@ -9,12 +9,12 @@ import ( ...@@ -9,12 +9,12 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
) )
// peerInfo holds all related information for a peer in the K-Bucket. // PeerInfo holds all related information for a peer in the K-Bucket.
type peerInfo struct { type PeerInfo struct {
Id peer.ID Id peer.ID
// lastSuccessfulOutboundQuery is the time instant when we last made a successful // LastSuccessfulOutboundQuery is the time instant when we last made a successful
// outbound query to this peer // outbound query to this peer
lastSuccessfulOutboundQuery time.Time LastSuccessfulOutboundQuery time.Time
// Id of the peer in the DHT XOR keyspace // Id of the peer in the DHT XOR keyspace
dhtId ID dhtId ID
...@@ -37,10 +37,10 @@ func newBucket() *bucket { ...@@ -37,10 +37,10 @@ func newBucket() *bucket {
// returns all peers in the bucket // returns all peers in the bucket
// it is safe for the caller to modify the returned objects as it is a defensive copy // it is safe for the caller to modify the returned objects as it is a defensive copy
func (b *bucket) peers() []peerInfo { func (b *bucket) peers() []PeerInfo {
var ps []peerInfo var ps []PeerInfo
for e := b.list.Front(); e != nil; e = e.Next() { for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo) p := e.Value.(*PeerInfo)
ps = append(ps, *p) ps = append(ps, *p)
} }
return ps return ps
...@@ -50,7 +50,7 @@ func (b *bucket) peers() []peerInfo { ...@@ -50,7 +50,7 @@ func (b *bucket) peers() []peerInfo {
func (b *bucket) peerIds() []peer.ID { func (b *bucket) peerIds() []peer.ID {
ps := make([]peer.ID, 0, b.list.Len()) ps := make([]peer.ID, 0, b.list.Len())
for e := b.list.Front(); e != nil; e = e.Next() { for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo) p := e.Value.(*PeerInfo)
ps = append(ps, p.Id) ps = append(ps, p.Id)
} }
return ps return ps
...@@ -58,10 +58,10 @@ func (b *bucket) peerIds() []peer.ID { ...@@ -58,10 +58,10 @@ func (b *bucket) peerIds() []peer.ID {
// returns the peer with the given Id if it exists // returns the peer with the given Id if it exists
// returns nil if the peerId does not exist // returns nil if the peerId does not exist
func (b *bucket) getPeer(p peer.ID) *peerInfo { func (b *bucket) getPeer(p peer.ID) *PeerInfo {
for e := b.list.Front(); e != nil; e = e.Next() { for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == p { if e.Value.(*PeerInfo).Id == p {
return e.Value.(*peerInfo) return e.Value.(*PeerInfo)
} }
} }
return nil return nil
...@@ -71,7 +71,7 @@ func (b *bucket) getPeer(p peer.ID) *peerInfo { ...@@ -71,7 +71,7 @@ func (b *bucket) getPeer(p peer.ID) *peerInfo {
// returns true if successful, false otherwise. // returns true if successful, false otherwise.
func (b *bucket) remove(id peer.ID) bool { func (b *bucket) remove(id peer.ID) bool {
for e := b.list.Front(); e != nil; e = e.Next() { for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id { if e.Value.(*PeerInfo).Id == id {
b.list.Remove(e) b.list.Remove(e)
return true return true
} }
...@@ -82,13 +82,13 @@ func (b *bucket) remove(id peer.ID) bool { ...@@ -82,13 +82,13 @@ func (b *bucket) remove(id peer.ID) bool {
func (b *bucket) moveToFront(id peer.ID) { func (b *bucket) moveToFront(id peer.ID) {
for e := b.list.Front(); e != nil; e = e.Next() { for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id { if e.Value.(*PeerInfo).Id == id {
b.list.MoveToFront(e) b.list.MoveToFront(e)
} }
} }
} }
func (b *bucket) pushFront(p *peerInfo) { func (b *bucket) pushFront(p *PeerInfo) {
b.list.PushFront(p) b.list.PushFront(p)
} }
...@@ -105,7 +105,7 @@ func (b *bucket) split(cpl int, target ID) *bucket { ...@@ -105,7 +105,7 @@ func (b *bucket) split(cpl int, target ID) *bucket {
newbuck.list = out newbuck.list = out
e := b.list.Front() e := b.list.Front()
for e != nil { for e != nil {
pDhtId := e.Value.(*peerInfo).dhtId pDhtId := e.Value.(*PeerInfo).dhtId
peerCPL := CommonPrefixLen(pDhtId, target) peerCPL := CommonPrefixLen(pDhtId, target)
if peerCPL > cpl { if peerCPL > cpl {
cur := e cur := e
......
...@@ -38,7 +38,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID, pDhtId ID) { ...@@ -38,7 +38,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID, pDhtId ID) {
// Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted. // Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted.
func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) { func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() { for e := l.Front(); e != nil; e = e.Next() {
pds.appendPeer(e.Value.(*peerInfo).Id, e.Value.(*peerInfo).dhtId) pds.appendPeer(e.Value.(*PeerInfo).Id, e.Value.(*PeerInfo).dhtId)
} }
} }
......
...@@ -49,7 +49,7 @@ type RoutingTable struct { ...@@ -49,7 +49,7 @@ type RoutingTable struct {
PeerRemoved func(peer.ID) PeerRemoved func(peer.ID)
PeerAdded func(peer.ID) PeerAdded func(peer.ID)
// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "lastSuccessfulOutboundQuery" // maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery"
// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket // of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
// is full // is full
maxLastSuccessfulOutboundThreshold float64 maxLastSuccessfulOutboundThreshold float64
...@@ -87,12 +87,12 @@ func (rt *RoutingTable) Close() error { ...@@ -87,12 +87,12 @@ func (rt *RoutingTable) Close() error {
} }
// TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op. // TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op.
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the lastSuccessfulOutboundQuery to the current time. // If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time.
// If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having // If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having
// no lastSuccessfulOutboundQuery. // no LastSuccessfulOutboundQuery.
// //
// If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer // If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer
// whose lastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer. // whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer.
// If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity". // If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity".
// It returns a boolean value set to true if the peer was newly added to the Routing Table, false otherwise. // It returns a boolean value set to true if the peer was newly added to the Routing Table, false otherwise.
...@@ -129,7 +129,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { ...@@ -129,7 +129,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
// We have enough space in the bucket (whether spawned or grouped). // We have enough space in the bucket (whether spawned or grouped).
if bucket.len() < rt.bucketsize { if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p) rt.PeerAdded(p)
return true, nil return true, nil
} }
...@@ -143,20 +143,20 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { ...@@ -143,20 +143,20 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
// push the peer only if the bucket isn't overflowing after slitting // push the peer only if the bucket isn't overflowing after slitting
if bucket.len() < rt.bucketsize { if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p) rt.PeerAdded(p)
return true, nil return true, nil
} }
} }
// the bucket to which the peer belongs is full. Let's try to find a peer // the bucket to which the peer belongs is full. Let's try to find a peer
// in that bucket with a lastSuccessfulOutboundQuery value above the maximum threshold and replace it. // in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
allPeers := bucket.peers() allPeers := bucket.peers()
for _, pc := range allPeers { for _, pc := range allPeers {
if float64(time.Since(pc.lastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold { if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold {
// let's evict it and add the new peer // let's evict it and add the new peer
if bucket.remove(pc.Id) { if bucket.remove(pc.Id) {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p) rt.PeerAdded(p)
return true, nil return true, nil
} }
...@@ -166,7 +166,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { ...@@ -166,7 +166,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
return false, ErrPeerRejectedNoCapacity return false, ErrPeerRejectedNoCapacity
} }
// UpdateLastSuccessfulOutboundQuery updates the lastSuccessfulOutboundQuery time of the peer // GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
var pis []PeerInfo
for _, b := range rt.buckets {
for _, p := range b.peers() {
pis = append(pis, p)
}
}
return pis
}
// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer
// Returns true if the update was successful, false otherwise. // Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool { func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool {
rt.tabLock.Lock() rt.tabLock.Lock()
...@@ -176,7 +190,7 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time ...@@ -176,7 +190,7 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time
bucket := rt.buckets[bucketID] bucket := rt.buckets[bucketID]
if pc := bucket.getPeer(p); pc != nil { if pc := bucket.getPeer(p); pc != nil {
pc.lastSuccessfulOutboundQuery = t pc.LastSuccessfulOutboundQuery = t
return true return true
} }
return false return false
...@@ -334,7 +348,7 @@ func (rt *RoutingTable) Print() { ...@@ -334,7 +348,7 @@ func (rt *RoutingTable) Print() {
fmt.Printf("\tbucket: %d\n", i) fmt.Printf("\tbucket: %d\n", i)
for e := b.list.Front(); e != nil; e = e.Next() { for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo).Id p := e.Value.(*PeerInfo).Id
fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String()) fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
} }
} }
......
...@@ -33,7 +33,7 @@ func TestBucket(t *testing.T) { ...@@ -33,7 +33,7 @@ func TestBucket(t *testing.T) {
peers := make([]peer.ID, 100) peers := make([]peer.ID, 100)
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
peers[i] = test.RandPeerIDFatal(t) peers[i] = test.RandPeerIDFatal(t)
b.pushFront(&peerInfo{peers[i], testTime1, ConvertPeerID(peers[i])}) b.pushFront(&PeerInfo{peers[i], testTime1, ConvertPeerID(peers[i])})
} }
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
...@@ -47,19 +47,19 @@ func TestBucket(t *testing.T) { ...@@ -47,19 +47,19 @@ func TestBucket(t *testing.T) {
require.NotNil(t, p) require.NotNil(t, p)
require.Equal(t, peers[i], p.Id) require.Equal(t, peers[i], p.Id)
require.Equal(t, ConvertPeerID(peers[i]), p.dhtId) require.Equal(t, ConvertPeerID(peers[i]), p.dhtId)
require.EqualValues(t, testTime1, p.lastSuccessfulOutboundQuery) require.EqualValues(t, testTime1, p.LastSuccessfulOutboundQuery)
// mark as missing // mark as missing
t2 := time.Now().Add(1 * time.Hour) t2 := time.Now().Add(1 * time.Hour)
p.lastSuccessfulOutboundQuery = t2 p.LastSuccessfulOutboundQuery = t2
p = b.getPeer(peers[i]) p = b.getPeer(peers[i])
require.NotNil(t, p) require.NotNil(t, p)
require.EqualValues(t, t2, p.lastSuccessfulOutboundQuery) require.EqualValues(t, t2, p.LastSuccessfulOutboundQuery)
spl := b.split(0, ConvertPeerID(local)) spl := b.split(0, ConvertPeerID(local))
llist := b.list llist := b.list
for e := llist.Front(); e != nil; e = e.Next() { for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id) p := ConvertPeerID(e.Value.(*PeerInfo).Id)
cpl := CommonPrefixLen(p, localID) cpl := CommonPrefixLen(p, localID)
if cpl > 0 { if cpl > 0 {
t.Fatalf("split failed. found id with cpl > 0 in 0 bucket") t.Fatalf("split failed. found id with cpl > 0 in 0 bucket")
...@@ -68,7 +68,7 @@ func TestBucket(t *testing.T) { ...@@ -68,7 +68,7 @@ func TestBucket(t *testing.T) {
rlist := spl.list rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() { for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id) p := ConvertPeerID(e.Value.(*PeerInfo).Id)
cpl := CommonPrefixLen(p, localID) cpl := CommonPrefixLen(p, localID)
if cpl == 0 { if cpl == 0 {
t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket") t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket")
...@@ -218,7 +218,7 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) { ...@@ -218,7 +218,7 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) {
rt.tabLock.Lock() rt.tabLock.Lock()
pi := rt.buckets[0].getPeer(p) pi := rt.buckets[0].getPeer(p)
require.NotNil(t, pi) require.NotNil(t, pi)
require.EqualValues(t, t2, pi.lastSuccessfulOutboundQuery) require.EqualValues(t, t2, pi.LastSuccessfulOutboundQuery)
rt.tabLock.Unlock() rt.tabLock.Unlock()
} }
...@@ -257,7 +257,7 @@ func TestTryAddPeer(t *testing.T) { ...@@ -257,7 +257,7 @@ func TestTryAddPeer(t *testing.T) {
require.True(t, b) require.True(t, b)
require.Equal(t, p4, rt.Find(p4)) require.Equal(t, p4, rt.Find(p4))
// adding a peer with cpl 0 works if an existing peer has lastSuccessfulOutboundQuery above the max threshold // adding a peer with cpl 0 works if an existing peer has LastSuccessfulOutboundQuery above the max threshold
// because that existing peer will get replaced // because that existing peer will get replaced
require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -1))) require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -1)))
b, err = rt.TryAddPeer(p3, true) b, err = rt.TryAddPeer(p3, true)
...@@ -285,7 +285,7 @@ func TestTryAddPeer(t *testing.T) { ...@@ -285,7 +285,7 @@ func TestTryAddPeer(t *testing.T) {
rt.tabLock.Lock() rt.tabLock.Lock()
pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6) pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6)
require.NotNil(t, p6) require.NotNil(t, p6)
require.True(t, pi.lastSuccessfulOutboundQuery.IsZero()) require.True(t, pi.LastSuccessfulOutboundQuery.IsZero())
rt.tabLock.Unlock() rt.tabLock.Unlock()
} }
...@@ -399,6 +399,37 @@ func TestTableMultithreaded(t *testing.T) { ...@@ -399,6 +399,37 @@ func TestTableMultithreaded(t *testing.T) {
<-done <-done
} }
func TestGetPeerInfos(t *testing.T) {
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)
require.Empty(t, rt.GetPeerInfos())
p1 := test.RandPeerIDFatal(t)
p2 := test.RandPeerIDFatal(t)
b, err := rt.TryAddPeer(p1, false)
require.True(t, b)
require.NoError(t, err)
b, err = rt.TryAddPeer(p2, true)
require.True(t, b)
require.NoError(t, err)
ps := rt.GetPeerInfos()
require.Len(t, ps, 2)
ms := make(map[peer.ID]PeerInfo)
for _, p := range ps {
ms[p.Id] = p
}
require.Equal(t, p1, ms[p1].Id)
require.True(t, ms[p1].LastSuccessfulOutboundQuery.IsZero())
require.Equal(t, p2, ms[p2].Id)
require.False(t, ms[p2].LastSuccessfulOutboundQuery.IsZero())
}
func BenchmarkAddPeer(b *testing.B) { func BenchmarkAddPeer(b *testing.B) {
b.StopTimer() b.StopTimer()
local := ConvertKey("localKey") local := ConvertKey("localKey")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment