query.go 5.29 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7
	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
8
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	u "github.com/jbenet/go-ipfs/util"
10
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11 12 13 14

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

15
// maxQueryConcurrency is the concurrency value newQuery assigns to every
// query it constructs. It is initialised from AlphaValue, which is declared
// outside this file.
var maxQueryConcurrency = AlphaValue
16

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
type dhtQuery struct {
18 19
	// the key we're querying for
	key u.Key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22

	// the function to execute per peer
	qfunc queryFunc
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42

	// the concurrency parameter
	concurrency int
}

// dhtQueryResult is what a queryFunc hands back for one peer. The trailing
// comment on each field names the operation that fills it in; closerPeers is
// used by all of them.
type dhtQueryResult struct {
	value         []byte       // GetValue
	peer          *peer.Peer   // FindPeer
	providerPeers []*peer.Peer // GetProviders
	closerPeers   []*peer.Peer // *
	// success marks this result as the final answer: the runner stores it and
	// cancels the remaining workers (see queryPeer).
	success       bool
}

// constructs query
func newQuery(k u.Key, f queryFunc) *dhtQuery {
	return &dhtQuery{
		key:         k,
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44 45 46 47 48 49
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)

// Run executes the query, seeding it with peers as the first ones to contact.
// A fresh dhtQueryRunner is created for every call, so the bookkeeping of one
// run is never shared with another.
func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
	return newQueryRunner(ctx, q).Run(peers)
}

type dhtQueryRunner struct {

	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
	peersSeen peer.Map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

	// context
	ctx    context.Context
	cancel context.CancelFunc

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90 91
	ctx, cancel := context.WithCancel(ctx)

92 93 94 95 96 97 98 99 100 101 102 103
	return &dhtQueryRunner{
		ctx:            ctx,
		cancel:         cancel,
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
		peersSeen:      peer.Map{},
		rateLimit:      make(chan struct{}, q.concurrency),
	}
}

func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
104 105 106 107 108
	log.Debug("Run query with %d peers.", len(peers))
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
109

110 111 112
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113 114
	}

115 116 117
	// add all the peers we got first.
	for _, p := range peers {
		r.addPeerToQuery(p, nil) // don't have access to self here...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118 119
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121 122 123 124 125
	// go do this thing.
	go r.spawnWorkers()

	// so workers are working.

	// wait until they're done.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126 127
	err := u.ErrNotFound

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
128
	select {
129 130 131 132 133 134
	case <-r.peersRemaining.Done():
		r.cancel() // ran all and nothing. cancel all outstanding workers.
		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
			err = r.errs[0]
136 137 138 139 140
		}

	case <-r.ctx.Done():
		r.RLock()
		defer r.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141 142
		err = r.ctx.Err()
	}
143

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144 145
	if r.result != nil && r.result.success {
		return r.result, nil
146 147
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
	return nil, err
149 150 151 152 153
}

func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
	if next == nil {
		// wtf why are peers nil?!?
154
		log.Error("Query getting nil peers!!!\n")
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
		return
	}

	// if new peer further away than whom we got it from, bother (loops)
	if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
		return
	}

	// if already seen, no need.
	r.Lock()
	_, found := r.peersSeen[next.Key()]
	if found {
		r.Unlock()
		return
	}
	r.peersSeen[next.Key()] = next
	r.Unlock()

173
	log.Debug("adding peer to query: %v\n", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174

175 176 177 178 179
	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
	case <-r.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180
	}
181 182
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183
func (r *dhtQueryRunner) spawnWorkers() {
184
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185

186 187 188 189 190 191 192
		select {
		case <-r.peersRemaining.Done():
			return

		case <-r.ctx.Done():
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
193 194 195 196
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
197
			log.Debug("spawning worker for: %v\n", p)
198 199 200 201
			go r.queryPeer(p)
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202

203
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
204
	log.Debug("spawned worker for: %v\n", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205

206 207 208 209 210 211 212 213
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
	case <-r.ctx.Done():
		r.peersRemaining.Decrement(1)
		return
	}

214
	log.Debug("running worker for: %v\n", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215

216 217 218 219
	// finally, run the query against this peer
	res, err := r.query.qfunc(r.ctx, p)

	if err != nil {
220
		log.Debug("ERROR worker for: %v %v\n", p, err)
221 222 223 224 225
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
226
		log.Debug("SUCCESS worker for: %v\n", p, res)
227 228 229 230 231 232
		r.Lock()
		r.result = res
		r.Unlock()
		r.cancel() // signal to everyone that we're done.

	} else if res.closerPeers != nil {
233
		log.Debug("PEERS CLOSER -- worker for: %v\n", p)
234 235 236
		for _, next := range res.closerPeers {
			r.addPeerToQuery(next, p)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237 238
	}

239
	// signal we're done proccessing peer p
240
	log.Debug("completing worker for: %v\n", p)
241 242
	r.peersRemaining.Decrement(1)
	r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
243
}