query.go 7.05 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	queue "github.com/jbenet/go-ipfs/p2p/peer/queue"
8
	"github.com/jbenet/go-ipfs/routing"
9
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	u "github.com/jbenet/go-ipfs/util"
11
	pset "github.com/jbenet/go-ipfs/util/peerset"
12
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
15
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

18
// maxQueryConcurrency is the number of peers a query may work on in
// parallel; newQuery copies it into every dhtQuery it builds.
var (
	maxQueryConcurrency = AlphaValue
)
19

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22 23 24
	dht         *IpfsDHT
	key         u.Key     // the key we're querying for
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
25 26 27
}

type dhtQueryResult struct {
28 29 30 31
	value         []byte          // GetValue
	peer          peer.PeerInfo   // FindPeer
	providerPeers []peer.PeerInfo // GetProviders
	closerPeers   []peer.PeerInfo // *
32 33 34 35
	success       bool
}

// constructs query
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
func (dht *IpfsDHT) newQuery(k u.Key, f queryFunc) *dhtQuery {
37 38
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39
		dht:         dht,
40 41 42
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43 44 45 46 47 48 49
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
50
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
51 52

// Run runs the query at hand. pass in a list of peers to use first.
53
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
54 55 56 57 58
	runner := newQueryRunner(ctx, q)
	return runner.Run(peers)
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59 60 61 62
	query          *dhtQuery        // query to run
	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
63

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
	result *dhtQueryResult // query result
65
	errs   u.MultiErr      // result errors. maybe should be a map[peer.ID]error
66

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67 68
	rateLimit chan struct{} // processing semaphore
	log       eventlog.EventLogger
69

70
	cg ctxgroup.ContextGroup
71 72 73 74 75 76 77 78
	sync.RWMutex
}

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
	return &dhtQueryRunner{
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
79
		peersSeen:      pset.New(),
80
		rateLimit:      make(chan struct{}, q.concurrency),
81
		cg:             ctxgroup.WithContext(ctx),
82 83 84
	}
}

85
func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86 87 88 89 90
	log := log.Prefix("dht(%s).Query(%s).Run(%d)", r.query.dht.self, r.query.key, len(peers))
	r.log = log
	log.Debug("enter")
	defer log.Debug("end")

91
	log.Debugf("Run query with %d peers.", len(peers))
92 93 94 95
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
96

97 98 99
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100 101
	}

102 103
	// add all the peers we got first.
	for _, p := range peers {
104
		r.addPeerToQuery(r.cg.Context(), p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
	// go do this thing.
108 109
	// do it as a child func to make sure Run exits
	// ONLY AFTER spawn workers has exited.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
	log.Debugf("go spawn workers")
111
	r.cg.AddChildFunc(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112 113 114 115

	// so workers are working.

	// wait until they're done.
116
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118
	select {
119
	case <-r.peersRemaining.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120
		log.Debug("all peers ended")
121
		r.cg.Close()
122 123 124
		r.RLock()
		defer r.RUnlock()

125 126 127 128 129 130
		err = routing.ErrNotFound

		// if every query to every peer failed, something must be very wrong.
		if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
			log.Debugf("query errs: %s", r.errs)
			err = r.errs[0]
131 132
		}

133
	case <-r.cg.Closed():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134 135
		log.Debug("r.cg.Closed()")

136 137
		r.RLock()
		defer r.RUnlock()
138
		err = r.cg.Context().Err() // collect the error.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139
	}
140

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
141
	if r.result != nil && r.result.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142
		log.Debug("success: %s", r.result)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143
		return r.result, nil
144 145
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
	log.Debug("failure: %s", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
147
	return nil, err
148 149
}

150
func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
151
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
		r.log.Debug("addPeerToQuery skip self")
154 155 156
		return
	}

157
	if !r.peersSeen.TryAdd(next) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
		r.log.Debugf("addPeerToQuery skip seen %s", next)
159 160 161
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
	r.log.Debugf("addPeerToQuery adding %s", next)
163 164 165
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
166
	case <-ctx.Done():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
	}
168 169
}

170
func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171 172 173 174
	log := r.log.Prefix("spawnWorkers")
	log.Debugf("begin")
	defer log.Debugf("end")

175
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176

177 178 179 180
		select {
		case <-r.peersRemaining.Done():
			return

181
		case <-r.cg.Closing():
182 183
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
184 185 186 187
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
188 189 190 191 192 193 194
			log.Debugf("spawning worker for: %v", p)

			// do it as a child func to make sure Run exits
			// ONLY AFTER spawn workers has exited.
			parent.AddChildFunc(func(cg ctxgroup.ContextGroup) {
				r.queryPeer(cg, p)
			})
195 196 197
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198

199
func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
200 201 202
	log := r.log.Prefix("queryPeer(%s)", p)
	log.Debugf("spawned")
	defer log.Debugf("finished")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
203

204 205 206
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
207
	case <-cg.Closing():
208 209 210 211
		r.peersRemaining.Decrement(1)
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
	// ok let's do this!
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213
	log.Debugf("running")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214 215 216 217

	// make sure we do this when we exit
	defer func() {
		// signal we're done proccessing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218
		log.Debugf("completed")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219 220 221 222 223
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

	// make sure we're connected to the peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224
	if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
		log.Infof("not connected. dialing.")
226

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227 228
		pi := peer.PeerInfo{ID: p}
		if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
229
			log.Debugf("Error connecting: %s", err)
230 231 232 233 234 235
			r.Lock()
			r.errs = append(r.errs, err)
			r.Unlock()
			return
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236
		log.Debugf("connected. dial success.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238

239
	// finally, run the query against this peer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240
	log.Debugf("query running")
241
	res, err := r.query.qfunc(cg.Context(), p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
242
	log.Debugf("query finished")
243 244

	if err != nil {
245
		log.Debugf("ERROR worker for: %v %v", p, err)
246 247 248 249 250
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
251
		log.Debugf("SUCCESS worker for: %v %s", p, res)
252 253 254
		r.Lock()
		r.result = res
		r.Unlock()
255 256
		go r.cg.Close() // signal to everyone that we're done.
		// must be async, as we're one of the children, and Close blocks.
257

258 259
	} else if len(res.closerPeers) > 0 {
		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
260
		for _, next := range res.closerPeers {
261 262 263 264 265
			if next.ID == r.query.dht.self { // dont add self.
				log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
				continue
			}

266
			// add their addresses to the dialer's peerstore
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267
			r.query.dht.peerstore.AddPeerInfo(next)
268
			r.addPeerToQuery(cg.Context(), next.ID)
269
			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
270
		}
271 272
	} else {
		log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273 274
	}
}