routing.go 17.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Adin Schmahmann's avatar
Adin Schmahmann committed
7
	"github.com/libp2p/go-libp2p-core/network"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
17
	logging "github.com/ipfs/go-log"
Adin Schmahmann's avatar
Adin Schmahmann committed
18
	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
19
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
20 21
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
22
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24
)

25 26 27 28 29 30
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that once the buffer is
// full, further query results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32 33 34 35
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
36
// This is the top level "Store" operation of the DHT
37
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
38 39 40 41
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
42
	eip := logger.EventBegin(ctx, "PutValue")
43 44 45 46 47 48 49
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Matt Joiner's avatar
Matt Joiner committed
50
	logger.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

75
	rec := record.MakePutRecord(key, value)
76
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
77
	err = dht.putLocal(key, rec)
78 79 80 81
	if err != nil {
		return err
	}

82
	pchan, err := dht.GetClosestPeers(ctx, key)
83 84 85
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86

87 88 89 90
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
91 92
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
93
			defer wg.Done()
94 95
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
96 97 98
				ID:   p,
			})

99
			err := dht.putValueToPeer(ctx, p, rec)
100
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
101
				logger.Debugf("failed putting value to peer: %s", err)
102 103 104 105
			}
		}(p)
	}
	wg.Wait()
106

107
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108 109
}

Steven Allen's avatar
Steven Allen committed
110 111 112 113 114 115
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw value that was received.
	Val  []byte
	// From is the ID of the peer that supplied Val.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
// GetValue searches for the value corresponding to given Key.
117
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
118 119 120 121
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
122
	eip := logger.EventBegin(ctx, "GetValue")
123 124 125 126 127 128 129
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
130

131
	// apply defaultQuorum if relevant
132
	var cfg routing.Options
133 134 135 136 137
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

138 139 140 141
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
142 143
	var best []byte

144 145
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
146 147
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
148 149 150 151
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
152 153 154
	if best == nil {
		return nil, routing.ErrNotFound
	}
Matt Joiner's avatar
Matt Joiner committed
155
	logger.Debugf("GetValue %v %v", key, best)
Łukasz Magiera's avatar
Łukasz Magiera committed
156 157 158
	return best, nil
}

159
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
160 161 162 163
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

164
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
165
	if err := cfg.Apply(opts...); err != nil {
166
		return nil, err
Steven Allen's avatar
Steven Allen committed
167 168 169 170
	}

	responsesNeeded := 0
	if !cfg.Offline {
Adin Schmahmann's avatar
Adin Schmahmann committed
171
		responsesNeeded = getQuorum(&cfg, 0)
Steven Allen's avatar
Steven Allen committed
172 173
	}

174 175
	stopCh := make(chan struct{})
	valCh, queries := dht.getValues(ctx, key, stopCh)
176 177

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
178 179
	go func() {
		defer close(out)
180 181
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
182
			return
183
		}
184

185 186 187 188
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
		case q := <-queries:
			if len(q) < 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
189 190
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
191

192 193
			peers := q[0].globallyQueriedPeers.Peers()
			peers = kb.SortClosestPeers(peers, kb.ConvertKey(key))
194 195 196 197
			for i, p := range peers {
				if i == dht.bucketSize {
					break
				}
198 199
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
200 201
				}
			}
202 203 204
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
205

206 207
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
208

209 210
	return out, nil
}
211

212 213
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
214 215 216 217 218
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
219 220 221
				select {
				case out <- v.Val:
				case <-ctx.Done():
222
					return false
Steven Allen's avatar
Steven Allen committed
223
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
224
			}
225

226
			if nvals > 0 && numResponses > nvals {
227
				close(stopCh)
228 229 230 231
				return true
			}
			return false
		})
232 233
}

Steven Allen's avatar
Steven Allen committed
234
// GetValues gets nvals values corresponding to the given key.
235
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
236 237 238
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
239

240
	eip := logger.EventBegin(ctx, "GetValues")
241 242 243
	eip.Append(loggableKey(key))
	defer eip.Done()

244
	queryCtx, cancel := context.WithCancel(ctx)
245
	valCh, _ := dht.getValues(queryCtx, key, nil)
246 247 248 249

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
250 251 252
		if len(out) == nvals {
			cancel()
		}
253 254
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
255
	return out, ctx.Err()
256 257
}

258
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
259
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
260 261 262
loop:
	for {
		if aborted {
263
			return
264
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
265

266 267 268 269
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
270
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
271

272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
					logger.Warning("Failed to select dht key: ", err)
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
287
				}
288
			}
289 290 291 292 293 294 295 296
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
297

298 299
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
300

301 302 303 304 305 306 307 308 309
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
310
				}
311 312 313 314 315 316 317
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
			err := dht.putValueToPeer(ctx, p, fixupRec)
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
318
			}
319 320 321 322
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
323
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*query) {
324
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
325
	queriesCh := make(chan []*query, 1)
326 327 328 329 330 331 332 333

	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
334
		}
335
	}
336

Adin Schmahmann's avatar
Adin Schmahmann committed
337
	go func() {
338
		queries, _ := dht.runDisjointQueries(ctx, dht.d, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

				rec, peers, err := dht.getValueOrPeers(ctx, p, key)
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
362

Adin Schmahmann's avatar
Adin Schmahmann committed
363 364 365 366 367 368 369
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
370

Adin Schmahmann's avatar
Adin Schmahmann committed
371
					select {
372 373 374
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
375 376
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
377

Adin Schmahmann's avatar
Adin Schmahmann committed
378 379 380 381 382 383 384 385 386 387
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
			func(peerset *kpeerset.SortedPeerset) bool {
388 389 390 391 392 393
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
394 395 396
			},
		)

397 398 399
		close(valCh)
		queriesCh <- queries
		close(queriesCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
400 401 402 403 404 405 406 407

		shortcutTaken := false
		for _, q := range queries {
			if len(q.localPeers.KUnqueried()) > 0 {
				shortcutTaken = true
				break
			}
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
408

409
		if !shortcutTaken && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
410
			kadID := kb.ConvertKey(key)
Aarsh Shah's avatar
Aarsh Shah committed
411
			// refresh the cpl for this key as the query was successful
Adin Schmahmann's avatar
Adin Schmahmann committed
412
			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
Łukasz Magiera's avatar
Łukasz Magiera committed
413 414
		}
	}()
415

416
	return valCh, queriesCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
417 418
}

419 420 421
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
422

423
// Provide makes this node announce that it can provide a value for the given key
424
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
425 426 427
	if !dht.enableProviders {
		return routing.ErrNotSupported
	}
428
	keyMH := key.Hash()
Adin Schmahmann's avatar
Adin Schmahmann committed
429
	eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst})
430 431 432 433 434 435
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
436 437

	// add self locally
438
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
439 440 441
	if !brdcst {
		return nil
	}
442

443 444 445 446 447 448 449 450 451 452
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
453
			deadline = deadline.Add(-timeout / 10)
454 455 456 457 458 459 460 461 462 463
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

464
	var exceededDeadline bool
465
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
466 467 468 469 470 471 472 473 474 475 476 477
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
	}
478 479
	if err != nil {
		return err
480 481
	}

482
	mes, err := dht.makeProvRecord(keyMH)
483 484 485 486
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
487
	wg := sync.WaitGroup{}
488
	for p := range peers {
Jeromy's avatar
Jeromy committed
489 490 491
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
492
			logger.Debugf("putProvider(%s, %s)", keyMH, p)
493
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
494
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
495
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
496 497
			}
		}(p)
498
	}
Jeromy's avatar
Jeromy committed
499
	wg.Wait()
500 501 502 503
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
504
}
505
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
506
	pi := peer.AddrInfo{
507 508 509 510 511 512 513 514 515 516
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

517
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
518
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
519 520
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
521

Brian Tiger Chow's avatar
Brian Tiger Chow committed
522
// FindProviders searches until the context expires.
523
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
524 525 526
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
	}
527
	var providers []peer.AddrInfo
528
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
529 530 531 532 533
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
534 535 536
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
537
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
538
	peerOut := make(chan peer.AddrInfo, count)
