query.go 5.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7
	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
8
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	u "github.com/jbenet/go-ipfs/util"
10
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11 12 13 14

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

15
// maxQueryConcurrency is the concurrency value that newQuery assigns to every
// query it constructs. It is initialised from AlphaValue.
var maxQueryConcurrency = AlphaValue
16

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17 18 19 20 21
// dhtDialer is the connection-establishing dependency of a dhtQuery. The
// query runner calls DialPeer on each peer before running the query function
// against it.
type dhtDialer interface {
	// DialPeer attempts to establish a connection to a given peer
	DialPeer(peer.Peer) error
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22
type dhtQuery struct {
23 24
	// the key we're querying for
	key u.Key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27 28
	// dialer used to ensure we're connected to peers
	dialer dhtDialer

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29 30
	// the function to execute per peer
	qfunc queryFunc
31 32 33 34 35 36

	// the concurrency parameter
	concurrency int
}

type dhtQueryResult struct {
37 38 39 40
	value         []byte      // GetValue
	peer          peer.Peer   // FindPeer
	providerPeers []peer.Peer // GetProviders
	closerPeers   []peer.Peer // *
41 42 43 44
	success       bool
}

// constructs query
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
func newQuery(k u.Key, d dhtDialer, f queryFunc) *dhtQuery {
46 47
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48
		dialer:      d,
49 50 51
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53 54 55 56 57 58
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
59
type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error)
60 61

// Run runs the query at hand. pass in a list of peers to use first.
62
func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) {
63 64 65 66 67 68 69 70 71 72 73 74 75 76
	runner := newQueryRunner(ctx, q)
	return runner.Run(peers)
}

type dhtQueryRunner struct {

	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
	peersSeen peer.Map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77

78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

	// context
	ctx    context.Context
	cancel context.CancelFunc

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99 100
	ctx, cancel := context.WithCancel(ctx)

101 102 103 104 105 106 107 108 109 110 111
	return &dhtQueryRunner{
		ctx:            ctx,
		cancel:         cancel,
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
		peersSeen:      peer.Map{},
		rateLimit:      make(chan struct{}, q.concurrency),
	}
}

112
func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
113 114 115 116 117
	log.Debug("Run query with %d peers.", len(peers))
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
118

119 120 121
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122 123
	}

124 125 126
	// add all the peers we got first.
	for _, p := range peers {
		r.addPeerToQuery(p, nil) // don't have access to self here...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131 132 133 134
	// go do this thing.
	go r.spawnWorkers()

	// so workers are working.

	// wait until they're done.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136
	err := u.ErrNotFound

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137
	select {
138 139 140 141 142 143
	case <-r.peersRemaining.Done():
		r.cancel() // ran all and nothing. cancel all outstanding workers.
		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
			err = r.errs[0]
145 146 147 148 149
		}

	case <-r.ctx.Done():
		r.RLock()
		defer r.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150 151
		err = r.ctx.Err()
	}
152

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153 154
	if r.result != nil && r.result.success {
		return r.result, nil
155 156
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	return nil, err
158 159
}

160
func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) {
161 162
	if next == nil {
		// wtf why are peers nil?!?
163
		log.Error("Query getting nil peers!!!\n")
164 165 166 167
		return
	}

	// if new peer further away than whom we got it from, bother (loops)
168
	if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) {
169 170 171 172 173 174 175 176 177 178 179 180 181
		return
	}

	// if already seen, no need.
	r.Lock()
	_, found := r.peersSeen[next.Key()]
	if found {
		r.Unlock()
		return
	}
	r.peersSeen[next.Key()] = next
	r.Unlock()

182
	log.Debug("adding peer to query: %v\n", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183

184 185 186 187 188
	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
	case <-r.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
189
	}
190 191
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
192
func (r *dhtQueryRunner) spawnWorkers() {
193
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
194

195 196 197 198 199 200 201
		select {
		case <-r.peersRemaining.Done():
			return

		case <-r.ctx.Done():
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202 203 204 205
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
206
			log.Debug("spawning worker for: %v\n", p)
207 208 209 210
			go r.queryPeer(p)
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
211

212
func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
213
	log.Debug("spawned worker for: %v\n", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214

215 216 217 218 219 220 221 222
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
	case <-r.ctx.Done():
		r.peersRemaining.Decrement(1)
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
	// ok let's do this!
	log.Debug("running worker for: %v", p)

	// make sure we do this when we exit
	defer func() {
		// signal we're done proccessing peer p
		log.Debug("completing worker for: %v", p)
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

	// make sure we're connected to the peer.
	err := r.query.dialer.DialPeer(p)
	if err != nil {
		log.Debug("ERROR worker for: %v -- err connecting: %v", p, err)
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()
		return
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243

244 245 246 247
	// finally, run the query against this peer
	res, err := r.query.qfunc(r.ctx, p)

	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
248
		log.Debug("ERROR worker for: %v %v", p, err)
249 250 251 252 253
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
254
		log.Debug("SUCCESS worker for: %v", p, res)
255 256 257 258 259 260
		r.Lock()
		r.result = res
		r.Unlock()
		r.cancel() // signal to everyone that we're done.

	} else if res.closerPeers != nil {
261
		log.Debug("PEERS CLOSER -- worker for: %v\n", p)
262 263 264
		for _, next := range res.closerPeers {
			r.addPeerToQuery(next, p)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
265 266
	}
}