query.go 7.91 KB
Newer Older
1 2 3 4 5
// This file implements a query manager that drives concurrent workers to
// query the DHT. A query is set up with a target key, a queryFunc tasked
// with communicating with a peer, and a set of initial peers. As the query
// progresses, queryFunc can return closer peers, which are used to navigate
// closer to the target key in the DHT until an answer is reached.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6 7 8
package dht

import (
Jeromy's avatar
Jeromy committed
9
	"context"
10 11
	"sync"

12 13
	u "github.com/ipfs/go-ipfs-util"
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14
	todoctr "github.com/ipfs/go-todocounter"
15 16
	process "github.com/jbenet/goprocess"
	ctxproc "github.com/jbenet/goprocess/context"
Jeromy's avatar
Jeromy committed
17
	inet "github.com/libp2p/go-libp2p-net"
18 19 20 21
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
George Antoniadis's avatar
George Antoniadis committed
22 23
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26
// maxQueryConcurrency is the default number of peers a single query contacts
// in parallel; newQuery copies it into each dhtQuery's concurrency field.
var (
	maxQueryConcurrency = AlphaValue
)
27

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
	dht         *IpfsDHT
30
	key         string    // the key we're querying for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
33 34 35
}

type dhtQueryResult struct {
Jeromy's avatar
Jeromy committed
36 37 38 39
	value         []byte             // GetValue
	peer          *pstore.PeerInfo   // FindPeer
	providerPeers []pstore.PeerInfo  // GetProviders
	closerPeers   []*pstore.PeerInfo // *
40
	success       bool
Jeromy's avatar
Jeromy committed
41 42

	finalSet *pset.PeerSet
43 44 45
}

// constructs query
46
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
47 48
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49
		dht:         dht,
50 51 52
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53 54 55 56 57 58 59
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
60
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
61 62

// Run runs the query at hand. pass in a list of peers to use first.
63
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64 65 66 67 68 69
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

70 71 72
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

73 74
	runner := newQueryRunner(q)
	return runner.Run(ctx, peers)
75 76 77
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79 80 81
	query          *dhtQuery        // query to run
	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
82

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83
	result *dhtQueryResult // query result
84
	errs   u.MultiErr      // result errors. maybe should be a map[peer.ID]error
85

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
	rateLimit chan struct{} // processing semaphore
Jeromy's avatar
Jeromy committed
87
	log       logging.EventLogger
88

89 90
	runCtx context.Context

91
	proc process.Process
92 93 94
	sync.RWMutex
}

95 96
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
	proc := process.WithParent(process.Background())
97
	ctx := ctxproc.OnClosingContext(proc)
98 99
	return &dhtQueryRunner{
		query:          q,
100
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key))),
101
		peersRemaining: todoctr.NewSyncCounter(),
102
		peersSeen:      pset.New(),
103
		rateLimit:      make(chan struct{}, q.concurrency),
104
		proc:           proc,
105 106 107
	}
}

108
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109
	r.log = log
110
	r.runCtx = ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111

112 113 114 115
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
116

117 118 119
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120 121
	}

122 123
	// add all the peers we got first.
	for _, p := range peers {
124
		r.addPeerToQuery(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125 126
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127
	// go do this thing.
128
	// do it as a child proc to make sure Run exits
129
	// ONLY AFTER spawn workers has exited.
130
	r.proc.Go(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
131 132 133 134

	// so workers are working.

	// wait until they're done.
135
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136

137 138 139
	// now, if the context finishes, close the proc.
	// we have to do it here because the logic before is setup, which
	// should run without closing the proc.
rht's avatar
rht committed
140
	ctxproc.CloseAfterContext(r.proc, ctx)
141

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142
	select {
143
	case <-r.peersRemaining.Done():
144
		r.proc.Close()
145 146 147
		r.RLock()
		defer r.RUnlock()

148 149 150 151 152 153
		err = routing.ErrNotFound

		// if every query to every peer failed, something must be very wrong.
		if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
			log.Debugf("query errs: %s", r.errs)
			err = r.errs[0]
154 155
		}

156
	case <-r.proc.Closed():
157 158
		r.RLock()
		defer r.RUnlock()
159
		err = context.DeadlineExceeded
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
	}
161

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162 163
	if r.result != nil && r.result.success {
		return r.result, nil
164 165
	}

Jeromy's avatar
Jeromy committed
166 167 168
	return &dhtQueryResult{
		finalSet: r.peersSeen,
	}, err
169 170
}

171
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
172
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
		r.log.Debug("addPeerToQuery skip self")
175 176 177
		return
	}

178
	if !r.peersSeen.TryAdd(next) {
179 180 181
		return
	}

182 183 184 185 186
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.AddingPeer,
		ID:   next,
	})

187 188 189
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
190
	case <-r.proc.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
191
	}
192 193
}

194
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
195
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196

197 198 199 200
		select {
		case <-r.peersRemaining.Done():
			return

201
		case <-r.proc.Closing():
202 203
			return

Jeromy's avatar
Jeromy committed
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
		case <-r.rateLimit:
			select {
			case p, more := <-r.peersToQuery.DeqChan:
				if !more {
					return // channel closed.
				}

				// do it as a child func to make sure Run exits
				// ONLY AFTER spawn workers has exited.
				proc.Go(func(proc process.Process) {
					r.queryPeer(proc, p)
				})
			case <-r.proc.Closing():
				return
			case <-r.peersRemaining.Done():
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
220
			}
221 222 223
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
224

225
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226 227
	// ok let's do this!

228
	// create a context from our proc.
229
	ctx := ctxproc.OnClosingContext(proc)
230

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231 232
	// make sure we do this when we exit
	defer func() {
233
		// signal we're done processing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234 235 236 237 238
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

	// make sure we're connected to the peer.
239
	// FIXME abstract away into the network layer
Jeromy's avatar
Jeromy committed
240 241 242
	// Note: Failure to connect in this block will cause the function to
	// short circuit.
	if r.query.dht.host.Network().Connectedness(p) == inet.NotConnected {
Jeromy's avatar
Jeromy committed
243
		log.Debug("not connected. dialing.")
244 245 246 247 248

		notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
			Type: notif.DialingPeer,
			ID:   p,
		})
249 250 251
		// while we dial, we do not take up a rate limit. this is to allow
		// forward progress during potentially very high latency dials.
		r.rateLimit <- struct{}{}
252

Jeromy's avatar
Jeromy committed
253
		pi := pstore.PeerInfo{ID: p}
254 255

		if err := r.query.dht.host.Connect(ctx, pi); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
256
			log.Debugf("Error connecting: %s", err)
257

258
			notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
259 260
				Type:  notif.QueryError,
				Extra: err.Error(),
261
				ID:    p,
262 263
			})

264 265 266
			r.Lock()
			r.errs = append(r.errs, err)
			r.Unlock()
267
			<-r.rateLimit // need to grab it again, as we deferred.
268 269
			return
		}
270
		<-r.rateLimit // need to grab it again, as we deferred.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
271
		log.Debugf("connected. dial success.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273

274
	// finally, run the query against this peer
275
	res, err := r.query.qfunc(ctx, p)
276 277

	if err != nil {
278
		log.Debugf("ERROR worker for: %v %v", p, err)
279 280 281 282 283
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
284
		log.Debugf("SUCCESS worker for: %v %s", p, res)
285 286 287
		r.Lock()
		r.result = res
		r.Unlock()
288
		go r.proc.Close() // signal to everyone that we're done.
289
		// must be async, as we're one of the children, and Close blocks.
290

291 292
	} else if len(res.closerPeers) > 0 {
		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
293
		for _, next := range res.closerPeers {
294
			if next.ID == r.query.dht.self { // don't add self.
295 296 297 298
				log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
				continue
			}

299
			// add their addresses to the dialer's peerstore
Jeromy's avatar
Jeromy committed
300
			r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
301
			r.addPeerToQuery(next.ID)
302
			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
303
		}
304 305
	} else {
		log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306 307
	}
}