Commit ef6ffec0 authored by Aarsh Shah's avatar Aarsh Shah

refresh cpl in dht

parent dd3d8fbb
......@@ -45,7 +45,7 @@ func init() {
// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)
......@@ -104,20 +104,20 @@ func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
merr = multierror.Append(merr, err)
}
if err := dht.refreshBuckets(ctx); err != nil {
if err := dht.refreshCpls(ctx); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}
// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
// refreshCpls scans the routing table, and does a random walk for cpl's that haven't been queried since the given period
func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
doQuery := func(cpl uint, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing cpl %d to %s (routing table size was %d)",
cpl, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished refreshing bucket %d to %s (routing table size is now %d)",
bucketId, target, dht.routingTable.Size())
logger.Infof("finished refreshing cpl %d to %s (routing table size is now %d)",
cpl, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
......@@ -128,35 +128,33 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
return err
}
buckets := dht.routingTable.GetAllBuckets()
if len(buckets) > 16 {
// Don't bother bootstrapping more than 16 buckets.
// GenRandPeerID can't generate target peer IDs with more than
// 16 bits specified anyways.
buckets = buckets[:16]
}
trackedCpls := dht.routingTable.GetTrackedCplsForRefresh()
var merr error
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
for _, tcpl := range trackedCpls {
if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
continue
}
// gen rand peer with the cpl
randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
if err != nil {
logger.Errorf("failed to generate peerID for cpl %d, err: %s", tcpl.Cpl, err)
continue
}
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(c, randPeerInBucket)
_, err := dht.FindPeer(c, randPeer)
if err == routing.ErrNotFound {
return nil
}
return err
}
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
if err := doQuery(tcpl.Cpl, randPeer.String(), walkFnc); err != nil {
merr = multierror.Append(
merr,
fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err),
fmt.Errorf("failed to do a random walk for cpl %d: %s", tcpl.Cpl, err),
)
}
}
......
......@@ -103,8 +103,8 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
}
if res != nil && res.queriedSet != nil {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
l := len(sorted)
......
......@@ -396,9 +396,8 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha
//
// We'll just call this a success.
if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
// refresh the k-bucket containing this key as the query was successful
dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
err = nil
}
done(err)
......@@ -623,8 +622,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
})
}
// refresh the k-bucket containing this key after the query is run
dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
// refresh the cpl for this key after the query is run
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key.KeyString()), time.Now())
}
// FindPeer searches for a peer with given ID.
......@@ -696,8 +695,8 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return peer.AddrInfo{}, err
}
// refresh the k-bucket containing this key since the lookup was successful
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key since the lookup was successful
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())
logger.Debugf("FindPeer %v %v", id, result.success)
if result.peer.ID == "" {
......@@ -767,8 +766,8 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
logger.Debug(err)
}
// refresh the k-bucket containing this key
dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now())
// refresh the cpl for this key
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())
// close the peerchan channel when done.
close(peerchan)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment