routing.go 17 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
17
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Adin Schmahmann's avatar
Adin Schmahmann committed
18
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
19 20
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
21
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23 24 25 26 27 28
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
29
// This is the top level "Store" operation of the DHT
30
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
31 32 33 34
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Steven Allen's avatar
Steven Allen committed
35
	logger.Debugw("putting value", "key", loggableKeyString(key))
Jeromy's avatar
Jeromy committed
36

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

60
	rec := record.MakePutRecord(key, value)
61
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
62
	err = dht.putLocal(key, rec)
63 64 65 66
	if err != nil {
		return err
	}

67
	pchan, err := dht.GetClosestPeers(ctx, key)
68 69 70
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71

72 73 74 75
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
76 77
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
78
			defer wg.Done()
79 80
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
81 82 83
				ID:   p,
			})

84
			err := dht.putValueToPeer(ctx, p, rec)
85
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
86
				logger.Debugf("failed putting value to peer: %s", err)
87 88 89 90
			}
		}(p)
	}
	wg.Wait()
91

92
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
}

Steven Allen's avatar
Steven Allen committed
95 96 97 98 99 100
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw value taken from the record.
	Val  []byte
	// From is the peer that supplied Val; values read from the local store
	// are reported with the DHT's own peer ID.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
// GetValue searches for the value corresponding to given Key.
102
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
103 104 105 106
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

107
	// apply defaultQuorum if relevant
108
	var cfg routing.Options
109 110 111 112 113
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

114 115 116 117
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
118 119
	var best []byte

120 121
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
122 123
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
124 125 126 127
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
128 129 130
	if best == nil {
		return nil, routing.ErrNotFound
	}
Matt Joiner's avatar
Matt Joiner committed
131
	logger.Debugf("GetValue %v %v", key, best)
Łukasz Magiera's avatar
Łukasz Magiera committed
132 133 134
	return best, nil
}

135
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
136 137 138 139
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

140
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
141
	if err := cfg.Apply(opts...); err != nil {
142
		return nil, err
Steven Allen's avatar
Steven Allen committed
143 144 145 146
	}

	responsesNeeded := 0
	if !cfg.Offline {
147
		responsesNeeded = getQuorum(&cfg, defaultQuorum)
Steven Allen's avatar
Steven Allen committed
148 149
	}

150
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
151
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
152 153

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
154 155
	go func() {
		defer close(out)
156 157
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
158
			return
159
		}
160

161 162
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
163 164
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
165 166
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
167

Adin Schmahmann's avatar
Adin Schmahmann committed
168
			for _, p := range l.peers {
169 170
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
171 172
				}
			}
173 174 175
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
176

177 178
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
179

180 181
	return out, nil
}
182

183 184
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
185 186 187 188 189
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
190 191 192
				select {
				case out <- v.Val:
				case <-ctx.Done():
193
					return false
Steven Allen's avatar
Steven Allen committed
194
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
195
			}
196

197
			if nvals > 0 && numResponses > nvals {
198
				close(stopCh)
199 200 201 202
				return true
			}
			return false
		})
203 204
}

Steven Allen's avatar
Steven Allen committed
205
// GetValues gets nvals values corresponding to the given key.
206
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
207 208 209
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
210

211
	queryCtx, cancel := context.WithCancel(ctx)
212
	defer cancel()
213
	valCh, _ := dht.getValues(queryCtx, key, nil)
214 215 216 217

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
218 219 220
		if len(out) == nvals {
			cancel()
		}
221 222
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
223
	return out, ctx.Err()
224 225
}

226
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
227
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
228 229 230
loop:
	for {
		if aborted {
231
			return
232
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
233

234 235 236 237
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
238
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
239

240 241 242 243 244 245 246 247 248
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
Steven Allen's avatar
Steven Allen committed
249
					logger.Warnw("failed to select best value", "key", key, "error", err)
250 251 252 253 254
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
255
				}
256
			}
257 258 259 260 261 262 263 264
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
265

266 267
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
268

269 270 271 272 273 274 275 276 277
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
278
				}
279 280 281 282 283 284 285
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
			err := dht.putValueToPeer(ctx, p, fixupRec)
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
286
			}
287 288 289 290
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
291
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
292
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
293
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
294

Steven Allen's avatar
Steven Allen committed
295 296
	logger.Debugw("finding value", "key", loggableKeyString(key))

297 298 299 300 301 302 303
	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
304
		}
305
	}
306

Adin Schmahmann's avatar
Adin Schmahmann committed
307
	go func() {
308
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
309
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
310
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

				rec, peers, err := dht.getValueOrPeers(ctx, p, key)
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
334

Adin Schmahmann's avatar
Adin Schmahmann committed
335 336 337 338 339 340 341
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
342

Adin Schmahmann's avatar
Adin Schmahmann committed
343
					select {
344 345 346
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
347 348
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
349

Adin Schmahmann's avatar
Adin Schmahmann committed
350 351 352 353 354 355 356 357 358
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
359
			func() bool {
360 361 362 363 364 365
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
366 367 368
			},
		)

369 370
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
371
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
372
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
373

374
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
375
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
376 377
		}
	}()
378

Adin Schmahmann's avatar
Adin Schmahmann committed
379
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
380 381
}

