query.go 5.83 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	inet "github.com/jbenet/go-ipfs/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7 8
	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
9
	"github.com/jbenet/go-ipfs/routing"
10
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
	u "github.com/jbenet/go-ipfs/util"
12
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14 15 16

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

17
var maxQueryConcurrency = AlphaValue
18

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19
type dhtQuery struct {
20 21
	// the key we're querying for
	key u.Key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
	// dialer used to ensure we're connected to peers
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
	dialer inet.Dialer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27
	// the function to execute per peer
	qfunc queryFunc
28 29 30 31 32 33

	// the concurrency parameter
	concurrency int
}

type dhtQueryResult struct {
34 35 36 37
	value         []byte      // GetValue
	peer          peer.Peer   // FindPeer
	providerPeers []peer.Peer // GetProviders
	closerPeers   []peer.Peer // *
38 39 40 41
	success       bool
}

// constructs query
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42
func newQuery(k u.Key, d inet.Dialer, f queryFunc) *dhtQuery {
43 44
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
		dialer:      d,
46 47 48
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51 52 53 54 55
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
56
type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error)
57 58

// Run runs the query at hand. pass in a list of peers to use first.
59
func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) {
60 61 62 63 64 65 66 67 68 69 70 71 72 73
	runner := newQueryRunner(ctx, q)
	return runner.Run(peers)
}

type dhtQueryRunner struct {

	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
	peersSeen peer.Map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

	// context
	ctx    context.Context
	cancel context.CancelFunc

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
96 97
	ctx, cancel := context.WithCancel(ctx)

98 99 100 101 102 103 104 105 106 107 108
	return &dhtQueryRunner{
		ctx:            ctx,
		cancel:         cancel,
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
		peersSeen:      peer.Map{},
		rateLimit:      make(chan struct{}, q.concurrency),
	}
}

109
func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
110
	log.Debugf("Run query with %d peers.", len(peers))
111 112 113 114
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
115

116 117 118
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120
	}

121 122 123
	// add all the peers we got first.
	for _, p := range peers {
		r.addPeerToQuery(p, nil) // don't have access to self here...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
124 125
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127 128 129 130 131
	// go do this thing.
	go r.spawnWorkers()

	// so workers are working.

	// wait until they're done.
132
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134
	select {
135 136 137 138 139 140
	case <-r.peersRemaining.Done():
		r.cancel() // ran all and nothing. cancel all outstanding workers.
		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
			err = r.errs[0]
142 143 144 145 146
		}

	case <-r.ctx.Done():
		r.RLock()
		defer r.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147 148
		err = r.ctx.Err()
	}
149

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150 151
	if r.result != nil && r.result.success {
		return r.result, nil
152 153
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154
	return nil, err
155 156
}

157
func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) {
158 159
	if next == nil {
		// wtf why are peers nil?!?
160
		log.Error("Query getting nil peers!!!\n")
161 162 163 164
		return
	}

	// if new peer further away than whom we got it from, bother (loops)
165
	if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) {
166 167 168 169 170 171 172 173 174 175 176 177 178
		return
	}

	// if already seen, no need.
	r.Lock()
	_, found := r.peersSeen[next.Key()]
	if found {
		r.Unlock()
		return
	}
	r.peersSeen[next.Key()] = next
	r.Unlock()

179
	log.Debugf("adding peer to query: %v\n", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180

181 182 183 184 185
	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
	case <-r.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
	}
187 188
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
func (r *dhtQueryRunner) spawnWorkers() {
190
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191

192 193 194 195 196 197 198
		select {
		case <-r.peersRemaining.Done():
			return

		case <-r.ctx.Done():
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199 200 201 202
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
203
			log.Debugf("spawning worker for: %v\n", p)
204 205 206 207
			go r.queryPeer(p)
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208

209
func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
210
	log.Debugf("spawned worker for: %v\n", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
211

212 213 214 215 216 217 218 219
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
	case <-r.ctx.Done():
		r.peersRemaining.Decrement(1)
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
	// ok let's do this!
221
	log.Debugf("running worker for: %v", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222 223 224 225

	// make sure we do this when we exit
	defer func() {
		// signal we're done proccessing peer p
226
		log.Debugf("completing worker for: %v", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228 229 230 231
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

	// make sure we're connected to the peer.
Juan Batiz-Benet's avatar
notes  
Juan Batiz-Benet committed
232
	// (Incidentally, this will add it to the peerstore too)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233 234
	err := r.query.dialer.DialPeer(p)
	if err != nil {
235
		log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236 237 238 239 240
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()
		return
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
241

242 243 244 245
	// finally, run the query against this peer
	res, err := r.query.qfunc(r.ctx, p)

	if err != nil {
246
		log.Debugf("ERROR worker for: %v %v", p, err)
247 248 249 250 251
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
252
		log.Debugf("SUCCESS worker for: %v", p, res)
253 254 255 256 257 258
		r.Lock()
		r.result = res
		r.Unlock()
		r.cancel() // signal to everyone that we're done.

	} else if res.closerPeers != nil {
259
		log.Debugf("PEERS CLOSER -- worker for: %v\n", p)
260 261 262
		for _, next := range res.closerPeers {
			r.addPeerToQuery(next, p)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263 264
	}
}