Commit 616ef868 authored by Aarsh Shah's avatar Aarsh Shah

changes to peer mantainence logic

parent 82c23336
......@@ -109,7 +109,7 @@ type IpfsDHT struct {
enableProviders, enableValues bool
// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit on the time duration
// between the current time and the last time a peer was useful to us.
// between the current time and the last time we successfully queried a peer.
maxLastSuccessfulOutboundThreshold float64
fixLowPeersChan chan struct{}
......@@ -448,22 +448,37 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=N/A
// If we connect to a peer and exchange a query RPC ->
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
// If we query a peer we already have in our Routing Table ->
// LastQueriedAt=time.Now()
// LastUsefulAt remains unchanged
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
// Do Nothing.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID, queryPeer bool) {
logger.Debugw("peer found", "peer", p)
b, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
_, err := dht.routingTable.TryAddPeer(p, queryPeer)
newlyAdded, err := dht.routingTable.TryAddPeer(p, queryPeer)
if err != nil {
// peer not added.
return
}
// If we discovered the peer because of a query, we need to ensure we override the "zero" lastSuccessfulOutboundQuery
// If we freshly added the peer because of a query, we need to ensure we override the "zero" lastUsefulAt
// value that must have been set in the Routing Table for this peer when it was first added during a connection.
if queryPeer {
dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
if newlyAdded && queryPeer {
dht.routingTable.UpdateLastUsefulAt(p, time.Now())
} else if queryPeer {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
}
}
......
......@@ -128,7 +128,7 @@ func (dht *IpfsDHT) startRefreshing() {
// ping Routing Table peers that haven't been hear of/from in the interval they should have been.
for _, ps := range dht.routingTable.GetPeerInfos() {
// ping the peer if it's due for a ping and evict it if the ping fails
if float64(time.Since(ps.LastSuccessfulOutboundQuery)) > dht.maxLastSuccessfulOutboundThreshold {
if float64(time.Since(ps.LastSuccessfulOutboundQueryAt)) > dht.maxLastSuccessfulOutboundThreshold {
livelinessCtx, cancel := context.WithTimeout(ctx, peerPingTimeout)
if err := dht.host.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
......
......@@ -174,7 +174,7 @@ func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn
}
func (q *query) recordPeerIsValuable(p peer.ID) {
q.dht.routingTable.UpdateLastSuccessfulOutboundQuery(p, time.Now())
q.dht.routingTable.UpdateLastUsefulAt(p, time.Now())
}
func (q *query) recordValuablePeers() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment