Commit 50d59a42 authored by rht's avatar rht

Change Process interface into object variable

License: MIT
Signed-off-by: default avatarrht <rhtbot@gmail.com>
parent fb7fd036
......@@ -58,8 +58,8 @@ type IpfsDHT struct {
Validator record.Validator // record validator funcs
Context context.Context
goprocess.Process
ctx context.Context
proc goprocess.Process
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
......@@ -73,18 +73,18 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
procctx = goprocessctx.WithContext(ctx)
procctx.SetTeardown(func() error {
proc := goprocessctx.WithContext(ctx)
proc.SetTeardown(func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.Process = procctx
dht.Context = ctx
dht.proc = proc
dht.ctx = ctx
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context, dht.self)
dht.AddChild(dht.providers)
dht.providers = NewProviderManager(dht.ctx, dht.self)
dht.proc.AddChild(dht.providers.proc)
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
dht.birth = time.Now()
......@@ -93,7 +93,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator["pk"] = record.PublicKeyValidator
if doPinging {
dht.Go(func() { dht.PingRoutine(time.Second * 10) })
dht.proc.Go(func(p goprocess.Process) {
dht.PingRoutine(time.Second * 10)
})
}
return dht
}
......@@ -360,15 +362,30 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5)
for _, p := range peers {
ctx, cancel := context.WithTimeout(dht.Context, time.Second*5)
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Debugf("Ping error: %s", err)
}
cancel()
}
case <-dht.Closing():
case <-dht.proc.Closing():
return
}
}
}
// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
}
// Process return dht's process
func (dht *IpfsDHT) Process() goprocess.Process {
return dht.proc
}
// Close calls Process Close
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}
......@@ -16,7 +16,7 @@ func (nn *netNotifiee) DHT() *IpfsDHT {
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
case <-dht.Process().Closing():
return
default:
}
......@@ -26,7 +26,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
dht := nn.DHT()
select {
case <-dht.Closing():
case <-dht.Process().Closing():
return
default:
}
......
......@@ -22,7 +22,7 @@ type ProviderManager struct {
newprovs chan *addProv
getprovs chan *getProv
period time.Duration
goprocess.Process
proc goprocess.Process
}
type providerSet struct {
......@@ -47,8 +47,8 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm.providers = make(map[key.Key]*providerSet)
pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{})
pm.Process = goprocessctx.WithContext(ctx)
pm.Go(pm.run)
pm.proc = goprocessctx.WithContext(ctx)
pm.proc.Go(func(p goprocess.Process) { pm.run() })
return pm
}
......@@ -97,7 +97,7 @@ func (pm *ProviderManager) run() {
provs.providers = filtered
}
case <-pm.Closing():
case <-pm.proc.Closing():
return
}
}
......
......@@ -19,5 +19,5 @@ func TestProviderManager(t *testing.T) {
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")
}
p.Close()
p.proc.Close()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment