query.go 7.85 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
6 7
	"sync"

8 9 10
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

11
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
12
	todoctr "github.com/ipfs/go-todocounter"
13 14
	process "github.com/jbenet/goprocess"
	ctxproc "github.com/jbenet/goprocess/context"
15
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18

	pstore "github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"
19
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
George Antoniadis's avatar
George Antoniadis committed
20
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22
)

var (
	// ErrNoPeersQueried is returned by a query run that never got to query a
	// single peer, for example because every dial to the candidates failed.
	ErrNoPeersQueried = errors.New("failed to query any peers")
)

26
var maxQueryConcurrency = AlphaValue
27

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
	dht         *IpfsDHT
30
	key         string    // the key we're querying for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
33 34 35
}

type dhtQueryResult struct {
36 37
	peer        *peer.AddrInfo   // FindPeer
	closerPeers []*peer.AddrInfo // *
38
	success     bool
Jeromy's avatar
Jeromy committed
39

40 41
	finalSet   *peer.Set
	queriedSet *peer.Set
42 43 44
}

// constructs query
45
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
46 47
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48
		dht:         dht,
49 50 51
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52 53 54 55 56 57 58
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
59
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
60 61

// Run runs the query at hand. pass in a list of peers to use first.
62
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
63 64 65 66 67
	if len(peers) == 0 {
		logger.Warning("Running query with no peers!")
		return nil, kb.ErrLookupFailure
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68 69 70 71 72 73
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

74 75 76
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

77 78
	runner := newQueryRunner(q)
	return runner.Run(ctx, peers)
79 80 81
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82
	query          *dhtQuery        // query to run
83 84
	peersSeen      *peer.Set        // all peers queried. prevent querying same peer 2x
	peersQueried   *peer.Set        // peers successfully connected to and queried
85
	peersDialed    *dialQueue       // peers we have dialed to
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86 87
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
88

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
89
	result *dhtQueryResult // query result
90

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
91
	rateLimit chan struct{} // processing semaphore
Jeromy's avatar
Jeromy committed
92
	log       logging.EventLogger
93

94 95
	runCtx context.Context

96
	proc process.Process
97 98 99
	sync.RWMutex
}

100 101
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
	proc := process.WithParent(process.Background())
102
	ctx := ctxproc.OnClosingContext(proc)
103 104
	peersToQuery := queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key)))
	r := &dhtQueryRunner{
105 106
		query:          q,
		peersRemaining: todoctr.NewSyncCounter(),
107 108
		peersSeen:      peer.NewSet(),
		peersQueried:   peer.NewSet(),
109
		rateLimit:      make(chan struct{}, q.concurrency),
110
		peersToQuery:   peersToQuery,
111
		proc:           proc,
112
	}
113 114 115 116 117 118 119 120 121 122 123
	dq, err := newDialQueue(&dqParams{
		ctx:    ctx,
		target: q.key,
		in:     peersToQuery,
		dialFn: r.dialPeer,
		config: dqDefaultConfig(),
	})
	if err != nil {
		panic(err)
	}
	r.peersDialed = dq
124
	return r
125 126
}

127
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Matt Joiner's avatar
Matt Joiner committed
128
	r.log = logger
129
	r.runCtx = ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130

131 132 133
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134 135
	}

136 137
	// add all the peers we got first.
	for _, p := range peers {
138
		r.addPeerToQuery(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139 140
	}

141 142 143 144 145
	// start the dial queue only after we've added the initial set of peers.
	// this is to avoid race conditions that could cause the peersRemaining todoctr
	// to be done too early if the initial dial fails before others make it into the queue.
	r.peersDialed.Start()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
146
	// go do this thing.
147
	// do it as a child proc to make sure Run exits
148
	// ONLY AFTER spawn workers has exited.
149
	r.proc.Go(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150 151

	// wait until they're done.
152
	var err error
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153

154 155 156
	// now, if the context finishes, close the proc.
	// we have to do it here because the logic before is setup, which
	// should run without closing the proc.
rht's avatar
rht committed
157
	ctxproc.CloseAfterContext(r.proc, ctx)
158

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	select {
160
	case <-r.peersRemaining.Done():
161
		r.proc.Close()
Steven Allen's avatar
Steven Allen committed
162 163 164 165
		if r.peersQueried.Size() == 0 {
			err = ErrNoPeersQueried
		} else {
			err = routing.ErrNotFound
166 167
		}

168
	case <-r.proc.Closed():
169
		err = r.runCtx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
170
	}
171

Steven Allen's avatar
Steven Allen committed
172 173 174
	r.RLock()
	defer r.RUnlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
175 176
	if r.result != nil && r.result.success {
		return r.result, nil
177 178
	}

Jeromy's avatar
Jeromy committed
179
	return &dhtQueryResult{
180 181
		finalSet:   r.peersSeen,
		queriedSet: r.peersQueried,
Jeromy's avatar
Jeromy committed
182
	}, err
183 184
}

185
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
186
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188
		r.log.Debug("addPeerToQuery skip self")
189 190 191
		return
	}

