Unverified Commit 98222a4e authored by Sam's avatar Sam

Make it async

parent 099c43eb
......@@ -138,7 +138,8 @@ type IpfsDHT struct {
// networks).
enableProviders, enableValues bool
fixLowPeersChan chan struct{}
disableFixLowPeers bool
fixLowPeersChan chan struct{}
addPeerToRTChan chan addPeerRTReq
refreshFinishedCh chan struct{}
......@@ -186,6 +187,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
dht.enableValues = cfg.enableValues
dht.disableFixLowPeers = cfg.disableFixLowPeers
dht.Validator = cfg.validator
......@@ -216,21 +218,12 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())
dht.fixLowPeers()
if err := dht.rtRefreshManager.Start(); err != nil {
return nil, err
}
dht.proc.Go(dht.populatePeers)
// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
go dht.persistRTPeersInPeerStore()
// listens to the fix low peers chan and tries to fix the Routing Table
if !cfg.disableFixLowPeers {
dht.proc.Go(dht.fixLowPeersRoutine)
}
dht.proc.Go(dht.rtPeerLoop)
return dht, nil
......@@ -419,6 +412,22 @@ func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
}
func (dht *IpfsDHT) populatePeers(_ goprocess.Process) {
if !dht.disableFixLowPeers {
dht.fixLowPeers(dht.ctx)
}
if err := dht.rtRefreshManager.Start(); err != nil {
logger.Error(err)
}
// listens to the fix low peers chan and tries to fix the Routing Table
if !dht.disableFixLowPeers {
dht.proc.Go(dht.fixLowPeersRoutine)
}
}
// fixLowPeersRouting manages simultaneous requests to fixLowPeers
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
ticker := time.NewTicker(periodicBootstrapInterval)
......
......@@ -58,7 +58,7 @@ func GetDefaultBootstrapPeerAddrInfos() []peer.AddrInfo {
// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
dht.fixLowPeers(ctx)
dht.fixRTIfNeeded()
dht.rtRefreshManager.RefreshNoWait()
return nil
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment