routing.go 16.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
17
	"github.com/libp2p/go-libp2p-kad-dht/internal"
18
	internalConfig "github.com/libp2p/go-libp2p-kad-dht/internal/config"
Adin Schmahmann's avatar
Adin Schmahmann committed
19
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
20 21
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
22
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24 25 26 27 28 29
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
30
// This is the top level "Store" operation of the DHT
31
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
32 33 34 35
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

36
	logger.Debugw("putting value", "key", internal.LoggableRecordKeyString(key))
Jeromy's avatar
Jeromy committed
37

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

61
	rec := record.MakePutRecord(key, value)
62
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
63
	err = dht.putLocal(key, rec)
64 65 66 67
	if err != nil {
		return err
	}

68
	peers, err := dht.GetClosestPeers(ctx, key)
69 70 71
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72

73
	wg := sync.WaitGroup{}
74
	for _, p := range peers {
75 76
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
77 78
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
79
			defer wg.Done()
80 81
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
82 83 84
				ID:   p,
			})

85
			err := dht.protoMessenger.PutValue(ctx, p, rec)
86
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
87
				logger.Debugf("failed putting value to peer: %s", err)
88 89 90 91
			}
		}(p)
	}
	wg.Wait()
92

93
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94 95
}

Steven Allen's avatar
Steven Allen committed
96 97 98 99 100 101
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
// GetValue searches for the value corresponding to given Key.
103
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
104 105 106 107
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

108
	// apply defaultQuorum if relevant
109
	var cfg routing.Options
110 111 112
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
113
	opts = append(opts, Quorum(internalConfig.GetQuorum(&cfg)))
114

115 116 117 118
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
119 120
	var best []byte

121 122
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
123 124
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
125 126 127 128
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
129 130 131
	if best == nil {
		return nil, routing.ErrNotFound
	}
132
	logger.Debugf("GetValue %v %x", internal.LoggableRecordKeyString(key), best)
Łukasz Magiera's avatar
Łukasz Magiera committed
133 134 135
	return best, nil
}

Alan Shaw's avatar
Alan Shaw committed
136
// SearchValue searches for the value corresponding to given Key and streams the results.
137
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
138 139 140 141
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

142
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
143
	if err := cfg.Apply(opts...); err != nil {
144
		return nil, err
Steven Allen's avatar
Steven Allen committed
145 146 147 148
	}

	responsesNeeded := 0
	if !cfg.Offline {
149
		responsesNeeded = internalConfig.GetQuorum(&cfg)
Steven Allen's avatar
Steven Allen committed
150 151
	}

152
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
153
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
154 155

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
156 157
	go func() {
		defer close(out)
158 159
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
160
			return
161
		}
162

163 164
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
165 166
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
167 168
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
169

Adin Schmahmann's avatar
Adin Schmahmann committed
170
			for _, p := range l.peers {
171 172
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
173 174
				}
			}
175 176 177
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
178

179 180
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
181

182 183
	return out, nil
}
184

185 186
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
187 188 189 190 191
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
192 193 194
				select {
				case out <- v.Val:
				case <-ctx.Done():
195
					return false
Steven Allen's avatar
Steven Allen committed
196
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
197
			}
198

199
			if nvals > 0 && numResponses > nvals {
200
				close(stopCh)
201 202 203 204
				return true
			}
			return false
		})
205 206
}

Steven Allen's avatar
Steven Allen committed
207
// GetValues gets nvals values corresponding to the given key.
208
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
209 210 211
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
212

213
	queryCtx, cancel := context.WithCancel(ctx)
214
	defer cancel()
215
	valCh, _ := dht.getValues(queryCtx, key, nil)
216 217 218 219

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
220 221 222
		if len(out) == nvals {
			cancel()
		}
223 224
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
225
	return out, ctx.Err()
226 227
}

228
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
229
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
230 231 232
loop:
	for {
		if aborted {
233
			return
234
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
235

236 237 238 239
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
240
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
241

242 243 244 245 246 247 248 249 250
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
251
					logger.Warnw("failed to select best value", "key", internal.LoggableRecordKeyString(key), "error", err)
252 253 254 255 256
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
257
				}
258
			}
259 260 261 262 263 264 265 266
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
267

268 269
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
270

271 272 273 274 275 276 277 278 279
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
280
				}
281 282 283 284
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
285
			err := dht.protoMessenger.PutValue(ctx, p, fixupRec)
286 287
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
288
			}
289 290 291 292
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
293
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
294
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
295
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
296

297
	logger.Debugw("finding value", "key", internal.LoggableRecordKeyString(key))
Steven Allen's avatar
Steven Allen committed
298

299 300 301 302 303 304 305
	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
306
		}
307
	}
308

Adin Schmahmann's avatar
Adin Schmahmann committed
309
	go func() {
310
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
311
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
312
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
313 314 315 316 317 318 319
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

320
				rec, peers, err := dht.protoMessenger.GetValue(ctx, p, key)
Adin Schmahmann's avatar
Adin Schmahmann committed
321 322 323 324 325 326 327 328 329 330
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
331
				case nil, internal.ErrInvalidRecord:
Adin Schmahmann's avatar
Adin Schmahmann committed
332
					// in either of these cases, we want to keep going
Marten Seemann's avatar
Marten Seemann committed
333 334
				default:
					return nil, err
Adin Schmahmann's avatar
Adin Schmahmann committed
335
				}
Jeromy's avatar
Jeromy committed
336

Adin Schmahmann's avatar
Adin Schmahmann committed
337 338 339 340 341 342 343
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
344

Adin Schmahmann's avatar
Adin Schmahmann committed
345
					select {
346 347 348
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
349 350
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
351

Adin Schmahmann's avatar
Adin Schmahmann committed
352 353 354 355 356 357 358 359 360
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
361
			func() bool {
362 363 364 365 366 367
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
368 369 370
			},
		)

371 372
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
373
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
374
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
375

376
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
377
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
378 379
		}
	}()
380

Adin Schmahmann's avatar
Adin Schmahmann committed
381
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
382 383
}

Adin Schmahmann's avatar
Adin Schmahmann committed
384 385
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
386 387 388 389 390
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

391 392 393
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
394

395
// Provide makes this node announce that it can provide a value for the given key
396
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
397 398
	if !dht.enableProviders {
		return routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
399 400
	} else if !key.Defined() {
		return fmt.Errorf("invalid cid: undefined")
401
	}
402
	keyMH := key.Hash()
403
	logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
404 405

	// add self locally
406
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
407 408 409
	if !brdcst {
		return nil
	}
410

411 412 413 414 415 416 417 418 419 420
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
421
			deadline = deadline.Add(-timeout / 10)
422 423 424 425 426 427 428 429 430 431
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

432
	var exceededDeadline bool
433
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
434 435 436 437 438 439 440 441 442 443 444
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
445
		return err
446 447
	}

Jeromy's avatar
Jeromy committed
448
	wg := sync.WaitGroup{}
449
	for _, p := range peers {
Jeromy's avatar
Jeromy committed
450 451 452
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
453
			logger.Debugf("putProvider(%s, %s)", internal.LoggableProviderRecordBytes(keyMH), p)
454
			err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.host)
Jeromy's avatar
Jeromy committed
455
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
456
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
457 458
			}
		}(p)
459
	}
Jeromy's avatar
Jeromy committed
460
	wg.Wait()
461 462 463 464
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
465 466
}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
467
// FindProviders searches until the context expires.
468
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
469 470
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
471 472
	} else if !c.Defined() {
		return nil, fmt.Errorf("invalid cid: undefined")
473
	}
Steven Allen's avatar
Steven Allen committed
474

475
	var providers []peer.AddrInfo
476
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
477 478 479 480 481
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
482 483
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
484 485 486
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
487
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
Steven Allen's avatar
Steven Allen committed
488
	if !dht.enableProviders || !key.Defined() {
489
		peerOut := make(chan peer.AddrInfo)
490 491
		close(peerOut)
		return peerOut
492 493
	}

494 495 496 497 498 499
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
500
	keyMH := key.Hash()
501

502
	logger.Debugw("finding providers", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))
Adin Schmahmann's avatar
Adin Schmahmann committed
503
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
504 505 506
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
507
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
Jeromy's avatar
Jeromy committed
508 509
	defer close(peerOut)

510 511 512 513 514 515 516 517
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

518
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
519
	for _, p := range provs {
520
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
521
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
522
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
523
			select {
Jeromy's avatar
Jeromy committed
524
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
525 526 527
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
528
		}
Jeromy's avatar
Jeromy committed
529

530
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
531
		// TODO: is this a DOS vector?
532
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
533 534 535 536
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
537
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
538 539 540 541 542 543
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
544

545
			provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
Adin Schmahmann's avatar
Adin Schmahmann committed
546 547
			if err != nil {
				return nil, err
548
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
549

550
			logger.Debugf("%d provider entries", len(provs))
Adin Schmahmann's avatar
Adin Schmahmann committed
551 552 553

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
554
				dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
Adin Schmahmann's avatar
Adin Schmahmann committed
555 556 557 558 559 560 561 562 563 564
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
565
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
566 567
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
568
				}
569
			}
570

Adin Schmahmann's avatar
Adin Schmahmann committed
571
			// Give closer peers back to the query to be queried
572
			logger.Debugf("got closer peers: %d %s", len(closest), closest)
Adin Schmahmann's avatar
Adin Schmahmann committed
573 574 575 576

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
577
				Responses: closest,
Adin Schmahmann's avatar
Adin Schmahmann committed
578
			})
579

580
			return closest, nil
Adin Schmahmann's avatar
Adin Schmahmann committed
581
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
582
		func() bool {
583
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
584 585
		},
	)
Jeromy's avatar
Jeromy committed
586

587
	if err == nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
588
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
589
	}
590 591
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
592
// FindPeer searches for a peer with given ID.
593
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Steven Allen's avatar
Steven Allen committed
594 595 596 597
	if err := id.Validate(); err != nil {
		return peer.AddrInfo{}, err
	}

Steven Allen's avatar
Steven Allen committed
598
	logger.Debugw("finding peer", "peer", id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
599

600
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
601
	if pi := dht.FindLocal(id); pi.ID != "" {
602
		return pi, nil
603 604
	}

Aarsh Shah's avatar
Aarsh Shah committed
605
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
606 607 608 609 610 611
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
612

613
			peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, id)
Adin Schmahmann's avatar
Adin Schmahmann committed
614 615 616
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
617 618
			}

Adin Schmahmann's avatar
Adin Schmahmann committed
619 620 621 622 623 624
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
625

Adin Schmahmann's avatar
Adin Schmahmann committed
626 627
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
628
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
629 630 631 632
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

633 634 635 636
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
637 638 639 640 641 642
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
643
			dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting)
Adin Schmahmann's avatar
Adin Schmahmann committed
644 645
			break
		}
646 647
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
648 649 650 651
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
652 653
		return dht.peerstore.PeerInfo(id), nil
	}
654 655

	return peer.AddrInfo{}, routing.ErrNotFound
656
}