Commit ac6d9f25 authored by Aarsh Shah's avatar Aarsh Shah

add more state to peer

parent e79a3b75
...@@ -12,9 +12,14 @@ import ( ...@@ -12,9 +12,14 @@ import (
// PeerInfo holds all related information for a peer in the K-Bucket. // PeerInfo holds all related information for a peer in the K-Bucket.
type PeerInfo struct { type PeerInfo struct {
Id peer.ID Id peer.ID
// LastSuccessfulOutboundQuery is the time instant when we last made a successful
// outbound query to this peer // LastUsefulAt is the time instant at which the peer was last "useful" to us.
LastSuccessfulOutboundQuery time.Time // Please see the DHT docs for the definition of usefulness.
LastUsefulAt time.Time
// LastSuccessfulOutboundQueryAt is the time instant at which we last got a
// successful query response from the peer.
LastSuccessfulOutboundQueryAt time.Time
// Id of the peer in the DHT XOR keyspace // Id of the peer in the DHT XOR keyspace
dhtId ID dhtId ID
......
...@@ -49,15 +49,14 @@ type RoutingTable struct { ...@@ -49,15 +49,14 @@ type RoutingTable struct {
PeerRemoved func(peer.ID) PeerRemoved func(peer.ID)
PeerAdded func(peer.ID) PeerAdded func(peer.ID)
// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery" // maxLastUsefulAt is the max threshold/upper limit for the value of "LastUsefulAt"
// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket // of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
// is full // is full
maxLastSuccessfulOutboundThreshold float64 maxLastUsefulAt float64
} }
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
// Passing a nil PeerValidationFunc disables periodic table cleanup. func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, maxLastUsefulAt float64) (*RoutingTable, error) {
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics, maxLastSuccessfulOutboundThreshold float64) (*RoutingTable, error) {
rt := &RoutingTable{ rt := &RoutingTable{
buckets: []*bucket{newBucket()}, buckets: []*bucket{newBucket()},
bucketsize: bucketsize, bucketsize: bucketsize,
...@@ -71,7 +70,7 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst ...@@ -71,7 +70,7 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst
PeerRemoved: func(peer.ID) {}, PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {}, PeerAdded: func(peer.ID) {},
maxLastSuccessfulOutboundThreshold: maxLastSuccessfulOutboundThreshold, maxLastUsefulAt: maxLastUsefulAt,
} }
rt.ctx, rt.ctxCancel = context.WithCancel(context.Background()) rt.ctx, rt.ctxCancel = context.WithCancel(context.Background())
...@@ -111,9 +110,9 @@ func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) { ...@@ -111,9 +110,9 @@ func (rt *RoutingTable) TryAddPeer(p peer.ID, queryPeer bool) (bool, error) {
func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
bucketID := rt.bucketIdForPeer(p) bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID] bucket := rt.buckets[bucketID]
var lastSuccessfulOutboundQuery time.Time var lastUsefulAt time.Time
if queryPeer { if queryPeer {
lastSuccessfulOutboundQuery = time.Now() lastUsefulAt = time.Now()
} }
// peer already exists in the Routing Table. // peer already exists in the Routing Table.
...@@ -129,7 +128,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { ...@@ -129,7 +128,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
// We have enough space in the bucket (whether spawned or grouped). // We have enough space in the bucket (whether spawned or grouped).
if bucket.len() < rt.bucketsize { if bucket.len() < rt.bucketsize {
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(),
dhtId: ConvertPeerID(p)})
rt.PeerAdded(p) rt.PeerAdded(p)
return true, nil return true, nil
} }
...@@ -143,7 +143,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { ...@@ -143,7 +143,8 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
// push the peer only if the bucket isn't overflowing after slitting // push the peer only if the bucket isn't overflowing after slitting
if bucket.len() < rt.bucketsize { if bucket.len() < rt.bucketsize {
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(),
dhtId: ConvertPeerID(p)})
rt.PeerAdded(p) rt.PeerAdded(p)
return true, nil return true, nil
} }
...@@ -153,10 +154,11 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { ...@@ -153,10 +154,11 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it. // in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
allPeers := bucket.peers() allPeers := bucket.peers()
for _, pc := range allPeers { for _, pc := range allPeers {
if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold { if float64(time.Since(pc.LastUsefulAt)) > rt.maxLastUsefulAt {
// let's evict it and add the new peer // let's evict it and add the new peer
if bucket.remove(pc.Id) { if bucket.remove(pc.Id) {
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) bucket.pushFront(&PeerInfo{Id: p, LastUsefulAt: lastUsefulAt, LastSuccessfulOutboundQueryAt: time.Now(),
dhtId: ConvertPeerID(p)})
rt.PeerAdded(p) rt.PeerAdded(p)
return true, nil return true, nil
} }
...@@ -180,9 +182,9 @@ func (rt *RoutingTable) GetPeerInfos() []PeerInfo { ...@@ -180,9 +182,9 @@ func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
return pis return pis
} }
// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer // UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQueryAt time of the peer.
// Returns true if the update was successful, false otherwise. // Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool { func (rt *RoutingTable) UpdateLastSuccessfulOutboundQueryAt(p peer.ID, t time.Time) bool {
rt.tabLock.Lock() rt.tabLock.Lock()
defer rt.tabLock.Unlock() defer rt.tabLock.Unlock()
...@@ -190,7 +192,23 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time ...@@ -190,7 +192,23 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time
bucket := rt.buckets[bucketID] bucket := rt.buckets[bucketID]
if pc := bucket.getPeer(p); pc != nil { if pc := bucket.getPeer(p); pc != nil {
pc.LastSuccessfulOutboundQuery = t pc.LastSuccessfulOutboundQueryAt = t
return true
}
return false
}
// UpdateLastUsefulAt updates the LastUsefulAt time of the peer.
// Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastUsefulAt(p peer.ID, t time.Time) bool {
rt.tabLock.Lock()
defer rt.tabLock.Unlock()
bucketID := rt.bucketIdForPeer(p)
bucket := rt.buckets[bucketID]
if pc := bucket.getPeer(p); pc != nil {
pc.LastUsefulAt = t
return true return true
} }
return false return false
......
...@@ -27,13 +27,14 @@ func TestPrint(t *testing.T) { ...@@ -27,13 +27,14 @@ func TestPrint(t *testing.T) {
func TestBucket(t *testing.T) { func TestBucket(t *testing.T) {
t.Parallel() t.Parallel()
testTime1 := time.Now() testTime1 := time.Now()
testTime2 := time.Now().AddDate(1, 0, 0)
b := newBucket() b := newBucket()
peers := make([]peer.ID, 100) peers := make([]peer.ID, 100)
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
peers[i] = test.RandPeerIDFatal(t) peers[i] = test.RandPeerIDFatal(t)
b.pushFront(&PeerInfo{peers[i], testTime1, ConvertPeerID(peers[i])}) b.pushFront(&PeerInfo{peers[i], testTime1, testTime2, ConvertPeerID(peers[i])})
} }
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
...@@ -47,14 +48,17 @@ func TestBucket(t *testing.T) { ...@@ -47,14 +48,17 @@ func TestBucket(t *testing.T) {
require.NotNil(t, p) require.NotNil(t, p)
require.Equal(t, peers[i], p.Id) require.Equal(t, peers[i], p.Id)
require.Equal(t, ConvertPeerID(peers[i]), p.dhtId) require.Equal(t, ConvertPeerID(peers[i]), p.dhtId)
require.EqualValues(t, testTime1, p.LastSuccessfulOutboundQuery) require.EqualValues(t, testTime1, p.LastUsefulAt)
require.EqualValues(t, testTime2, p.LastSuccessfulOutboundQueryAt)
// mark as missing
t2 := time.Now().Add(1 * time.Hour) t2 := time.Now().Add(1 * time.Hour)
p.LastSuccessfulOutboundQuery = t2 t3 := t2.Add(1 * time.Hour)
p.LastSuccessfulOutboundQueryAt = t2
p.LastUsefulAt = t3
p = b.getPeer(peers[i]) p = b.getPeer(peers[i])
require.NotNil(t, p) require.NotNil(t, p)
require.EqualValues(t, t2, p.LastSuccessfulOutboundQuery) require.EqualValues(t, t2, p.LastSuccessfulOutboundQueryAt)
require.EqualValues(t, t3, p.LastUsefulAt)
spl := b.split(0, ConvertPeerID(local)) spl := b.split(0, ConvertPeerID(local))
llist := b.list llist := b.list
...@@ -201,7 +205,7 @@ func TestTableFind(t *testing.T) { ...@@ -201,7 +205,7 @@ func TestTableFind(t *testing.T) {
} }
} }
func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) { func TestUpdateLastSuccessfulOutboundQueryAt(t *testing.T) {
local := test.RandPeerIDFatal(t) local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics() m := pstore.NewMetrics()
rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold) rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
...@@ -214,11 +218,11 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) { ...@@ -214,11 +218,11 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) {
// increment and assert // increment and assert
t2 := time.Now().Add(1 * time.Hour) t2 := time.Now().Add(1 * time.Hour)
rt.UpdateLastSuccessfulOutboundQuery(p, t2) rt.UpdateLastSuccessfulOutboundQueryAt(p, t2)
rt.tabLock.Lock() rt.tabLock.Lock()
pi := rt.buckets[0].getPeer(p) pi := rt.buckets[0].getPeer(p)
require.NotNil(t, pi) require.NotNil(t, pi)
require.EqualValues(t, t2, pi.LastSuccessfulOutboundQuery) require.EqualValues(t, t2, pi.LastSuccessfulOutboundQueryAt)
rt.tabLock.Unlock() rt.tabLock.Unlock()
} }
...@@ -257,9 +261,9 @@ func TestTryAddPeer(t *testing.T) { ...@@ -257,9 +261,9 @@ func TestTryAddPeer(t *testing.T) {
require.True(t, b) require.True(t, b)
require.Equal(t, p4, rt.Find(p4)) require.Equal(t, p4, rt.Find(p4))
// adding a peer with cpl 0 works if an existing peer has LastSuccessfulOutboundQuery above the max threshold // adding a peer with cpl 0 works if an existing peer has LastUsefulAt above the max threshold
// because that existing peer will get replaced // because that existing peer will get replaced
require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -2))) require.True(t, rt.UpdateLastUsefulAt(p2, time.Now().AddDate(0, 0, -2)))
b, err = rt.TryAddPeer(p3, true) b, err = rt.TryAddPeer(p3, true)
require.NoError(t, err) require.NoError(t, err)
require.True(t, b) require.True(t, b)
...@@ -271,7 +275,7 @@ func TestTryAddPeer(t *testing.T) { ...@@ -271,7 +275,7 @@ func TestTryAddPeer(t *testing.T) {
// however adding peer fails if below threshold // however adding peer fails if below threshold
p5, err := rt.GenRandPeerID(0) p5, err := rt.GenRandPeerID(0)
require.NoError(t, err) require.NoError(t, err)
require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p1, time.Now())) require.True(t, rt.UpdateLastUsefulAt(p1, time.Now()))
b, err = rt.TryAddPeer(p5, true) b, err = rt.TryAddPeer(p5, true)
require.Error(t, err) require.Error(t, err)
require.False(t, b) require.False(t, b)
...@@ -285,7 +289,7 @@ func TestTryAddPeer(t *testing.T) { ...@@ -285,7 +289,7 @@ func TestTryAddPeer(t *testing.T) {
rt.tabLock.Lock() rt.tabLock.Lock()
pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6) pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6)
require.NotNil(t, p6) require.NotNil(t, p6)
require.True(t, pi.LastSuccessfulOutboundQuery.IsZero()) require.True(t, pi.LastUsefulAt.IsZero())
rt.tabLock.Unlock() rt.tabLock.Unlock()
} }
...@@ -425,9 +429,9 @@ func TestGetPeerInfos(t *testing.T) { ...@@ -425,9 +429,9 @@ func TestGetPeerInfos(t *testing.T) {
} }
require.Equal(t, p1, ms[p1].Id) require.Equal(t, p1, ms[p1].Id)
require.True(t, ms[p1].LastSuccessfulOutboundQuery.IsZero()) require.True(t, ms[p1].LastUsefulAt.IsZero())
require.Equal(t, p2, ms[p2].Id) require.Equal(t, p2, ms[p2].Id)
require.False(t, ms[p2].LastSuccessfulOutboundQuery.IsZero()) require.False(t, ms[p2].LastUsefulAt.IsZero())
} }
func BenchmarkAddPeer(b *testing.B) { func BenchmarkAddPeer(b *testing.B) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment