routing.go 16.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
17
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Adin Schmahmann's avatar
Adin Schmahmann committed
18
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
19 20
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
21
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23 24 25 26 27 28
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
29
// This is the top level "Store" operation of the DHT
30
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
31 32 33 34
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Steven Allen's avatar
Steven Allen committed
35
	logger.Debugw("putting value", "key", loggableKeyString(key))
Jeromy's avatar
Jeromy committed
36

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

60
	rec := record.MakePutRecord(key, value)
61
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
62
	err = dht.putLocal(key, rec)
63 64 65 66
	if err != nil {
		return err
	}

67
	pchan, err := dht.GetClosestPeers(ctx, key)
68 69 70
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71

72 73 74 75
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
76 77
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
78
			defer wg.Done()
79 80
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
81 82 83
				ID:   p,
			})

84
			err := dht.putValueToPeer(ctx, p, rec)
85
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
86
				logger.Debugf("failed putting value to peer: %s", err)
87 88 89 90
			}
		}(p)
	}
	wg.Wait()
91

92
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
}

Steven Allen's avatar
Steven Allen committed
95 96 97 98 99 100
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw value taken from the record.
	Val  []byte
	// From is the peer that supplied Val; getValues sets it to the local
	// peer ID when the value came from the local datastore.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
// GetValue searches for the value corresponding to given Key.
102
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
103 104 105 106
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

107
	// apply defaultQuorum if relevant
108
	var cfg routing.Options
109 110 111 112 113
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

114 115 116 117
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
118 119
	var best []byte

120 121
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
122 123
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
124 125 126 127
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
128 129 130
	if best == nil {
		return nil, routing.ErrNotFound
	}
Matt Joiner's avatar
Matt Joiner committed
131
	logger.Debugf("GetValue %v %v", key, best)
Łukasz Magiera's avatar
Łukasz Magiera committed
132 133 134
	return best, nil
}

135
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
136 137 138 139
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

140
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
141
	if err := cfg.Apply(opts...); err != nil {
142
		return nil, err
Steven Allen's avatar
Steven Allen committed
143 144 145 146
	}

	responsesNeeded := 0
	if !cfg.Offline {
147
		responsesNeeded = getQuorum(&cfg, defaultQuorum)
Steven Allen's avatar
Steven Allen committed
148 149
	}

150
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
151
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
152 153

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
154 155
	go func() {
		defer close(out)
156 157
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
158
			return
159
		}
160

161 162
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
163 164
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
165 166
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
167

Adin Schmahmann's avatar
Adin Schmahmann committed
168
			for _, p := range l.peers {
169 170
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
171 172
				}
			}
173 174 175
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
176

177 178
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
179

180 181
	return out, nil
}
182

183 184
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
185 186 187 188 189
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
190 191 192
				select {
				case out <- v.Val:
				case <-ctx.Done():
193
					return false
Steven Allen's avatar
Steven Allen committed
194
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
195
			}
196

197
			if nvals > 0 && numResponses > nvals {
198
				close(stopCh)
199 200 201 202
				return true
			}
			return false
		})
203 204
}

Steven Allen's avatar
Steven Allen committed
205
// GetValues gets nvals values corresponding to the given key.
206
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
207 208 209
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
210

211
	queryCtx, cancel := context.WithCancel(ctx)
212
	defer cancel()
213
	valCh, _ := dht.getValues(queryCtx, key, nil)
214 215 216 217

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
218 219 220
		if len(out) == nvals {
			cancel()
		}
221 222
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
223
	return out, ctx.Err()
224 225
}

226
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
227
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
228 229 230
loop:
	for {
		if aborted {
231
			return
232
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
233

234 235 236 237
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
238
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
239

240 241 242 243 244 245 246 247 248
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
Steven Allen's avatar
Steven Allen committed
249
					logger.Warnw("failed to select best value", "key", key, "error", err)
250 251 252 253 254
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
255
				}
256
			}
257 258 259 260 261 262 263 264
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
265

266 267
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
268

269 270 271 272 273 274 275 276 277
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
278
				}
279 280 281 282 283 284 285
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
			err := dht.putValueToPeer(ctx, p, fixupRec)
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
286
			}
287 288 289 290
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
291
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
292
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
293
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
294

Steven Allen's avatar
Steven Allen committed
295 296
	logger.Debugw("finding value", "key", loggableKeyString(key))

297 298 299 300 301 302 303
	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
304
		}
305
	}
306

Adin Schmahmann's avatar
Adin Schmahmann committed
307
	go func() {
308
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
309
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
310
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

				rec, peers, err := dht.getValueOrPeers(ctx, p, key)
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
334

Adin Schmahmann's avatar
Adin Schmahmann committed
335 336 337 338 339 340 341
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
342

Adin Schmahmann's avatar
Adin Schmahmann committed
343
					select {
344 345 346
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
347 348
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
349

Adin Schmahmann's avatar
Adin Schmahmann committed
350 351 352 353 354 355 356 357 358
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
359
			func() bool {
360 361 362 363 364 365
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
366 367 368
			},
		)

369 370
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
371
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
372
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
373

374
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
375
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
376 377
		}
	}()
