routing.go 16.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
17
	"github.com/libp2p/go-libp2p-kad-dht/internal"
Adin Schmahmann's avatar
Adin Schmahmann committed
18
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
19 20
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
21
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23 24 25 26 27 28
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
29
// This is the top level "Store" operation of the DHT
30
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
31 32 33 34
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

35
	logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key))
Jeromy's avatar
Jeromy committed
36

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

60
	rec := record.MakePutRecord(key, value)
61
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
62
	err = dht.putLocal(key, rec)
63 64 65 66
	if err != nil {
		return err
	}

67
	pchan, err := dht.GetClosestPeers(ctx, key)
68 69 70
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71

72 73 74 75
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
76 77
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
78
			defer wg.Done()
79 80
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
81 82 83
				ID:   p,
			})

84
			err := dht.protoMessenger.PutValue(ctx, p, rec)
85
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
86
				logger.Debugf("failed putting value to peer: %s", err)
87 88 89 90
			}
		}(p)
	}
	wg.Wait()
91

92
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
}

Steven Allen's avatar
Steven Allen committed
95 96 97 98 99 100
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte  // raw bytes of the received value
	From peer.ID // ID of the peer that supplied Val
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
// GetValue searches for the value corresponding to given Key.
102
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
103 104 105 106
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

107
	// apply defaultQuorum if relevant
108
	var cfg routing.Options
109 110 111 112 113
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

114 115 116 117
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
118 119
	var best []byte

120 121
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
122 123
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
124 125 126 127
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
128 129 130
	if best == nil {
		return nil, routing.ErrNotFound
	}
131
	logger.Debugf("GetValue %v %x", internal.LoggableRecordKeyString(key), best)
Łukasz Magiera's avatar
Łukasz Magiera committed
132 133 134
	return best, nil
}

Alan Shaw's avatar
Alan Shaw committed
135
// SearchValue searches for the value corresponding to given Key and streams the results.
136
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
137 138 139 140
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

141
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
142
	if err := cfg.Apply(opts...); err != nil {
143
		return nil, err
Steven Allen's avatar
Steven Allen committed
144 145 146 147
	}

	responsesNeeded := 0
	if !cfg.Offline {
148
		responsesNeeded = getQuorum(&cfg, defaultQuorum)
Steven Allen's avatar
Steven Allen committed
149 150
	}

151
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
152
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
153 154

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
155 156
	go func() {
		defer close(out)
157 158
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
159
			return
160
		}
161

162 163
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
164 165
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
166 167
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
168

Adin Schmahmann's avatar
Adin Schmahmann committed
169
			for _, p := range l.peers {
170 171
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
172 173
				}
			}
174 175 176
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
177

178 179
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
180

181 182
	return out, nil
}
183

184 185
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
186 187 188 189 190
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
191 192 193
				select {
				case out <- v.Val:
				case <-ctx.Done():
194
					return false
Steven Allen's avatar
Steven Allen committed
195
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
196
			}
197

198
			if nvals > 0 && numResponses > nvals {
199
				close(stopCh)
200 201 202 203
				return true
			}
			return false
		})
204 205
}

Steven Allen's avatar
Steven Allen committed
206
// GetValues gets nvals values corresponding to the given key.
207
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
208 209 210
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
211

212
	queryCtx, cancel := context.WithCancel(ctx)
213
	defer cancel()
214
	valCh, _ := dht.getValues(queryCtx, key, nil)
215 216 217 218

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
219 220 221
		if len(out) == nvals {
			cancel()
		}
222 223
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
224
	return out, ctx.Err()
225 226
}

227
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
228
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
229 230 231
loop:
	for {
		if aborted {
232
			return
233
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
234

235 236 237 238
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
239
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
240

241 242 243 244 245 246 247 248 249
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
250
					logger.Warnw("failed to select best value", "key", internal.LoggableRecordKeyString(key), "error", err)
251 252 253 254 255
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
256
				}
257
			}
258 259 260 261 262 263 264 265
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
266

267 268
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
269

270 271 272 273 274 275 276 277 278
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
279
				}
280 281 282 283
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
284
			err := dht.protoMessenger.PutValue(ctx, p, fixupRec)
285 286
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
287
			}
288 289 290 291
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
292
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
293
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
294
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
295

296
	logger.Debugw("finding value", "key", internal.LoggableRecordKeyString(key))
Steven Allen's avatar
Steven Allen committed
297

298 299 300 301 302 303 304
	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
305
		}
306
	}
307

Adin Schmahmann's avatar
Adin Schmahmann committed
308
	go func() {
309
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
310
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
311
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
312 313 314 315 316 317 318
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

319
				rec, peers, err := dht.protoMessenger.GetValue(ctx, p, key)
Adin Schmahmann's avatar
Adin Schmahmann committed
320 321 322 323 324 325 326 327 328 329
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
330
				case nil, internal.ErrInvalidRecord:
Adin Schmahmann's avatar
Adin Schmahmann committed
331
					// in either of these cases, we want to keep going
Marten Seemann's avatar
Marten Seemann committed
332 333
				default:
					return nil, err
Adin Schmahmann's avatar
Adin Schmahmann committed
334
				}
Jeromy's avatar
Jeromy committed
335

Adin Schmahmann's avatar
Adin Schmahmann committed
336 337 338 339 340 341 342
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
343

