routing.go 17.7 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
17
	logging "github.com/ipfs/go-log"
18
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Adin Schmahmann's avatar
Adin Schmahmann committed
19
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
20 21
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
22
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24
)

25 26 27 28 29 30
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that once the buffer is
// full, further query results will wait for the channel to drain.
//
// NOTE(review): no use of this variable is visible in this part of the file;
// confirm where it is consumed before changing the value.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32 33 34 35
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
36
// This is the top level "Store" operation of the DHT
37
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
38 39 40 41
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
42
	eip := logger.EventBegin(ctx, "PutValue")
43 44 45 46 47 48 49
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Matt Joiner's avatar
Matt Joiner committed
50
	logger.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

75
	rec := record.MakePutRecord(key, value)
76
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
77
	err = dht.putLocal(key, rec)
78 79 80 81
	if err != nil {
		return err
	}

82
	pchan, err := dht.GetClosestPeers(ctx, key)
83 84 85
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86

87 88 89 90
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
91 92
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
93
			defer wg.Done()
94 95
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
96 97 98
				ID:   p,
			})

99
			err := dht.putValueToPeer(ctx, p, rec)
100
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
101
				logger.Debugf("failed putting value to peer: %s", err)
102 103 104 105
			}
		}(p)
	}
	wg.Wait()
106

107
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108 109
}

Steven Allen's avatar
Steven Allen committed
110 111 112 113 114 115
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw value bytes that were received.
	Val  []byte
	// From is the ID of the peer that supplied Val.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
// GetValue searches for the value corresponding to given Key.
117
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
118 119 120 121
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
122
	eip := logger.EventBegin(ctx, "GetValue")
123 124 125 126 127 128 129
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
130

131
	// apply defaultQuorum if relevant
132
	var cfg routing.Options
133 134 135 136 137
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

138 139 140 141
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
142 143
	var best []byte

144 145
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
146 147
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
148 149 150 151
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
152 153 154
	if best == nil {
		return nil, routing.ErrNotFound
	}
Matt Joiner's avatar
Matt Joiner committed
155
	logger.Debugf("GetValue %v %v", key, best)
Łukasz Magiera's avatar
Łukasz Magiera committed
156 157 158
	return best, nil
}

159
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
160 161 162 163
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

164
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
165
	if err := cfg.Apply(opts...); err != nil {
166
		return nil, err
Steven Allen's avatar
Steven Allen committed
167 168 169 170
	}

	responsesNeeded := 0
	if !cfg.Offline {
171
		responsesNeeded = getQuorum(&cfg, defaultQuorum)
Steven Allen's avatar
Steven Allen committed
172 173
	}

174
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
175
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
176 177

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
178 179
	go func() {
		defer close(out)
180 181
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
182
			return
183
		}
184

185 186
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
187 188
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
189 190
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
191

Adin Schmahmann's avatar
Adin Schmahmann committed
192
			for _, p := range l.peers {
193 194
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
195 196
				}
			}
197 198 199
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
200

201 202
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
203

204 205
	return out, nil
}
206

207 208
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
209 210 211 212 213
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
214 215 216
				select {
				case out <- v.Val:
				case <-ctx.Done():
217
					return false
Steven Allen's avatar
Steven Allen committed
218
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
219
			}
220

221
			if nvals > 0 && numResponses > nvals {
222
				close(stopCh)
223 224 225 226
				return true
			}
			return false
		})
227 228
}

Steven Allen's avatar
Steven Allen committed
229
// GetValues gets nvals values corresponding to the given key.
230
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
231 232 233
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
234

235
	eip := logger.EventBegin(ctx, "GetValues")
236 237 238
	eip.Append(loggableKey(key))
	defer eip.Done()

239
	queryCtx, cancel := context.WithCancel(ctx)
240
	valCh, _ := dht.getValues(queryCtx, key, nil)
241 242 243 244

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
245 246 247
		if len(out) == nvals {
			cancel()
		}
248 249
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
250
	return out, ctx.Err()
251 252
}

253
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
254
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
255 256 257
loop:
	for {
		if aborted {
258
			return
259
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
260

261 262 263 264
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
265
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
266

267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
					logger.Warning("Failed to select dht key: ", err)
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
282
				}
283
			}
284 285 286 287 288 289 290 291
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
292

293 294
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
295

296 297 298 299 300 301 302 303 304
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
305
				}
306 307 308 309 310 311 312
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
			err := dht.putValueToPeer(ctx, p, fixupRec)
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
313
			}
314 315 316 317
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
318
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
319
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
320
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
321 322 323 324 325 326 327 328

	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
329
		}
330
	}
331

Adin Schmahmann's avatar
Adin Schmahmann committed
332
	go func() {
333
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
334
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
335
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

				rec, peers, err := dht.getValueOrPeers(ctx, p, key)
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
359

Adin Schmahmann's avatar
Adin Schmahmann committed
360 361 362 363 364 365 366
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
367

Adin Schmahmann's avatar
Adin Schmahmann committed
368
					select {
369 370 371
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
372 373
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
374

Adin Schmahmann's avatar
Adin Schmahmann committed
375 376 377 378 379 380 381 382 383
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
384
			func() bool {
385 386 387 388 389 390
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
391 392 393
			},
		)

394 395
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
396
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
397
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
398

399
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
400
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
401 402
		}
	}()
