routing.go 16.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Adin Schmahmann's avatar
Adin Schmahmann committed
7
	"github.com/libp2p/go-libp2p-core/network"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
17
	logging "github.com/ipfs/go-log"
Adin Schmahmann's avatar
Adin Schmahmann committed
18
	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
19
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
20 21
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
22
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24
)

25 26 27 28 29 30
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that once the buffer is
// full, further query results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32 33 34 35
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
36
// This is the top level "Store" operation of the DHT
37
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
38 39 40 41
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
42
	eip := logger.EventBegin(ctx, "PutValue")
43 44 45 46 47 48 49
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Matt Joiner's avatar
Matt Joiner committed
50
	logger.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

75
	rec := record.MakePutRecord(key, value)
76
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
77
	err = dht.putLocal(key, rec)
78 79 80 81
	if err != nil {
		return err
	}

82
	pchan, err := dht.GetClosestPeers(ctx, key)
83 84 85
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86

87 88 89 90
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
91 92
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
93
			defer wg.Done()
94 95
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
96 97 98
				ID:   p,
			})

99
			err := dht.putValueToPeer(ctx, p, rec)
100
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
101
				logger.Debugf("failed putting value to peer: %s", err)
102 103 104 105
			}
		}(p)
	}
	wg.Wait()
106

107
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108 109
}

Steven Allen's avatar
Steven Allen committed
110 111 112 113 114 115
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw value that was received.
	Val  []byte
	// From is the ID of the peer that supplied Val.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
// GetValue searches for the value corresponding to given Key.
117
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
118 119 120 121
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
122
	eip := logger.EventBegin(ctx, "GetValue")
123 124 125 126 127 128 129
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
130

131
	// apply defaultQuorum if relevant
132
	var cfg routing.Options
133 134 135 136 137
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

138 139 140 141
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
142 143
	var best []byte

144 145
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
146 147
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
148 149 150 151
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
152 153 154
	if best == nil {
		return nil, routing.ErrNotFound
	}
Matt Joiner's avatar
Matt Joiner committed
155
	logger.Debugf("GetValue %v %v", key, best)
Łukasz Magiera's avatar
Łukasz Magiera committed
156 157 158
	return best, nil
}

159
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
160 161 162 163
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

164
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
165
	if err := cfg.Apply(opts...); err != nil {
166
		return nil, err
Steven Allen's avatar
Steven Allen committed
167 168 169 170
	}

	responsesNeeded := 0
	if !cfg.Offline {
Adin Schmahmann's avatar
Adin Schmahmann committed
171
		responsesNeeded = getQuorum(&cfg, 0)
Steven Allen's avatar
Steven Allen committed
172 173
	}

174
	valCh, queries := dht.getValues(ctx, key, func() bool { return false })
175 176

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
177 178
	go func() {
		defer close(out)
179 180 181
		best, peersWithBest := dht.searchValueQuorum(ctx, key, valCh, out, responsesNeeded)
		if best == nil {
			return
182
		}
183

184 185 186 187
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
		case q := <-queries:
			if len(q) < 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
188 189
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
190

191 192 193 194 195
			peers := q[0].globallyQueriedPeers.Peers()
			peers = kb.SortClosestPeers(peers, kb.ConvertKey(key))
			for _, p := range peers {
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
196 197
				}
			}
198 199 200
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
201

202 203
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
204

205 206
	return out, nil
}
207

208 209 210 211 212 213 214
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal,
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}) {
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
215 216 217
				select {
				case out <- v.Val:
				case <-ctx.Done():
218
					return false
Steven Allen's avatar
Steven Allen committed
219
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
220
			}
221

222 223 224 225 226
			if nvals > 0 && numResponses > nvals {
				return true
			}
			return false
		})
227 228
}

Steven Allen's avatar
Steven Allen committed
229
// GetValues gets nvals values corresponding to the given key.
230
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
231 232 233
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
234

235
	eip := logger.EventBegin(ctx, "GetValues")
236 237 238
	eip.Append(loggableKey(key))
	defer eip.Done()

239 240 241 242
	queryCtx, cancel := context.WithCancel(ctx)
	valCh, _ := dht.getValues(queryCtx, key, func() bool {
		return false
	})