Adin Schmahmann's avatar
Adin Schmahmann committed
344
					select {
345 346 347
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
348 349
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
350

Adin Schmahmann's avatar
Adin Schmahmann committed
351 352 353 354 355 356 357 358 359
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
360
			func() bool {
361 362 363 364 365 366
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
367 368 369
			},
		)

370 371
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
372
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
373
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
374

375
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
376
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
377 378
		}
	}()
379

Adin Schmahmann's avatar
Adin Schmahmann committed
380
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
381 382
}

Adin Schmahmann's avatar
Adin Schmahmann committed
383 384
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
385 386 387 388 389
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

390 391 392
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
393

394
// Provide makes this node announce that it can provide a value for the given key
395
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
396 397
	if !dht.enableProviders {
		return routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
398 399
	} else if !key.Defined() {
		return fmt.Errorf("invalid cid: undefined")
400
	}
401
	keyMH := key.Hash()
402
	logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
403 404

	// add self locally
405
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
406 407 408
	if !brdcst {
		return nil
	}
409

410 411 412 413 414 415 416 417 418 419
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
420
			deadline = deadline.Add(-timeout / 10)
421 422 423 424 425 426 427 428 429 430
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

431
	var exceededDeadline bool
432
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
433 434 435 436 437 438 439 440 441 442 443
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
444
		return err
445 446
	}

Jeromy's avatar
Jeromy committed
447
	wg := sync.WaitGroup{}
448
	for p := range peers {
Jeromy's avatar
Jeromy committed
449 450 451
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
452
			logger.Debugf("putProvider(%s, %s)", internal.LoggableProviderRecordBytes(keyMH), p)
453
			err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.host)
Jeromy's avatar
Jeromy committed
454
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
455
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
456 457
			}
		}(p)
458
	}
Jeromy's avatar
Jeromy committed
459
	wg.Wait()
460 461 462 463
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
464 465
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
466
// FindProviders searches until the context expires.
467
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
468 469
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
470 471
	} else if !c.Defined() {
		return nil, fmt.Errorf("invalid cid: undefined")
472
	}
Steven Allen's avatar
Steven Allen committed
473

474
	var providers []peer.AddrInfo
475
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
476 477 478 479 480
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
481 482
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
483 484 485
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
486
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
Steven Allen's avatar
Steven Allen committed
487
	if !dht.enableProviders || !key.Defined() {
488
		peerOut := make(chan peer.AddrInfo)
489 490
		close(peerOut)
		return peerOut
491 492
	}

493 494 495 496 497 498
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
499
	keyMH := key.Hash()
500

501
	logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
Adin Schmahmann's avatar
Adin Schmahmann committed
502
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
503 504 505
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
506
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
Jeromy's avatar
Jeromy committed
507 508
	defer close(peerOut)

509 510 511 512 513 514 515 516
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

517
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
518
	for _, p := range provs {
519
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
520
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
521
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
522
			select {
Jeromy's avatar
Jeromy committed
523
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
524 525 526
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
527
		}
Jeromy's avatar
Jeromy committed
528

529
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
530
		// TODO: is this a DOS vector?
531
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
532 533 534 535
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
536
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
537 538 539 540 541 542
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
543

544
			provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
Adin Schmahmann's avatar
Adin Schmahmann committed
545 546
			if err != nil {
				return nil, err
547
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
548

549
			logger.Debugf("%d provider entries", len(provs))
Adin Schmahmann's avatar
Adin Schmahmann committed
550 551 552

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
553
				dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
Adin Schmahmann's avatar
Adin Schmahmann committed
554 555 556 557 558 559 560 561 562 563
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
564
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
565 566
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
567
				}
568
			}
569

Adin Schmahmann's avatar
Adin Schmahmann committed
570
			// Give closer peers back to the query to be queried
571
			logger.Debugf("got closer peers: %d %s", len(closest), closest)
Adin Schmahmann's avatar
Adin Schmahmann committed
572 573 574 575

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
576
				Responses: closest,
Adin Schmahmann's avatar
Adin Schmahmann committed
577
			})
578

579
			return closest, nil
Adin Schmahmann's avatar
Adin Schmahmann committed
580
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
581
		func() bool {
582
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
583 584
		},
	)
Jeromy's avatar
Jeromy committed
585

586
	if err == nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
587
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
588
	}
589 590
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
591
// FindPeer searches for a peer with given ID.
592
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Steven Allen's avatar
Steven Allen committed
593 594 595 596
	if err := id.Validate(); err != nil {
		return peer.AddrInfo{}, err
	}

Steven Allen's avatar
Steven Allen committed
597
	logger.Debugw("finding peer", "peer", id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
598

599
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
600
	if pi := dht.FindLocal(id); pi.ID != "" {
601
		return pi, nil
602 603
	}

Aarsh Shah's avatar
Aarsh Shah committed
604
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
605 606 607 608 609 610
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
611

612
			peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, id)
Adin Schmahmann's avatar
Adin Schmahmann committed
613 614 615
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
616 617
			}

Adin Schmahmann's avatar
Adin Schmahmann committed
618 619 620 621 622 623
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
624

Adin Schmahmann's avatar
Adin Schmahmann committed
625 626
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
627
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
628 629 630 631
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

632 633 634 635
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
636 637 638 639 640 641
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
642
			dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting)
Adin Schmahmann's avatar
Adin Schmahmann committed
643 644
			break
		}
645 646
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
647 648 649 650
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
651 652
		return dht.peerstore.PeerInfo(id), nil
	}
653 654

	return peer.AddrInfo{}, routing.ErrNotFound
655
}