Commit 7f403bd4 authored by rht's avatar rht

Replace ctxgroup.ContextGroup -> goprocess.Process

License: MIT
Signed-off-by: default avatarrht <rhtbot@gmail.com>
parent e9b2168e
......@@ -23,8 +23,9 @@ import (
u "github.com/ipfs/go-ipfs/util"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
......@@ -57,7 +58,8 @@ type IpfsDHT struct {
Validator record.Validator // record validator funcs
ctxgroup.ContextGroup
Context context.Context
goprocess.Process
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
......@@ -71,14 +73,17 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht.ContextGroup = ctxgroup.WithContextAndTeardown(ctx, func() error {
procctx = goprocessctx.WithContext(ctx)
procctx.SetTeardown(func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.Process = procctx
dht.Context = ctx
h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context(), dht.self)
dht.providers = NewProviderManager(dht.Context, dht.self)
dht.AddChild(dht.providers)
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
......@@ -88,8 +93,7 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator["pk"] = record.PublicKeyValidator
if doPinging {
dht.Children().Add(1)
go dht.PingRoutine(time.Second * 10)
dht.Go(func() { dht.PingRoutine(time.Second * 10) })
}
return dht
}
......@@ -348,8 +352,6 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
// PingRoutine periodically pings nearest neighbors.
func (dht *IpfsDHT) PingRoutine(t time.Duration) {
defer dht.Children().Done()
tick := time.Tick(t)
for {
select {
......@@ -358,7 +360,7 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
rand.Read(id)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.Key(id)), 5)
for _, p := range peers {
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*5)
ctx, cancel := context.WithTimeout(dht.Context, time.Second*5)
_, err := dht.Ping(ctx, p)
if err != nil {
log.Debugf("Ping error: %s", err)
......
......@@ -3,7 +3,8 @@ package dht
import (
"time"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
key "github.com/ipfs/go-ipfs/blocks/key"
peer "github.com/ipfs/go-ipfs/p2p/peer"
......@@ -21,7 +22,7 @@ type ProviderManager struct {
newprovs chan *addProv
getprovs chan *getProv
period time.Duration
ctxgroup.ContextGroup
goprocess.Process
}
type providerSet struct {
......@@ -46,17 +47,13 @@ func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm.providers = make(map[key.Key]*providerSet)
pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{})
pm.ContextGroup = ctxgroup.WithContext(ctx)
pm.Children().Add(1)
go pm.run()
pm.Process = goprocessctx.WithContext(ctx)
pm.Go(pm.run)
return pm
}
func (pm *ProviderManager) run() {
defer pm.Children().Done()
tick := time.NewTicker(time.Hour)
for {
select {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment