Commit 5da16344 authored by Aarsh Shah's avatar Aarsh Shah Committed by Steven Allen

refresh bucket only when query is successful

parent 232357ea
......@@ -8,7 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
......@@ -103,6 +103,9 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
}
if res != nil && res.queriedSet != nil {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
l := len(sorted)
if l > dht.bucketSize {
......
......@@ -74,11 +74,6 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
return err
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
}()
pchan, err := dht.GetClosestPeers(ctx, key)
if err != nil {
return err
......@@ -163,11 +158,6 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
responsesNeeded = getQuorum(&cfg, -1)
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
}()
valCh, err := dht.getValues(ctx, key, responsesNeeded)
if err != nil {
return nil, err
......@@ -265,11 +255,6 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
eip.Append(loggableKey(key))
defer eip.Done()
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
}()
valCh, err := dht.getValues(ctx, key, nvals)
if err != nil {
eip.SetError(err)
......@@ -396,6 +381,9 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha
//
// We'll just call this a success.
if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
err = nil
}
done(err)
......@@ -445,11 +433,6 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
defer cancel()
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
}()
peers, err := dht.GetClosestPeers(closerCtx, key.KeyString())
if err != nil {
return err
......@@ -508,11 +491,6 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
logger.Event(ctx, "findProviders", key)
peerOut := make(chan peer.AddrInfo, count)
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
}()
go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
return peerOut
}
......@@ -618,6 +596,9 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
Extra: err.Error(),
})
}
// refresh the k-bucket containing this key after the query is run
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
}
// FindPeer searches for a peer with given ID.
......@@ -648,11 +629,6 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
}
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
}()
// setup the Query
parent := ctx
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
......@@ -694,6 +670,9 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return peer.AddrInfo{}, err
}
// refresh the k-bucket containing this key since the lookup was successful
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
logger.Debugf("FindPeer %v %v", id, result.success)
if result.peer.ID == "" {
return peer.AddrInfo{}, routing.ErrNotFound
......@@ -714,11 +693,6 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
return nil, kb.ErrLookupFailure
}
// refresh the k-bucket containing this key
defer func() {
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
}()
// setup the Query
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
......@@ -767,6 +741,9 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
logger.Debug(err)
}
// refresh the k-bucket containing this key
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
// close the peerchan channel when done.
close(peerchan)
}()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment