routing.go 16.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
Adin Schmahmann's avatar
Adin Schmahmann committed
17
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
18 19
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
20
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21 22 23 24 25 26 27
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
28
// This is the top level "Store" operation of the DHT
29
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
30 31 32 33
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

34
	logger.Debugw("putting value", "key", LoggableRecordKeyString(key))
Jeromy's avatar
Jeromy committed
35

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

59
	rec := record.MakePutRecord(key, value)
60
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
61
	err = dht.putLocal(key, rec)
62 63 64 65
	if err != nil {
		return err
	}

66
	pchan, err := dht.GetClosestPeers(ctx, key)
67 68 69
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70

71 72 73 74
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
75 76
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
77
			defer wg.Done()
78 79
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
80 81 82
				ID:   p,
			})

83
			err := dht.protoMessenger.PutValue(ctx, p, rec)
84
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
85
				logger.Debugf("failed putting value to peer: %s", err)
86 87 88 89
			}
		}(p)
	}
	wg.Wait()
90

91
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92 93
}

Steven Allen's avatar
Steven Allen committed
94 95 96 97 98 99
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw record value as it was received.
	Val  []byte
	// From is the peer that supplied Val; for a record read from the local
	// store this is the DHT's own peer ID.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100
// GetValue searches for the value corresponding to given Key.
101
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
102 103 104 105
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

106
	// apply defaultQuorum if relevant
107
	var cfg routing.Options
108 109 110 111 112
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

113 114 115 116
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
117 118
	var best []byte

119 120
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
121 122
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
123 124 125 126
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
127 128 129
	if best == nil {
		return nil, routing.ErrNotFound
	}
130
	logger.Debugf("GetValue %v %x", LoggableRecordKeyString(key), best)
Łukasz Magiera's avatar
Łukasz Magiera committed
131 132 133
	return best, nil
}

Alan Shaw's avatar
Alan Shaw committed
134
// SearchValue searches for the value corresponding to given Key and streams the results.
135
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
136 137 138 139
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

140
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
141
	if err := cfg.Apply(opts...); err != nil {
142
		return nil, err
Steven Allen's avatar
Steven Allen committed
143 144 145 146
	}

	responsesNeeded := 0
	if !cfg.Offline {
147
		responsesNeeded = getQuorum(&cfg, defaultQuorum)
Steven Allen's avatar
Steven Allen committed
148 149
	}

150
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
151
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
152 153

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
154 155
	go func() {
		defer close(out)
156 157
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
158
			return
159
		}
160

161 162
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
163 164
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
165 166
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
167

Adin Schmahmann's avatar
Adin Schmahmann committed
168
			for _, p := range l.peers {
169 170
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
171 172
				}
			}
173 174 175
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
176

177 178
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
179

180 181
	return out, nil
}
182

183 184
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
185 186 187 188 189
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
190 191 192
				select {
				case out <- v.Val:
				case <-ctx.Done():
193
					return false
Steven Allen's avatar
Steven Allen committed
194
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
195
			}
196

197
			if nvals > 0 && numResponses > nvals {
198
				close(stopCh)
199 200 201 202
				return true
			}
			return false
		})
203 204
}

Steven Allen's avatar
Steven Allen committed
205
// GetValues gets nvals values corresponding to the given key.
206
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
207 208 209
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
210

211
	queryCtx, cancel := context.WithCancel(ctx)
212
	defer cancel()
213
	valCh, _ := dht.getValues(queryCtx, key, nil)
214 215 216 217

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
218 219 220
		if len(out) == nvals {
			cancel()
		}
221 222
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
223
	return out, ctx.Err()
224 225
}

226
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
227
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
228 229 230
loop:
	for {
		if aborted {
231
			return
232
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
233

234 235 236 237
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
238
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
239

240 241 242 243 244 245 246 247 248
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
249
					logger.Warnw("failed to select best value", "key", LoggableRecordKeyString(key), "error", err)
250 251 252 253 254
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
255
				}
256
			}
257 258 259 260 261 262 263 264
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
265

266 267
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
268

269 270 271 272 273 274 275 276 277
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
278
				}
279 280 281 282
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
283
			err := dht.protoMessenger.PutValue(ctx, p, fixupRec)
284 285
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
286
			}
287 288 289 290
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
291
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
292
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
293
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
294

295
	logger.Debugw("finding value", "key", LoggableRecordKeyString(key))
Steven Allen's avatar
Steven Allen committed
296

297 298 299 300 301 302 303
	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
304
		}
305
	}
306

Adin Schmahmann's avatar
Adin Schmahmann committed
307
	go func() {
308
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
309
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
310
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
311 312 313 314 315 316 317
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

318
				rec, peers, err := dht.protoMessenger.GetValue(ctx, p, key)
Adin Schmahmann's avatar
Adin Schmahmann committed
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
334

Adin Schmahmann's avatar
Adin Schmahmann committed
335 336 337 338 339 340 341
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
342

Adin Schmahmann's avatar
Adin Schmahmann committed
343
					select {
344 345 346
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
347 348
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
349

Adin Schmahmann's avatar
Adin Schmahmann committed
350 351 352 353 354 355 356 357 358
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
359
			func() bool {
360 361 362 363 364 365
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
366 367 368
			},
		)

369 370
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
371
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
372
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
373

374
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
375
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
376 377
		}
	}()
378

Adin Schmahmann's avatar
Adin Schmahmann committed
379
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
380 381
}

Adin Schmahmann's avatar
Adin Schmahmann committed
382 383
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
384 385 386 387 388
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

389 390 391
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392

393
// Provide makes this node announce that it can provide a value for the given key
394
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
395 396
	if !dht.enableProviders {
		return routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
397 398
	} else if !key.Defined() {
		return fmt.Errorf("invalid cid: undefined")
399
	}
400
	keyMH := key.Hash()
401
	logger.Debugw("providing", "cid", key, "mh", LoggableProviderRecordBytes(keyMH))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
402 403

	// add self locally
404
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
405 406 407
	if !brdcst {
		return nil
	}
408

409 410 411 412 413 414 415 416 417 418
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
419
			deadline = deadline.Add(-timeout / 10)
420 421 422 423 424 425 426 427 428 429
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

430
	var exceededDeadline bool
431
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
432 433 434 435 436 437 438 439 440 441 442
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
443
		return err
444 445
	}

Jeromy's avatar
Jeromy committed
446
	wg := sync.WaitGroup{}
447
	for p := range peers {
Jeromy's avatar
Jeromy committed
448 449 450
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
451
			logger.Debugf("putProvider(%s, %s)", LoggableProviderRecordBytes(keyMH), p)
452
			err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.host)
Jeromy's avatar
Jeromy committed
453
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
454
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
455 456
			}
		}(p)
457
	}
Jeromy's avatar
Jeromy committed
458
	wg.Wait()
459 460 461 462
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
463 464
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
465
// FindProviders searches until the context expires.
466
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
467 468
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
469 470
	} else if !c.Defined() {
		return nil, fmt.Errorf("invalid cid: undefined")
471
	}
Steven Allen's avatar
Steven Allen committed
472

473
	var providers []peer.AddrInfo
474
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
475 476 477 478 479
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
480 481
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
482 483 484
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
485
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
Steven Allen's avatar
Steven Allen committed
486
	if !dht.enableProviders || !key.Defined() {
487
		peerOut := make(chan peer.AddrInfo)
488 489
		close(peerOut)
		return peerOut
490 491
	}

492 493 494 495 496 497
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
498
	keyMH := key.Hash()
499

500
	logger.Debugw("finding providers", "cid", key, "mh", LoggableProviderRecordBytes(keyMH))
Adin Schmahmann's avatar
Adin Schmahmann committed
501
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
502 503 504
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
505
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
Jeromy's avatar
Jeromy committed
506 507
	defer close(peerOut)

508 509 510 511 512 513 514 515
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

516
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
517
	for _, p := range provs {
518
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
519
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
520
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
521
			select {
Jeromy's avatar
Jeromy committed
522
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
523 524 525
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
526
		}
Jeromy's avatar
Jeromy committed
527

528
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
529
		// TODO: is this a DOS vector?
530
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
531 532 533 534
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
535
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
536 537 538 539 540 541
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
542

543
			provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
Adin Schmahmann's avatar
Adin Schmahmann committed
544 545
			if err != nil {
				return nil, err
546
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
547

548
			logger.Debugf("%d provider entries", len(provs))
Adin Schmahmann's avatar
Adin Schmahmann committed
549 550 551

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
552
				dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
Adin Schmahmann's avatar
Adin Schmahmann committed
553 554 555 556 557 558 559 560 561 562
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
563
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
564 565
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
566
				}
567
			}
568

Adin Schmahmann's avatar
Adin Schmahmann committed
569
			// Give closer peers back to the query to be queried
570
			logger.Debugf("got closer peers: %d %s", len(closest), closest)
Adin Schmahmann's avatar
Adin Schmahmann committed
571 572 573 574

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
575
				Responses: closest,
Adin Schmahmann's avatar
Adin Schmahmann committed
576
			})
577

578
			return closest, nil
Adin Schmahmann's avatar
Adin Schmahmann committed
579
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
580
		func() bool {
581
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
582 583
		},
	)
Jeromy's avatar
Jeromy committed
584

585
	if err == nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
586
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
587
	}
588 589
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
590
// FindPeer searches for a peer with given ID.
591
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Steven Allen's avatar
Steven Allen committed
592 593 594 595
	if err := id.Validate(); err != nil {
		return peer.AddrInfo{}, err
	}

Steven Allen's avatar
Steven Allen committed
596
	logger.Debugw("finding peer", "peer", id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
597

598
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
599
	if pi := dht.FindLocal(id); pi.ID != "" {
600
		return pi, nil
601 602
	}

Aarsh Shah's avatar
Aarsh Shah committed
603
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
604 605 606 607 608 609
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
610

611
			peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, id)
Adin Schmahmann's avatar
Adin Schmahmann committed
612 613 614
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
615 616
			}

Adin Schmahmann's avatar
Adin Schmahmann committed
617 618 619 620 621 622
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
623

Adin Schmahmann's avatar
Adin Schmahmann committed
624 625
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
626
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
627 628 629 630
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

631 632 633 634
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
635 636 637 638 639 640
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
641
			dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting)
Adin Schmahmann's avatar
Adin Schmahmann committed
642 643
			break
		}
644 645
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
646 647 648 649
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
650 651
		return dht.peerstore.PeerInfo(id), nil
	}
652 653

	return peer.AddrInfo{}, routing.ErrNotFound
654
}