243 244 245 246

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
247 248 249
		if len(out) == nvals {
			cancel()
		}
250 251
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
252
	return out, ctx.Err()
253 254
}

255 256 257
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}) {
	aborted := false
258

259 260 261 262 263
loop:
	for {
		if aborted {
			return best, nil
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
264

265 266 267 268
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
269
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
270

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
					logger.Warning("Failed to select dht key: ", err)
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
286
				}
287
			}
288 289 290 291 292 293 294 295
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
296

297 298 299 300 301
	if aborted {
		return best, nil
	}
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
302

303 304 305 306 307 308 309 310 311
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
312
				}
313 314 315 316 317 318 319
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
			err := dht.putValueToPeer(ctx, p, fixupRec)
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
320
			}
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
		}(p)
	}
}

func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopFn func() bool) (<-chan RecvdVal, <-chan []*qu) {
	valCh := make(chan RecvdVal, 1)
	queriesCh := make(chan []*qu, 1)

	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
336
		}
337
	}
338

Adin Schmahmann's avatar
Adin Schmahmann committed
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
	go func() {
		queries := dht.runDisjointQueries(ctx, dht.d, key,
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

				rec, peers, err := dht.getValueOrPeers(ctx, p, key)
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
364

Adin Schmahmann's avatar
Adin Schmahmann committed
365 366 367 368 369 370 371
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
372

Adin Schmahmann's avatar
Adin Schmahmann committed
373
					select {
374 375 376
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
377 378
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
379

Adin Schmahmann's avatar
Adin Schmahmann committed
380 381 382 383 384 385 386 387 388 389
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
			func(peerset *kpeerset.SortedPeerset) bool {
390
				return stopFn()
Adin Schmahmann's avatar
Adin Schmahmann committed
391 392 393
			},
		)

394 395 396
		close(valCh)
		queriesCh <- queries
		close(queriesCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
397 398 399 400 401 402 403 404

		shortcutTaken := false
		for _, q := range queries {
			if len(q.localPeers.KUnqueried()) > 0 {
				shortcutTaken = true
				break
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
405

406
		if !shortcutTaken && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
407
			kadID := kb.ConvertKey(key)
Aarsh Shah's avatar
Aarsh Shah committed
408
			// refresh the cpl for this key as the query was successful
Adin Schmahmann's avatar
Adin Schmahmann committed
409
			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
Łukasz Magiera's avatar
Łukasz Magiera committed
410 411
		}
	}()
412

413
	return valCh, queriesCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
414 415
}

416 417 418
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
419

420
// Provide makes this node announce that it can provide a value for the given key
421
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
422 423 424
	if !dht.enableProviders {
		return routing.ErrNotSupported
	}
425
	keyMH := key.Hash()
Adin Schmahmann's avatar
Adin Schmahmann committed
426
	eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst})
427 428 429 430 431 432
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
433 434

	// add self locally
435
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
436 437 438
	if !brdcst {
		return nil
	}
439

440 441 442 443 444 445 446 447 448 449
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
450
			deadline = deadline.Add(-timeout / 10)
451 452 453 454 455 456 457 458 459 460
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

461
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
462 463
	if err != nil {
		return err
464 465
	}

466
	mes, err := dht.makeProvRecord(keyMH)
467 468 469 470
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
471
	wg := sync.WaitGroup{}
472
	for p := range peers {
Jeromy's avatar
Jeromy committed
473 474 475
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
476
			logger.Debugf("putProvider(%s, %s)", keyMH, p)
477
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
478
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
479
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
480 481
			}
		}(p)
482
	}
Jeromy's avatar
Jeromy committed
483
	wg.Wait()
484
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
485
}
486
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
487
	pi := peer.AddrInfo{
488 489 490 491 492 493 494 495 496 497
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

498
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
499
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
500 501
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
502

Brian Tiger Chow's avatar
Brian Tiger Chow committed
503
// FindProviders searches until the context expires.
504
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
505 506 507
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
	}
508
	var providers []peer.AddrInfo
509
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
510 511 512 513 514
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
515 516 517
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
518
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
519
	peerOut := make(chan peer.AddrInfo, count)
520
	if !dht.enableProviders {
521 522
		close(peerOut)
		return peerOut
523 524
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
525 526
	keyMH := key.Hash()
	logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH))
527

Adin Schmahmann's avatar
Adin Schmahmann committed
528
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
529 530 531
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
532 533
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
	defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done()
Jeromy's avatar
Jeromy committed
534 535
	defer close(peerOut)

536
	ps := peer.NewLimitedSet(count)
537
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
538
	for _, p := range provs {
539
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
540
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
541
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
542
			select {
Jeromy's avatar
Jeromy committed
543
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
544 545 546
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
547
		}
Jeromy's avatar
Jeromy committed
548

549
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
550
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
551
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
552 553 554 555
			return
		}
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
556 557 558 559 560 561 562
	dht.runDisjointQueries(ctx, dht.d, string(key),
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
563

Adin Schmahmann's avatar
Adin Schmahmann committed
564 565 566
			pmes, err := dht.findProvidersSingle(ctx, p, key)
			if err != nil {
				return nil, err
567
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590

			logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
			provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
			logger.Debugf("%d provider entries decoded", len(provs))

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
				if prov.ID != dht.self {
					dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
				}
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
				if ps.Size() >= count {
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
591
				}
592
			}
593

Adin Schmahmann's avatar
Adin Schmahmann committed
594 595 596 597 598 599 600 601 602 603
			// Give closer peers back to the query to be queried
			closer := pmes.GetCloserPeers()
			peers := pb.PBPeersToPeerInfos(closer)
			logger.Debugf("got closer peers: %d %s", len(peers), peers)

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
604

Adin Schmahmann's avatar
Adin Schmahmann committed
605 606 607 608 609 610
			return peers, nil
		},
		func(peerset *kpeerset.SortedPeerset) bool {
			return ps.Size() > count
		},
	)
Jeromy's avatar
Jeromy committed
611

Adin Schmahmann's avatar
Adin Schmahmann committed
612 613 614
	if ctx.Err() == nil {
		// refresh the cpl for this key after the query is run
		dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now())
Jeromy's avatar
Jeromy committed
615
	}
616 617
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
618
// FindPeer searches for a peer with given ID.
619
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Matt Joiner's avatar
Matt Joiner committed
620
	eip := logger.EventBegin(ctx, "FindPeer", id)
621 622 623 624 625 626
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
627

628
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
629
	if pi := dht.FindLocal(id); pi.ID != "" {
630
		return pi, nil
631 632
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
633 634 635 636 637 638 639
	queries := dht.runDisjointQueries(ctx, dht.d, string(id),
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
640

Adin Schmahmann's avatar
Adin Schmahmann committed
641 642 643 644
			pmes, err := dht.findPeerSingle(ctx, p, id)
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
645
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
646
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
647

Adin Schmahmann's avatar
Adin Schmahmann committed
648 649 650 651 652 653
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
654

Adin Schmahmann's avatar
Adin Schmahmann committed
655 656 657 658 659 660 661 662 663 664 665 666 667 668 669
			return peers, err
		},
		func(peerset *kpeerset.SortedPeerset) bool {
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

	//	logger.Debugf("FindPeer %v %v", id, result.success)

	if dht.host.Network().Connectedness(id) == network.Connected {
		shortcutTaken := false
		for _, q := range queries {
			if len(q.localPeers.KUnqueried()) > 0 {
				shortcutTaken = true
				break
670 671 672
			}
		}

Adin Schmahmann's avatar
Adin Schmahmann committed
673 674 675 676
		if !shortcutTaken {
			kadID := kb.ConvertPeerID(id)
			// refresh the cpl for this key as the query was successful
			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
677 678
		}

Adin Schmahmann's avatar
Adin Schmahmann committed
679 680
		return dht.peerstore.PeerInfo(id), nil
	} else {
681
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
682 683 684 685
			kadID := kb.ConvertPeerID(id)
			// refresh the cpl for this key as the query was successful
			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
		}
686

Adin Schmahmann's avatar
Adin Schmahmann committed
687 688
		return peer.AddrInfo{}, routing.ErrNotFound
	}
689
}