package dht

import (
	"sync"

	inet "github.com/jbenet/go-ipfs/net"
	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
	"github.com/jbenet/go-ipfs/routing"
	u "github.com/jbenet/go-ipfs/util"
	pset "github.com/jbenet/go-ipfs/util/peerset"
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
)

// maxQueryConcurrency is the concurrency value that newQuery assigns to every
// query it builds. It is initialised from AlphaValue, which is declared
// outside this part of the file.
var maxQueryConcurrency = AlphaValue

type dhtQuery struct {
21 22
	// the key we're querying for
	key u.Key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
	// dialer used to ensure we're connected to peers
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25
	dialer inet.Dialer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27 28
	// the function to execute per peer
	qfunc queryFunc
29 30 31 32 33 34

	// the concurrency parameter
	concurrency int
}

type dhtQueryResult struct {
35 36 37 38
	value         []byte          // GetValue
	peer          peer.PeerInfo   // FindPeer
	providerPeers []peer.PeerInfo // GetProviders
	closerPeers   []peer.PeerInfo // *
39 40 41 42
	success       bool
}

// constructs query
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
func newQuery(k u.Key, d inet.Dialer, f queryFunc) *dhtQuery {
44 45
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46
		dialer:      d,
47 48 49
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50 51 52 53 54 55 56
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
57
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)

// Run runs the query at hand. pass in a list of peers to use first.
60
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
61 62 63 64 65 66 67 68 69 70 71 72 73
	runner := newQueryRunner(ctx, q)
	return runner.Run(peers)
}

type dhtQueryRunner struct {

	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
74
	peersSeen *pset.PeerSet
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75

76 77 78 79 80 81
	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

82 83
	// context group
	cg ctxgroup.ContextGroup
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
	return &dhtQueryRunner{
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
100
		peersSeen:      pset.New(),
101
		rateLimit:      make(chan struct{}, q.concurrency),
102
		cg:             ctxgroup.WithContext(ctx),
103 104 105
	}
}

func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
107
	log.Debugf("Run query with %d peers.", len(peers))
108 109 110 111
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
112

113 114 115
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116 117
	}

118 119
	// add all the peers we got first.
	for _, p := range peers {
120
		r.addPeerToQuery(r.cg.Context(), p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123
	// go do this thing.
124 125 126
	// do it as a child func to make sure Run exits
	// ONLY AFTER spawn workers has exited.
	r.cg.AddChildFunc(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128 129 130

	// so workers are working.

	// wait until they're done.
131
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133
	select {
134
	case <-r.peersRemaining.Done():
135
		r.cg.Close()
136 137 138 139
		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
140
			err = r.errs[0]
141 142
		}

143
	case <-r.cg.Closed():
144 145
		r.RLock()
		defer r.RUnlock()
146
		err = r.cg.Context().Err() // collect the error.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147
	}
148

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
149 150
	if r.result != nil && r.result.success {
		return r.result, nil
151 152
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
	return nil, err
154 155
}

func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
157
	// if new peer is ourselves...
158
	if next == r.query.dialer.LocalPeer() {
159 160 161
		return
	}

162 163
	if !r.peersSeen.TryAdd(next) {
		log.Debug("query peer was already seen")
164 165 166
		return
	}

167
	log.Debugf("adding peer to query: %v", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168

169 170 171 172
	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
173
	case <-ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
	}
175 176
}

func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
178
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179

180 181 182 183
		select {
		case <-r.peersRemaining.Done():
			return

184
		case <-r.cg.Closing():
185 186
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188 189 190
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
191 192 193 194 195 196 197
			log.Debugf("spawning worker for: %v", p)

			// do it as a child func to make sure Run exits
			// ONLY AFTER spawn workers has exited.
			parent.AddChildFunc(func(cg ctxgroup.ContextGroup) {
				r.queryPeer(cg, p)
			})
198 199 200
		}
	}
}

func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
203
	log.Debugf("spawned worker for: %v", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204

205 206 207
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
208
	case <-cg.Closing():
209 210 211 212
		r.peersRemaining.Decrement(1)
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213
	// ok let's do this!
214
	log.Debugf("running worker for: %v", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215 216 217 218

	// make sure we do this when we exit
	defer func() {
		// signal we're done proccessing peer p
219
		log.Debugf("completing worker for: %v", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220 221 222 223 224
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

	// make sure we're connected to the peer.
225 226 227 228 229 230 231 232 233 234 235 236
	if conns := r.query.dialer.ConnsToPeer(p); len(conns) == 0 {
		log.Infof("worker for: %v -- not connected. dial start", p)

		if err := r.query.dialer.DialPeer(cg.Context(), p); err != nil {
			log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
			r.Lock()
			r.errs = append(r.errs, err)
			r.Unlock()
			return
		}

		log.Infof("worker for: %v -- not connected. dial success!", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238

239
	// finally, run the query against this peer
240
	res, err := r.query.qfunc(cg.Context(), p)
241 242

	if err != nil {
243
		log.Debugf("ERROR worker for: %v %v", p, err)
244 245 246 247 248
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
249
		log.Debugf("SUCCESS worker for: %v", p, res)
250 251 252
		r.Lock()
		r.result = res
		r.Unlock()
253 254
		go r.cg.Close() // signal to everyone that we're done.
		// must be async, as we're one of the children, and Close blocks.
255

256 257
	} else if len(res.closerPeers) > 0 {
		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
258
		for _, next := range res.closerPeers {
259
			// add their addresses to the dialer's peerstore
260 261 262 263 264
			conns := r.query.dialer.ConnsToPeer(next.ID)
			if len(conns) == 0 {
				log.Infof("PEERS CLOSER -- worker for %v FOUND NEW PEER: %s %s", p, next.ID, next.Addrs)
			}

265
			r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs)
266
			r.addPeerToQuery(cg.Context(), next.ID)
267
			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
268
		}
269 270
	} else {
		log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
271 272
	}
}