From dc0fbfd3d37af81d804f36103caa0c634fcf37c2 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet <juan@benet.ai> Date: Thu, 18 Sep 2014 19:41:41 -0700 Subject: [PATCH] added some logging --- routing/dht/query.go | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/routing/dht/query.go b/routing/dht/query.go index ecdc4c62..40025963 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -111,7 +111,12 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { r.addPeerToQuery(p, nil) // don't have access to self here... } - // wait until we're done. yep. + // go do this thing. + go r.spawnWorkers() + + // so workers are working. + + // wait until they're done. select { case <-r.peersRemaining.Done(): r.cancel() // ran all and nothing. cancel all outstanding workers. @@ -158,6 +163,8 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { r.peersSeen[next.Key()] = next r.Unlock() + u.POut("adding peer to query: %v\n", next.ID.Pretty()) + // do this after unlocking to prevent possible deadlocks. r.peersRemaining.Increment(1) select { @@ -166,8 +173,9 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { } } -func (r *dhtQueryRunner) spawnWorkers(p *peer.Peer) { +func (r *dhtQueryRunner) spawnWorkers() { for { + select { case <-r.peersRemaining.Done(): return @@ -175,13 +183,19 @@ func (r *dhtQueryRunner) spawnWorkers(p *peer.Peer) { case <-r.ctx.Done(): return - case p := <-r.peersToQuery.DeqChan: + case p, more := <-r.peersToQuery.DeqChan: + if !more { + return // channel closed. + } + u.POut("spawning worker for: %v\n", p.ID.Pretty()) go r.queryPeer(p) } } } func (r *dhtQueryRunner) queryPeer(p *peer.Peer) { + u.POut("spawned worker for: %v\n", p.ID.Pretty()) + // make sure we rate limit concurrency. select { case <-r.rateLimit: @@ -190,27 +204,33 @@ func (r *dhtQueryRunner) queryPeer(p *peer.Peer) { return } + u.POut("running worker for: %v\n", p.ID.Pretty()) + // finally, run the query against this peer res, err := r.query.qfunc(r.ctx, p) if err != nil { + u.POut("ERROR worker for: %v %v\n", p.ID.Pretty(), err) r.Lock() r.errs = append(r.errs, err) r.Unlock() } else if res.success { + u.POut("SUCCESS worker for: %v\n", p.ID.Pretty(), res) r.Lock() r.result = res r.Unlock() r.cancel() // signal to everyone that we're done. } else if res.closerPeers != nil { + u.POut("PEERS CLOSER -- worker for: %v\n", p.ID.Pretty()) for _, next := range res.closerPeers { r.addPeerToQuery(next, p) } } // signal we're done proccessing peer p + u.POut("completing worker for: %v\n", p.ID.Pretty()) r.peersRemaining.Decrement(1) r.rateLimit <- struct{}{} } -- GitLab