539
	if !dht.enableProviders {
540 541
		close(peerOut)
		return peerOut
542 543
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
544 545
	keyMH := key.Hash()
	logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH))
546

Adin Schmahmann's avatar
Adin Schmahmann committed
547
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
548 549 550
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
551 552
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
	defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done()
Jeromy's avatar
Jeromy committed
553 554
	defer close(peerOut)

555
	ps := peer.NewLimitedSet(count)
556
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
557
	for _, p := range provs {
558
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
559
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
560
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
561
			select {
Jeromy's avatar
Jeromy committed
562
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
563 564 565
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
566
		}
Jeromy's avatar
Jeromy committed
567

568
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
569
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
570
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
571 572 573 574
			return
		}
	}

575
	_, _ = dht.runDisjointQueries(ctx, dht.d, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
576 577 578 579 580 581
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
582

Adin Schmahmann's avatar
Adin Schmahmann committed
583 584 585
			pmes, err := dht.findProvidersSingle(ctx, p, key)
			if err != nil {
				return nil, err
586
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609

			logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
			provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
			logger.Debugf("%d provider entries decoded", len(provs))

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
				if prov.ID != dht.self {
					dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
				}
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
				if ps.Size() >= count {
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
610
				}
611
			}
612

Adin Schmahmann's avatar
Adin Schmahmann committed
613 614 615 616 617 618 619 620 621 622
			// Give closer peers back to the query to be queried
			closer := pmes.GetCloserPeers()
			peers := pb.PBPeersToPeerInfos(closer)
			logger.Debugf("got closer peers: %d %s", len(peers), peers)

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
623

Adin Schmahmann's avatar
Adin Schmahmann committed
624 625 626
			return peers, nil
		},
		func(peerset *kpeerset.SortedPeerset) bool {
627
			return ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
628 629
		},
	)
Jeromy's avatar
Jeromy committed
630

Adin Schmahmann's avatar
Adin Schmahmann committed
631 632 633
	if ctx.Err() == nil {
		// refresh the cpl for this key after the query is run
		dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now())
Jeromy's avatar
Jeromy committed
634
	}
635 636
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
637
// FindPeer searches for a peer with given ID.
638
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Matt Joiner's avatar
Matt Joiner committed
639
	eip := logger.EventBegin(ctx, "FindPeer", id)
640 641 642 643 644 645
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
646

647
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
648
	if pi := dht.FindLocal(id); pi.ID != "" {
649
		return pi, nil
650 651
	}

652
	queries, err := dht.runDisjointQueries(ctx, dht.d, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
653 654 655 656 657 658
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
659

Adin Schmahmann's avatar
Adin Schmahmann committed
660 661 662 663
			pmes, err := dht.findPeerSingle(ctx, p, id)
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
664
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
665
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
666

Adin Schmahmann's avatar
Adin Schmahmann committed
667 668 669 670 671 672
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
673

Adin Schmahmann's avatar
Adin Schmahmann committed
674 675 676 677 678 679 680
			return peers, err
		},
		func(peerset *kpeerset.SortedPeerset) bool {
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

681 682 683 684
	if err != nil {
		return peer.AddrInfo{}, err
	}

685
	// TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar
Adin Schmahmann's avatar
Adin Schmahmann committed
686 687 688 689 690 691
	if dht.host.Network().Connectedness(id) == network.Connected {
		shortcutTaken := false
		for _, q := range queries {
			if len(q.localPeers.KUnqueried()) > 0 {
				shortcutTaken = true
				break
692 693 694
			}
		}

Adin Schmahmann's avatar
Adin Schmahmann committed
695 696 697 698
		if !shortcutTaken {
			kadID := kb.ConvertPeerID(id)
			// refresh the cpl for this key as the query was successful
			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
699 700
		}

Adin Schmahmann's avatar
Adin Schmahmann committed
701 702
		return dht.peerstore.PeerInfo(id), nil
	} else {
703
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
704 705 706 707
			kadID := kb.ConvertPeerID(id)
			// refresh the cpl for this key as the query was successful
			dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
		}
708

Adin Schmahmann's avatar
Adin Schmahmann committed
709 710
		return peer.AddrInfo{}, routing.ErrNotFound
	}
711
}