routing.go 17.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
17
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Adin Schmahmann's avatar
Adin Schmahmann committed
18
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
19 20
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
21
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23 24 25 26 27 28
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
29
// This is the top level "Store" operation of the DHT
30
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
31 32 33 34
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
35
	logger.Debugw("putting value", "key", loggableRecordKeyString(key))
Jeromy's avatar
Jeromy committed
36

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

60
	rec := record.MakePutRecord(key, value)
61
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
62
	err = dht.putLocal(key, rec)
63 64 65 66
	if err != nil {
		return err
	}

67
	pchan, err := dht.GetClosestPeers(ctx, key)
68 69 70
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71

72 73 74 75
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
76 77
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
78
			defer wg.Done()
79 80
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
81 82 83
				ID:   p,
			})

84
			err := dht.putValueToPeer(ctx, p, rec)
85
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
86
				logger.Debugf("failed putting value to peer: %s", err)
87 88 89 90
			}
		}(p)
	}
	wg.Wait()
91

92
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
}

Steven Allen's avatar
Steven Allen committed
95 96 97 98 99 100
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
// GetValue searches for the value corresponding to given Key.
102
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
103 104 105 106
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

107
	// apply defaultQuorum if relevant
108
	var cfg routing.Options
109 110 111 112 113
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

114 115 116 117
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
118 119
	var best []byte

120 121
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
122 123
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
124 125 126 127
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
128 129 130
	if best == nil {
		return nil, routing.ErrNotFound
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
131
	logger.Debugf("GetValue %v %x", loggableRecordKeyString(key), best)
Łukasz Magiera's avatar
Łukasz Magiera committed
132 133 134
	return best, nil
}

Alan Shaw's avatar
Alan Shaw committed
135
// SearchValue searches for the value corresponding to given Key and streams the results.
136
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
137 138 139 140
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

141
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
142
	if err := cfg.Apply(opts...); err != nil {
143
		return nil, err
Steven Allen's avatar
Steven Allen committed
144 145 146 147
	}

	responsesNeeded := 0
	if !cfg.Offline {
148
		responsesNeeded = getQuorum(&cfg, defaultQuorum)
Steven Allen's avatar
Steven Allen committed
149 150
	}

151
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
152
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
153 154

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
155 156
	go func() {
		defer close(out)
157 158
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
159
			return
160
		}
161

162 163
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
164 165
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
166 167
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
168

Adin Schmahmann's avatar
Adin Schmahmann committed
169
			for _, p := range l.peers {
170 171
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
172 173
				}
			}
174 175 176
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
177

178 179
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
180

181 182
	return out, nil
}
183

184 185
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
186 187 188 189 190
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
191 192 193
				select {
				case out <- v.Val:
				case <-ctx.Done():
194
					return false
Steven Allen's avatar
Steven Allen committed
195
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
196
			}
197

198
			if nvals > 0 && numResponses > nvals {
199
				close(stopCh)
200 201 202 203
				return true
			}
			return false
		})
204 205
}

Steven Allen's avatar
Steven Allen committed
206
// GetValues gets nvals values corresponding to the given key.
207
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
208 209 210
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
211

212
	queryCtx, cancel := context.WithCancel(ctx)
213
	defer cancel()
214
	valCh, _ := dht.getValues(queryCtx, key, nil)
215 216 217 218

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
219 220 221
		if len(out) == nvals {
			cancel()
		}
222 223
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
224
	return out, ctx.Err()
225 226
}

227
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
228
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
229 230 231
loop:
	for {
		if aborted {
232
			return
233
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
234

235 236 237 238
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
239
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
240

241 242 243 244 245 246 247 248 249
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
250
					logger.Warnw("failed to select best value", "key", loggableRecordKeyString(key), "error", err)
251 252 253 254 255
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
256
				}
257
			}
258 259 260 261 262 263 264 265
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
266

267 268
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
269

270 271 272 273 274 275 276 277 278
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
279
				}
280 281 282 283 284 285 286
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
			err := dht.putValueToPeer(ctx, p, fixupRec)
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
287
			}
288 289 290 291
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
292
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
293
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
294
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
295

Adin Schmahmann's avatar
Adin Schmahmann committed
296
	logger.Debugw("finding value", "key", loggableRecordKeyString(key))
Steven Allen's avatar
Steven Allen committed
297

298 299 300 301 302 303 304
	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
305
		}
306
	}
307

Adin Schmahmann's avatar
Adin Schmahmann committed
308
	go func() {
309
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
310
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
311
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

				rec, peers, err := dht.getValueOrPeers(ctx, p, key)
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
335

Adin Schmahmann's avatar
Adin Schmahmann committed
336 337 338 339 340 341 342
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
343

Adin Schmahmann's avatar
Adin Schmahmann committed
344
					select {
345 346 347
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
348 349
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
350

Adin Schmahmann's avatar
Adin Schmahmann committed
351 352 353 354 355 356 357 358 359
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
360
			func() bool {
361 362 363 364 365 366
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
367 368 369
			},
		)

370 371
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
372
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
373
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
374

375
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
376
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
377 378
		}
	}()
379

Adin Schmahmann's avatar
Adin Schmahmann committed
380
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
381 382
}

Adin Schmahmann's avatar
Adin Schmahmann committed
383 384
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
385 386 387 388 389
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

390 391 392
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
393

394
// Provide makes this node announce that it can provide a value for the given key
395
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
396 397
	if !dht.enableProviders {
		return routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
398 399
	} else if !key.Defined() {
		return fmt.Errorf("invalid cid: undefined")
400
	}