403

Adin Schmahmann's avatar
Adin Schmahmann committed
404
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
405 406
}

Adin Schmahmann's avatar
Adin Schmahmann committed
407 408
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
409 410 411 412 413
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

414 415 416
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
417

418
// Provide makes this node announce that it can provide a value for the given key
419
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
420 421 422
	if !dht.enableProviders {
		return routing.ErrNotSupported
	}
423
	keyMH := key.Hash()
Adin Schmahmann's avatar
Adin Schmahmann committed
424
	eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst})
425 426 427 428 429 430
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
431 432

	// add self locally
433
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
434 435 436
	if !brdcst {
		return nil
	}
437

438 439 440 441 442 443 444 445 446 447
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
448
			deadline = deadline.Add(-timeout / 10)
449 450 451 452 453 454 455 456 457 458
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

459
	var exceededDeadline bool
460
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
461 462 463 464 465 466 467 468 469 470 471
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
472
		return err
473 474
	}

475
	mes, err := dht.makeProvRecord(keyMH)
476 477 478 479
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
480
	wg := sync.WaitGroup{}
481
	for p := range peers {
Jeromy's avatar
Jeromy committed
482 483 484
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
485
			logger.Debugf("putProvider(%s, %s)", keyMH, p)
486
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
487
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
488
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
489 490
			}
		}(p)
491
	}
Jeromy's avatar
Jeromy committed
492
	wg.Wait()
493 494 495 496
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497
}
498
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
499
	pi := peer.AddrInfo{
500 501 502 503 504 505 506 507 508 509
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

510
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
511
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
512 513
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
514

Brian Tiger Chow's avatar
Brian Tiger Chow committed
515
// FindProviders searches until the context expires.
516
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
517 518 519
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
	}
520
	var providers []peer.AddrInfo
521
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
522 523 524 525 526
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527 528
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
529 530 531
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
532
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
533
	if !dht.enableProviders {
534
		peerOut := make(chan peer.AddrInfo)
535 536
		close(peerOut)
		return peerOut
537 538
	}

539 540 541 542 543 544
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
545 546
	keyMH := key.Hash()
	logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH))
547

Adin Schmahmann's avatar
Adin Schmahmann committed
548
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
549 550 551
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
552 553
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
	defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done()
Jeromy's avatar
Jeromy committed
554 555
	defer close(peerOut)

556 557 558 559 560 561 562 563
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

564
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
565
	for _, p := range provs {
566
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
567
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
568
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
569
			select {
Jeromy's avatar
Jeromy committed
570
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
571 572 573
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
574
		}
Jeromy's avatar
Jeromy committed
575

576
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
577
		// TODO: is this a DOS vector?
578
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
579 580 581 582
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
583
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
584 585 586 587 588 589
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
590

Adin Schmahmann's avatar
Adin Schmahmann committed
591 592 593
			pmes, err := dht.findProvidersSingle(ctx, p, key)
			if err != nil {
				return nil, err
594
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614

			logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
			provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
			logger.Debugf("%d provider entries decoded", len(provs))

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
				if prov.ID != dht.self {
					dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
				}
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
615
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
616 617
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
618
				}
619
			}
620

Adin Schmahmann's avatar
Adin Schmahmann committed
621 622 623 624 625 626 627 628 629 630
			// Give closer peers back to the query to be queried
			closer := pmes.GetCloserPeers()
			peers := pb.PBPeersToPeerInfos(closer)
			logger.Debugf("got closer peers: %d %s", len(peers), peers)

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
631

Adin Schmahmann's avatar
Adin Schmahmann committed
632 633
			return peers, nil
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
634
		func() bool {
635
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
636 637
		},
	)
Jeromy's avatar
Jeromy committed
638

639
	if err != nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
640
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
641
	}
642 643
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
644
// FindPeer searches for a peer with given ID.
645
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Matt Joiner's avatar
Matt Joiner committed
646
	eip := logger.EventBegin(ctx, "FindPeer", id)
647 648 649 650 651 652
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
653

654
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
655
	if pi := dht.FindLocal(id); pi.ID != "" {
656
		return pi, nil
657 658
	}

Aarsh Shah's avatar
Aarsh Shah committed
659
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
660 661 662 663 664 665
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
666

Adin Schmahmann's avatar
Adin Schmahmann committed
667 668 669 670
			pmes, err := dht.findPeerSingle(ctx, p, id)
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
671
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
672
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
673

Adin Schmahmann's avatar
Adin Schmahmann committed
674 675 676 677 678 679
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
680

Adin Schmahmann's avatar
Adin Schmahmann committed
681 682
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
683
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
684 685 686 687
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

688 689 690 691
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
692 693 694 695 696 697 698 699 700
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
			dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
			break
		}
701 702
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
703 704 705 706
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
707 708
		return dht.peerstore.PeerInfo(id), nil
	}
709 710

	return peer.AddrInfo{}, routing.ErrNotFound
711
}