query.go 5.22 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7
	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
8
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	u "github.com/jbenet/go-ipfs/util"
10
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11 12 13 14

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

15 16
// maxQueryConcurrency is the concurrency value newQuery assigns to every
// dhtQuery it builds.
const maxQueryConcurrency = 5

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
type dhtQuery struct {
18 19
	// the key we're querying for
	key u.Key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22

	// the function to execute per peer
	qfunc queryFunc
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42

	// the concurrency parameter
	concurrency int
}

// dhtQueryResult is what a queryFunc reports back for one peer. The trailing
// comment on each field names the operation that field is presumably filled
// in by ("*" meaning any of them).
type dhtQueryResult struct {
	value         []byte       // GetValue
	peer          *peer.Peer   // FindPeer
	providerPeers []*peer.Peer // GetProviders
	closerPeers   []*peer.Peer // *
	// success marks a terminal answer: queryPeer stores such a result on the
	// runner and cancels the rest of the query.
	success       bool
}

// constructs query
func newQuery(k u.Key, f queryFunc) *dhtQuery {
	return &dhtQuery{
		key:         k,
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44 45 46 47 48 49
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)

// Run executes the query. peers is the initial set of peers to contact; the
// work itself is delegated to a fresh dhtQueryRunner bound to ctx.
func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
	return newQueryRunner(ctx, q).Run(peers)
}

type dhtQueryRunner struct {

	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
	peersSeen peer.Map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

	// context
	ctx    context.Context
	cancel context.CancelFunc

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90 91
	ctx, cancel := context.WithCancel(ctx)

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
	return &dhtQueryRunner{
		ctx:            ctx,
		cancel:         cancel,
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
		peersSeen:      peer.Map{},
		rateLimit:      make(chan struct{}, q.concurrency),
	}
}

func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108
	}

109 110 111
	// add all the peers we got first.
	for _, p := range peers {
		r.addPeerToQuery(p, nil) // don't have access to self here...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112 113
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114 115 116 117 118 119
	// go do this thing.
	go r.spawnWorkers()

	// so workers are working.

	// wait until they're done.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120
	select {
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
	case <-r.peersRemaining.Done():
		r.cancel() // ran all and nothing. cancel all outstanding workers.

		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
			return nil, r.errs[0]
		}
		return nil, u.ErrNotFound

	case <-r.ctx.Done():
		r.RLock()
		defer r.RUnlock()

		if r.result != nil && r.result.success {
			return r.result, nil
		}
		return nil, r.ctx.Err()
	}

}

func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
	if next == nil {
		// wtf why are peers nil?!?
		u.PErr("Query getting nil peers!!!\n")
		return
	}

	// if new peer further away than whom we got it from, bother (loops)
	if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
		return
	}

	// if already seen, no need.
	r.Lock()
	_, found := r.peersSeen[next.Key()]
	if found {
		r.Unlock()
		return
	}
	r.peersSeen[next.Key()] = next
	r.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166 167
	u.POut("adding peer to query: %v\n", next.ID.Pretty())

168 169 170 171 172
	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
	case <-r.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173
	}
174 175
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176
func (r *dhtQueryRunner) spawnWorkers() {
177
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178

179 180 181 182 183 184 185
		select {
		case <-r.peersRemaining.Done():
			return

		case <-r.ctx.Done():
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186 187 188 189 190
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
			u.POut("spawning worker for: %v\n", p.ID.Pretty())
191 192 193 194
			go r.queryPeer(p)
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195

196
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197 198
	u.POut("spawned worker for: %v\n", p.ID.Pretty())

199 200 201 202 203 204 205 206
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
	case <-r.ctx.Done():
		r.peersRemaining.Decrement(1)
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207 208
	u.POut("running worker for: %v\n", p.ID.Pretty())

209 210 211 212
	// finally, run the query against this peer
	res, err := r.query.qfunc(r.ctx, p)

	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213
		u.POut("ERROR worker for: %v %v\n", p.ID.Pretty(), err)
214 215 216 217 218
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
		u.POut("SUCCESS worker for: %v\n", p.ID.Pretty(), res)
220 221 222 223 224 225
		r.Lock()
		r.result = res
		r.Unlock()
		r.cancel() // signal to everyone that we're done.

	} else if res.closerPeers != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226
		u.POut("PEERS CLOSER -- worker for: %v\n", p.ID.Pretty())
227 228 229
		for _, next := range res.closerPeers {
			r.addPeerToQuery(next, p)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230 231
	}

232
	// signal we're done proccessing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233
	u.POut("completing worker for: %v\n", p.ID.Pretty())
234 235
	r.peersRemaining.Decrement(1)
	r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236
}