378

Adin Schmahmann's avatar
Adin Schmahmann committed
379
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
380 381
}

Adin Schmahmann's avatar
Adin Schmahmann committed
382 383
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
384 385 386 387 388
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

389 390 391
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392

393
// Provide makes this node announce that it can provide a value for the given key
394
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
395 396 397
	if !dht.enableProviders {
		return routing.ErrNotSupported
	}
Steven Allen's avatar
Steven Allen committed
398 399
	logger.Debugw("finding provider", "cid", key)

400
	keyMH := key.Hash()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
401 402

	// add self locally
403
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
404 405 406
	if !brdcst {
		return nil
	}
407

408 409 410 411 412 413 414 415 416 417
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
418
			deadline = deadline.Add(-timeout / 10)
419 420 421 422 423 424 425 426 427 428
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

429
	var exceededDeadline bool
430
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
431 432 433 434 435 436 437 438 439 440 441
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
442
		return err
443 444
	}

445
	mes, err := dht.makeProvRecord(keyMH)
446 447 448 449
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
450
	wg := sync.WaitGroup{}
451
	for p := range peers {
Jeromy's avatar
Jeromy committed
452 453 454
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
455
			logger.Debugf("putProvider(%s, %s)", keyMH, p)
456
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
457
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
458
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
459 460
			}
		}(p)
461
	}
Jeromy's avatar
Jeromy committed
462
	wg.Wait()
463 464 465 466
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
467
}
468
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
469
	pi := peer.AddrInfo{
470 471 472 473 474 475 476 477 478 479
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

480
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
481
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
482 483
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
484

Brian Tiger Chow's avatar
Brian Tiger Chow committed
485
// FindProviders searches until the context expires.
486
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
487 488 489
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
	}
490
	var providers []peer.AddrInfo
491
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
492 493 494 495 496
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497 498
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
499 500 501
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
502
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
503
	if !dht.enableProviders {
504
		peerOut := make(chan peer.AddrInfo)
505 506
		close(peerOut)
		return peerOut
507 508
	}

509 510 511 512 513 514
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
515
	keyMH := key.Hash()
516

Adin Schmahmann's avatar
Adin Schmahmann committed
517
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
518 519 520
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
521
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
Steven Allen's avatar
Steven Allen committed
522 523
	logger.Debugw("finding providers", "key", key)

Jeromy's avatar
Jeromy committed
524 525
	defer close(peerOut)

526 527 528 529 530 531 532 533
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

534
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
535
	for _, p := range provs {
536
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
537
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
538
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
539
			select {
Jeromy's avatar
Jeromy committed
540
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
541 542 543
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
544
		}
Jeromy's avatar
Jeromy committed
545

546
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
547
		// TODO: is this a DOS vector?
548
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
549 550 551 552
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
553
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
554 555 556 557 558 559
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
560

Adin Schmahmann's avatar
Adin Schmahmann committed
561 562 563
			pmes, err := dht.findProvidersSingle(ctx, p, key)
			if err != nil {
				return nil, err
564
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584

			logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
			provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
			logger.Debugf("%d provider entries decoded", len(provs))

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
				if prov.ID != dht.self {
					dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
				}
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
585
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
586 587
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
588
				}
589
			}
590

Adin Schmahmann's avatar
Adin Schmahmann committed
591 592 593 594 595 596 597 598 599 600
			// Give closer peers back to the query to be queried
			closer := pmes.GetCloserPeers()
			peers := pb.PBPeersToPeerInfos(closer)
			logger.Debugf("got closer peers: %d %s", len(peers), peers)

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
601

Adin Schmahmann's avatar
Adin Schmahmann committed
602 603
			return peers, nil
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
604
		func() bool {
605
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
606 607
		},
	)
Jeromy's avatar
Jeromy committed
608

609
	if err == nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
610
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
611
	}
612 613
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
614
// FindPeer searches for a peer with given ID.
615
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Steven Allen's avatar
Steven Allen committed
616
	logger.Debugw("finding peer", "peer", id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
617

618
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
619
	if pi := dht.FindLocal(id); pi.ID != "" {
620
		return pi, nil
621 622
	}

Aarsh Shah's avatar
Aarsh Shah committed
623
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
624 625 626 627 628 629
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
630

Adin Schmahmann's avatar
Adin Schmahmann committed
631 632 633 634
			pmes, err := dht.findPeerSingle(ctx, p, id)
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
635
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
636
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
637

Adin Schmahmann's avatar
Adin Schmahmann committed
638 639 640 641 642 643
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
644

Adin Schmahmann's avatar
Adin Schmahmann committed
645 646
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
647
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
648 649 650 651
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

652 653 654 655
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
656 657 658 659 660 661 662 663 664
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
			dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
			break
		}
665 666
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
667 668 669 670
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
671 672
		return dht.peerstore.PeerInfo(id), nil
	}
673 674

	return peer.AddrInfo{}, routing.ErrNotFound
675
}