Commit 585d6725 authored by aarshkshah1992's avatar aarshkshah1992 Committed by Steven Allen

refresh a bucket whenever we lookup a key in it

parent 5329454a
......@@ -307,10 +307,6 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
// reset the timer for the k-bucket we just searched in ONLY if there was no error
// so that we can retry during the next bootstrap
bucket := dht.routingTable.BucketForPeer(id)
bucket.ResetLastQueriedAt(time.Now())
return resp, nil
case ErrReadTimeout:
logger.Warningf("read timeout: %s %s", p.Pretty(), id)
......
......@@ -13,7 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
......@@ -74,6 +74,11 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
return err
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
}()
pchan, err := dht.GetClosestPeers(ctx, key)
if err != nil {
return err
......@@ -98,6 +103,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
}(p)
}
wg.Wait()
return nil
}
......@@ -157,6 +163,11 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
responsesNeeded = getQuorum(&cfg, -1)
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
}()
valCh, err := dht.getValues(ctx, key, responsesNeeded)
if err != nil {
return nil, err
......@@ -254,6 +265,11 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
eip.Append(loggableKey(key))
defer eip.Done()
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
}()
valCh, err := dht.getValues(ctx, key, nvals)
if err != nil {
eip.SetError(err)
......@@ -429,6 +445,11 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
defer cancel()
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
}()
peers, err := dht.GetClosestPeers(closerCtx, key.KeyString())
if err != nil {
return err
......@@ -486,6 +507,12 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
logger.Event(ctx, "findProviders", key)
peerOut := make(chan peer.AddrInfo, count)
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
}()
go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
return peerOut
}
......@@ -621,6 +648,11 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
}
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
}()
// setup the Query
parent := ctx
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
......@@ -682,6 +714,11 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
return nil, kb.ErrLookupFailure
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
}()
// setup the Query
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment