query.go 5.22 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7
	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
8
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9
	u "github.com/jbenet/go-ipfs/util"
10
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11 12 13 14

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

15
// maxQueryConcurrency is the concurrency value newQuery assigns to every
// dhtQuery it constructs. It is initialized from AlphaValue, which is declared
// elsewhere in this package.
var maxQueryConcurrency = AlphaValue
16

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
type dhtQuery struct {
18 19
	// the key we're querying for
	key u.Key
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21 22

	// the function to execute per peer
	qfunc queryFunc
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42

	// the concurrency parameter
	concurrency int
}

// dhtQueryResult is what a queryFunc returns for a single peer. The trailing
// comment on each field appears to name the kind of query that fills it in.
type dhtQueryResult struct {
	value         []byte       // GetValue
	peer          *peer.Peer   // FindPeer
	providerPeers []*peer.Peer // GetProviders
	closerPeers   []*peer.Peer // * (presumably: any query type -- confirm); the runner queues these as further peers to query
	success       bool         // when true, the runner stores this result and cancels the remaining work
}

// constructs query
func newQuery(k u.Key, f queryFunc) *dhtQuery {
	return &dhtQuery{
		key:         k,
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44 45 46 47 48 49
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)

// Run executes this query. Every call builds a fresh runner bound to ctx; the
// peers passed in are the ones the runner is seeded with.
func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
	return newQueryRunner(ctx, q).Run(peers)
}

type dhtQueryRunner struct {

	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
	peersSeen peer.Map
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

	// context
	ctx    context.Context
	cancel context.CancelFunc

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90 91
	ctx, cancel := context.WithCancel(ctx)

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
	return &dhtQueryRunner{
		ctx:            ctx,
		cancel:         cancel,
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
		peersSeen:      peer.Map{},
		rateLimit:      make(chan struct{}, q.concurrency),
	}
}

func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107 108
	}

109 110 111
	// add all the peers we got first.
	for _, p := range peers {
		r.addPeerToQuery(p, nil) // don't have access to self here...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112 113
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114 115 116 117 118 119
	// go do this thing.
	go r.spawnWorkers()

	// so workers are working.

	// wait until they're done.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121
	err := u.ErrNotFound

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122
	select {
123 124 125 126 127 128
	case <-r.peersRemaining.Done():
		r.cancel() // ran all and nothing. cancel all outstanding workers.
		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
			err = r.errs[0]
130 131 132 133 134
		}

	case <-r.ctx.Done():
		r.RLock()
		defer r.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135 136
		err = r.ctx.Err()
	}
137

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138 139
	if r.result != nil && r.result.success {
		return r.result, nil
140 141
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142
	return nil, err
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
}

func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
	if next == nil {
		// wtf why are peers nil?!?
		u.PErr("Query getting nil peers!!!\n")
		return
	}

	// if new peer further away than whom we got it from, bother (loops)
	if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
		return
	}

	// if already seen, no need.
	r.Lock()
	_, found := r.peersSeen[next.Key()]
	if found {
		r.Unlock()
		return
	}
	r.peersSeen[next.Key()] = next
	r.Unlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
	u.DOut("adding peer to query: %v\n", next.ID.Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168

169 170 171 172 173
	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
	case <-r.ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
	}
175 176
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
func (r *dhtQueryRunner) spawnWorkers() {
178
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179

180 181 182 183 184 185 186
		select {
		case <-r.peersRemaining.Done():
			return

		case <-r.ctx.Done():
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187 188 189 190
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
			u.DOut("spawning worker for: %v\n", p.ID.Pretty())
192 193 194 195
			go r.queryPeer(p)
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196

197
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198
	u.DOut("spawned worker for: %v\n", p.ID.Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199

200 201 202 203 204 205 206 207
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
	case <-r.ctx.Done():
		r.peersRemaining.Decrement(1)
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
	u.DOut("running worker for: %v\n", p.ID.Pretty())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209

210 211 212 213
	// finally, run the query against this peer
	res, err := r.query.qfunc(r.ctx, p)

	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
		u.DOut("ERROR worker for: %v %v\n", p.ID.Pretty(), err)
215 216 217 218 219
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
		u.DOut("SUCCESS worker for: %v\n", p.ID.Pretty(), res)
221 222 223 224 225 226
		r.Lock()
		r.result = res
		r.Unlock()
		r.cancel() // signal to everyone that we're done.

	} else if res.closerPeers != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227
		u.DOut("PEERS CLOSER -- worker for: %v\n", p.ID.Pretty())
228 229 230
		for _, next := range res.closerPeers {
			r.addPeerToQuery(next, p)
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231 232
	}

233
	// signal we're done proccessing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
	u.DOut("completing worker for: %v\n", p.ID.Pretty())
235 236
	r.peersRemaining.Decrement(1)
	r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237
}