Adin Schmahmann's avatar
Adin Schmahmann committed
382 383
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
384 385 386 387 388
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

389 390 391
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392

393
// Provide makes this node announce that it can provide a value for the given key
394
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
395 396
	if !dht.enableProviders {
		return routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
397 398
	} else if !key.Defined() {
		return fmt.Errorf("invalid cid: undefined")
399
	}
Steven Allen's avatar
Steven Allen committed
400 401
	logger.Debugw("finding provider", "cid", key)

402
	keyMH := key.Hash()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
403 404

	// add self locally
405
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
406 407 408
	if !brdcst {
		return nil
	}
409

410 411 412 413 414 415 416 417 418 419
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
420
			deadline = deadline.Add(-timeout / 10)
421 422 423 424 425 426 427 428 429 430
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

431
	var exceededDeadline bool
432
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
433 434 435 436 437 438 439 440 441 442 443
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
444
		return err
445 446
	}

447
	mes, err := dht.makeProvRecord(keyMH)
448 449 450 451
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
452
	wg := sync.WaitGroup{}
453
	for p := range peers {
Jeromy's avatar
Jeromy committed
454 455 456
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
457
			logger.Debugf("putProvider(%s, %s)", keyMH, p)
458
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
459
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
460
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
461 462
			}
		}(p)
463
	}
Jeromy's avatar
Jeromy committed
464
	wg.Wait()
465 466 467 468
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
469
}
470
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
471
	pi := peer.AddrInfo{
472 473 474 475 476 477 478 479 480 481
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

482
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
483
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
484 485
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
486

Brian Tiger Chow's avatar
Brian Tiger Chow committed
487
// FindProviders searches until the context expires.
488
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
489 490
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
491 492
	} else if !c.Defined() {
		return nil, fmt.Errorf("invalid cid: undefined")
493
	}
Steven Allen's avatar
Steven Allen committed
494

495
	var providers []peer.AddrInfo
496
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
497 498 499 500 501
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
502 503
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
504 505 506
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
507
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
Steven Allen's avatar
Steven Allen committed
508
	if !dht.enableProviders || !key.Defined() {
509
		peerOut := make(chan peer.AddrInfo)
510 511
		close(peerOut)
		return peerOut
512 513
	}

514 515 516 517 518 519
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
520
	keyMH := key.Hash()
521

Adin Schmahmann's avatar
Adin Schmahmann committed
522
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
523 524 525
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
526
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
Steven Allen's avatar
Steven Allen committed
527 528
	logger.Debugw("finding providers", "key", key)

Jeromy's avatar
Jeromy committed
529 530
	defer close(peerOut)

531 532 533 534 535 536 537 538
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

539
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
540
	for _, p := range provs {
541
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
542
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
543
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
544
			select {
Jeromy's avatar
Jeromy committed
545
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
546 547 548
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
549
		}
Jeromy's avatar
Jeromy committed
550

551
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
552
		// TODO: is this a DOS vector?
553
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
554 555 556 557
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
558
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
559 560 561 562 563 564
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
565

Adin Schmahmann's avatar
Adin Schmahmann committed
566 567 568
			pmes, err := dht.findProvidersSingle(ctx, p, key)
			if err != nil {
				return nil, err
569
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589

			logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
			provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
			logger.Debugf("%d provider entries decoded", len(provs))

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
				if prov.ID != dht.self {
					dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
				}
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
590
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
591 592
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
593
				}
594
			}
595

Adin Schmahmann's avatar
Adin Schmahmann committed
596 597 598 599 600 601 602 603 604 605
			// Give closer peers back to the query to be queried
			closer := pmes.GetCloserPeers()
			peers := pb.PBPeersToPeerInfos(closer)
			logger.Debugf("got closer peers: %d %s", len(peers), peers)

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
606

Adin Schmahmann's avatar
Adin Schmahmann committed
607 608
			return peers, nil
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
609
		func() bool {
610
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
611 612
		},
	)
Jeromy's avatar
Jeromy committed
613

614
	if err == nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
615
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
616
	}
617 618
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
619
// FindPeer searches for a peer with given ID.
620
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Steven Allen's avatar
Steven Allen committed
621 622 623 624
	if err := id.Validate(); err != nil {
		return peer.AddrInfo{}, err
	}

Steven Allen's avatar
Steven Allen committed
625
	logger.Debugw("finding peer", "peer", id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
626

627
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
628
	if pi := dht.FindLocal(id); pi.ID != "" {
629
		return pi, nil
630 631
	}

Aarsh Shah's avatar
Aarsh Shah committed
632
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
633 634 635 636 637 638
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
639

Adin Schmahmann's avatar
Adin Schmahmann committed
640 641 642 643
			pmes, err := dht.findPeerSingle(ctx, p, id)
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
644
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
645
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
646

Adin Schmahmann's avatar
Adin Schmahmann committed
647 648 649 650 651 652
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
653

Adin Schmahmann's avatar
Adin Schmahmann committed
654 655
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
656
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
657 658 659 660
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

661 662 663 664
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
665 666 667 668 669 670 671 672 673
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
			dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
			break
		}
674 675
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
676 677 678 679
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
680 681
		return dht.peerstore.PeerInfo(id), nil
	}
682 683

	return peer.AddrInfo{}, routing.ErrNotFound
684
}