401
	keyMH := key.Hash()
Adin Schmahmann's avatar
Adin Schmahmann committed
402
	logger.Debugw("providing", "cid", key, "mh", loggableProviderRecordBytes(keyMH))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
403 404

	// add self locally
405
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
406 407 408
	if !brdcst {
		return nil
	}
409

410 411 412 413 414 415 416 417 418 419
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
420
			deadline = deadline.Add(-timeout / 10)
421 422 423 424 425 426 427 428 429 430
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

431
	var exceededDeadline bool
432
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
433 434 435 436 437 438 439 440 441 442 443
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
444
		return err
445 446
	}

447
	mes, err := dht.makeProvRecord(keyMH)
448 449 450 451
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
452
	wg := sync.WaitGroup{}
453
	for p := range peers {
Jeromy's avatar
Jeromy committed
454 455 456
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Adin Schmahmann's avatar
Adin Schmahmann committed
457
			logger.Debugf("putProvider(%s, %s)", loggableProviderRecordBytes(keyMH), p)
458
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
459
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
460
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
461 462
			}
		}(p)
463
	}
Jeromy's avatar
Jeromy committed
464
	wg.Wait()
465 466 467 468
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
469
}
470
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
471
	pi := peer.AddrInfo{
472 473 474 475 476 477 478
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
Alan Shaw's avatar
Alan Shaw committed
479
		return nil, fmt.Errorf("no known addresses for self, cannot put provider")
480 481
	}

482
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
483
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
484 485
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
486

Brian Tiger Chow's avatar
Brian Tiger Chow committed
487
// FindProviders searches until the context expires.
488
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
489 490
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
491 492
	} else if !c.Defined() {
		return nil, fmt.Errorf("invalid cid: undefined")
493
	}
Steven Allen's avatar
Steven Allen committed
494

495
	var providers []peer.AddrInfo
496
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
497 498 499 500 501
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
502 503
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
504 505 506
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
507
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
Steven Allen's avatar
Steven Allen committed
508
	if !dht.enableProviders || !key.Defined() {
509
		peerOut := make(chan peer.AddrInfo)
510 511
		close(peerOut)
		return peerOut
512 513
	}

514 515 516 517 518 519
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
520
	keyMH := key.Hash()
521

Adin Schmahmann's avatar
Adin Schmahmann committed
522
	logger.Debugw("finding providers", "cid", key, "mh", loggableProviderRecordBytes(keyMH))
Adin Schmahmann's avatar
Adin Schmahmann committed
523
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
524 525 526
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
527
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
Jeromy's avatar
Jeromy committed
528 529
	defer close(peerOut)

530 531 532 533 534 535 536 537
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

538
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
539
	for _, p := range provs {
540
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
541
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
542
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
543
			select {
Jeromy's avatar
Jeromy committed
544
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
545 546 547
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
548
		}
Jeromy's avatar
Jeromy committed
549

550
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
551
		// TODO: is this a DOS vector?
552
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
553 554 555 556
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
557
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
558 559 560 561 562 563
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
564

Adin Schmahmann's avatar
Adin Schmahmann committed
565 566 567
			pmes, err := dht.findProvidersSingle(ctx, p, key)
			if err != nil {
				return nil, err
568
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
569 570 571 572 573 574 575

			logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
			provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
			logger.Debugf("%d provider entries decoded", len(provs))

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
576
				dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
Adin Schmahmann's avatar
Adin Schmahmann committed
577 578 579 580 581 582 583 584 585 586
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
587
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
588 589
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
590
				}
591
			}
592

Adin Schmahmann's avatar
Adin Schmahmann committed
593 594 595 596 597 598 599 600 601 602
			// Give closer peers back to the query to be queried
			closer := pmes.GetCloserPeers()
			peers := pb.PBPeersToPeerInfos(closer)
			logger.Debugf("got closer peers: %d %s", len(peers), peers)

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
603

Adin Schmahmann's avatar
Adin Schmahmann committed
604 605
			return peers, nil
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
606
		func() bool {
607
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
608 609
		},
	)
Jeromy's avatar
Jeromy committed
610

611
	if err == nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
612
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
613
	}
614 615
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
616
// FindPeer searches for a peer with given ID.
617
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Steven Allen's avatar
Steven Allen committed
618 619 620 621
	if err := id.Validate(); err != nil {
		return peer.AddrInfo{}, err
	}

Steven Allen's avatar
Steven Allen committed
622
	logger.Debugw("finding peer", "peer", id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
623

624
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
625
	if pi := dht.FindLocal(id); pi.ID != "" {
626
		return pi, nil
627 628
	}

Aarsh Shah's avatar
Aarsh Shah committed
629
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
630 631 632 633 634 635
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
636

Adin Schmahmann's avatar
Adin Schmahmann committed
637 638 639 640
			pmes, err := dht.findPeerSingle(ctx, p, id)
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
641
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
642
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
643

Adin Schmahmann's avatar
Adin Schmahmann committed
644 645 646 647 648 649
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
650

Adin Schmahmann's avatar
Adin Schmahmann committed
651 652
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
653
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
654 655 656 657
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

658 659 660 661
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
662 663 664 665 666 667
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
668
			dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting)
Adin Schmahmann's avatar
Adin Schmahmann committed
669 670
			break
		}
671 672
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
673 674 675 676
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
677 678
		return dht.peerstore.PeerInfo(id), nil
	}
679 680

	return peer.AddrInfo{}, routing.ErrNotFound
681
}