192
	if !r.peersSeen.TryAdd(next) {
193 194 195
		return
	}

196 197 198 199 200
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.AddingPeer,
		ID:   next,
	})

201 202 203
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
204
	case <-r.proc.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205
	}
206 207
}

208
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
209 210 211 212 213
	for {
		select {
		case <-r.peersRemaining.Done():
			return

214
		case <-r.proc.Closing():
215 216
			return

Jeromy's avatar
Jeromy committed
217
		case <-r.rateLimit:
218
			ch := r.peersDialed.Consume()
Jeromy's avatar
Jeromy committed
219
			select {
220 221 222
			case p, ok := <-ch:
				if !ok {
					// this signals context cancellation.
223 224
					return
				}
Jeromy's avatar
Jeromy committed
225 226 227 228 229 230 231 232 233
				// do it as a child func to make sure Run exits
				// ONLY AFTER spawn workers has exited.
				proc.Go(func(proc process.Process) {
					r.queryPeer(proc, p)
				})
			case <-r.proc.Closing():
				return
			case <-r.peersRemaining.Done():
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
234
			}
235 236 237
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
238

239 240
func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
	// short-circuit if we're already connected.
241
	if r.query.dht.host.Network().Connectedness(p) == network.Connected {
242 243 244
		return nil
	}

Matt Joiner's avatar
Matt Joiner committed
245
	logger.Debug("not connected. dialing.")
246 247 248 249 250
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.DialingPeer,
		ID:   p,
	})

251
	pi := peer.AddrInfo{ID: p}
252
	if err := r.query.dht.host.Connect(ctx, pi); err != nil {
Matt Joiner's avatar
Matt Joiner committed
253
		logger.Debugf("error connecting: %s", err)
254 255 256 257 258 259
		notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
			ID:    p,
		})

260 261
		// This peer is dropping out of the race.
		r.peersRemaining.Decrement(1)
262 263
		return err
	}
Matt Joiner's avatar
Matt Joiner committed
264
	logger.Debugf("connected. dial success.")
265 266 267
	return nil
}

268
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269 270
	// ok let's do this!

271
	// create a context from our proc.
272
	ctx := ctxproc.OnClosingContext(proc)
273

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
274 275
	// make sure we do this when we exit
	defer func() {
276
		// signal we're done processing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277 278 279 280
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

281
	// finally, run the query against this peer
282
	res, err := r.query.qfunc(ctx, p)
283

284 285
	r.peersQueried.Add(p)

286
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
287
		logger.Debugf("ERROR worker for: %v %v", p, err)
288
	} else if res.success {
Matt Joiner's avatar
Matt Joiner committed
289
		logger.Debugf("SUCCESS worker for: %v %s", p, res)
290 291 292
		r.Lock()
		r.result = res
		r.Unlock()
293 294 295
		if res.peer != nil {
			r.query.dht.peerstore.AddAddrs(res.peer.ID, res.peer.Addrs, pstore.TempAddrTTL)
		}
296
		go r.proc.Close() // signal to everyone that we're done.
297
		// must be async, as we're one of the children, and Close blocks.
298

299
	} else if len(res.closerPeers) > 0 {
Matt Joiner's avatar
Matt Joiner committed
300
		logger.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
301
		for _, next := range res.closerPeers {
302
			if next.ID == r.query.dht.self { // don't add self.
Matt Joiner's avatar
Matt Joiner committed
303
				logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
304 305 306
				continue
			}

307
			// add their addresses to the dialer's peerstore
Jeromy's avatar
Jeromy committed
308
			r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
309
			r.addPeerToQuery(next.ID)
Matt Joiner's avatar
Matt Joiner committed
310
			logger.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
311
		}
312
	} else {
Matt Joiner's avatar
Matt Joiner committed
313
		logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
314 315
	}
}