routing.go 17.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
Jeromy's avatar
Jeromy committed
7
	"sync"
Jeromy's avatar
Jeromy committed
8
	"time"
9

Adin Schmahmann's avatar
Adin Schmahmann committed
10
	"github.com/libp2p/go-libp2p-core/network"
11 12 13 14
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

15
	"github.com/ipfs/go-cid"
16
	u "github.com/ipfs/go-ipfs-util"
17
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Adin Schmahmann's avatar
Adin Schmahmann committed
18
	"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
George Antoniadis's avatar
George Antoniadis committed
19 20
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
21
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23 24 25 26 27 28
)

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
29
// This is the top level "Store" operation of the DHT
30
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
31 32 33 34
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Steven Allen's avatar
Steven Allen committed
35
	logger.Debugw("putting value", "key", loggableKeyString(key))
Jeromy's avatar
Jeromy committed
36

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

60
	rec := record.MakePutRecord(key, value)
61
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
62
	err = dht.putLocal(key, rec)
63 64 65 66
	if err != nil {
		return err
	}

67
	pchan, err := dht.GetClosestPeers(ctx, key)
68 69 70
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71

72 73 74 75
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
76 77
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
78
			defer wg.Done()
79 80
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
81 82 83
				ID:   p,
			})

84
			err := dht.putValueToPeer(ctx, p, rec)
85
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
86
				logger.Debugf("failed putting value to peer: %s", err)
87 88 89 90
			}
		}(p)
	}
	wg.Wait()
91

92
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
93 94
}

Steven Allen's avatar
Steven Allen committed
95 96 97 98 99 100
// RecvdVal stores a value and the peer from which we got the value.
//
// Val holds the raw value bytes taken from a record, and From identifies the
// peer that supplied them (the local node's own ID when the value came from
// the local datastore).
type RecvdVal struct {
	Val  []byte
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
// GetValue searches for the value corresponding to given Key.
102
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
103 104 105 106
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

107
	// apply defaultQuorum if relevant
108
	var cfg routing.Options
109 110 111 112 113
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

114 115 116 117
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
118 119
	var best []byte

120 121
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
122 123
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
124 125 126 127
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
128 129 130
	if best == nil {
		return nil, routing.ErrNotFound
	}
Matt Joiner's avatar
Matt Joiner committed
131
	logger.Debugf("GetValue %v %v", key, best)
Łukasz Magiera's avatar
Łukasz Magiera committed
132 133 134
	return best, nil
}

Alan Shaw's avatar
Alan Shaw committed
135
// SearchValue searches for the value corresponding to given Key and streams the results.
136
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
137 138 139 140
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

141
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
142
	if err := cfg.Apply(opts...); err != nil {
143
		return nil, err
Steven Allen's avatar
Steven Allen committed
144 145 146 147
	}

	responsesNeeded := 0
	if !cfg.Offline {
148
		responsesNeeded = getQuorum(&cfg, defaultQuorum)
Steven Allen's avatar
Steven Allen committed
149 150
	}

151
	stopCh := make(chan struct{})
Adin Schmahmann's avatar
Adin Schmahmann committed
152
	valCh, lookupRes := dht.getValues(ctx, key, stopCh)
153 154

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
155 156
	go func() {
		defer close(out)
157 158
		best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
		if best == nil || aborted {
159
			return
160
		}
161

162 163
		updatePeers := make([]peer.ID, 0, dht.bucketSize)
		select {
Adin Schmahmann's avatar
Adin Schmahmann committed
164 165
		case l := <-lookupRes:
			if l == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
166 167
				return
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
168

Adin Schmahmann's avatar
Adin Schmahmann committed
169
			for _, p := range l.peers {
170 171
				if _, ok := peersWithBest[p]; !ok {
					updatePeers = append(updatePeers, p)
Adin Schmahmann's avatar
Adin Schmahmann committed
172 173
				}
			}
174 175 176
		case <-ctx.Done():
			return
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
177

178 179
		dht.updatePeerValues(dht.Context(), key, best, updatePeers)
	}()
Łukasz Magiera's avatar
Łukasz Magiera committed
180

181 182
	return out, nil
}
183

184 185
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
	out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
186 187 188 189 190
	numResponses := 0
	return dht.processValues(ctx, key, valCh,
		func(ctx context.Context, v RecvdVal, better bool) bool {
			numResponses++
			if better {
Steven Allen's avatar
Steven Allen committed
191 192 193
				select {
				case out <- v.Val:
				case <-ctx.Done():
194
					return false
Steven Allen's avatar
Steven Allen committed
195
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
196
			}
197

198
			if nvals > 0 && numResponses > nvals {
199
				close(stopCh)
200 201 202 203
				return true
			}
			return false
		})
204 205
}

Steven Allen's avatar
Steven Allen committed
206
// GetValues gets nvals values corresponding to the given key.
207
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
208 209 210
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
211

212
	queryCtx, cancel := context.WithCancel(ctx)
213
	defer cancel()
214
	valCh, _ := dht.getValues(queryCtx, key, nil)
215 216 217 218

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
219 220 221
		if len(out) == nvals {
			cancel()
		}
222 223
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
224
	return out, ctx.Err()
225 226
}

227
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
228
	newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
229 230 231
loop:
	for {
		if aborted {
232
			return
233
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
234

235 236 237 238
		select {
		case v, ok := <-vals:
			if !ok {
				break loop
239
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
240

241 242 243 244 245 246 247 248 249
			// Select best value
			if best != nil {
				if bytes.Equal(best, v.Val) {
					peersWithBest[v.From] = struct{}{}
					aborted = newVal(ctx, v, false)
					continue
				}
				sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
				if err != nil {
Steven Allen's avatar
Steven Allen committed
250
					logger.Warnw("failed to select best value", "key", key, "error", err)
251 252 253 254 255
					continue
				}
				if sel != 1 {
					aborted = newVal(ctx, v, false)
					continue
Adin Schmahmann's avatar
Adin Schmahmann committed
256
				}
257
			}
258 259 260 261 262 263 264 265
			peersWithBest = make(map[peer.ID]struct{})
			peersWithBest[v.From] = struct{}{}
			best = v.Val
			aborted = newVal(ctx, v, true)
		case <-ctx.Done():
			return
		}
	}
266

267 268
	return
}
Adin Schmahmann's avatar
Adin Schmahmann committed
269

270 271 272 273 274 275 276 277 278
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
	fixupRec := record.MakePutRecord(key, val)
	for _, p := range peers {
		go func(p peer.ID) {
			//TODO: Is this possible?
			if p == dht.self {
				err := dht.putLocal(key, fixupRec)
				if err != nil {
					logger.Error("Error correcting local dht entry:", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
279
				}
280 281 282 283 284 285 286
				return
			}
			ctx, cancel := context.WithTimeout(ctx, time.Second*30)
			defer cancel()
			err := dht.putValueToPeer(ctx, p, fixupRec)
			if err != nil {
				logger.Debug("Error correcting DHT entry: ", err)
287
			}
288 289 290 291
		}(p)
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
292
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
293
	valCh := make(chan RecvdVal, 1)
Adin Schmahmann's avatar
Adin Schmahmann committed
294
	lookupResCh := make(chan *lookupWithFollowupResult, 1)
295

Steven Allen's avatar
Steven Allen committed
296 297
	logger.Debugw("finding value", "key", loggableKeyString(key))

298 299 300 301 302 303 304
	if rec, err := dht.getLocal(key); rec != nil && err == nil {
		select {
		case valCh <- RecvdVal{
			Val:  rec.GetValue(),
			From: dht.self,
		}:
		case <-ctx.Done():
305
		}
306
	}
307

Adin Schmahmann's avatar
Adin Schmahmann committed
308
	go func() {
309
		defer close(valCh)
Adin Schmahmann's avatar
Adin Schmahmann committed
310
		defer close(lookupResCh)
Aarsh Shah's avatar
Aarsh Shah committed
311
		lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
			func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type: routing.SendingQuery,
					ID:   p,
				})

				rec, peers, err := dht.getValueOrPeers(ctx, p, key)
				switch err {
				case routing.ErrNotFound:
					// in this case, they responded with nothing,
					// still send a notification so listeners can know the
					// request has completed 'successfully'
					routing.PublishQueryEvent(ctx, &routing.QueryEvent{
						Type: routing.PeerResponse,
						ID:   p,
					})
					return nil, err
				default:
					return nil, err
				case nil, errInvalidRecord:
					// in either of these cases, we want to keep going
				}
Jeromy's avatar
Jeromy committed
335

Adin Schmahmann's avatar
Adin Schmahmann committed
336 337 338 339 340 341 342
				// TODO: What should happen if the record is invalid?
				// Pre-existing code counted it towards the quorum, but should it?
				if rec != nil && rec.GetValue() != nil {
					rv := RecvdVal{
						Val:  rec.GetValue(),
						From: p,
					}
343

Adin Schmahmann's avatar
Adin Schmahmann committed
344
					select {
345 346 347
					case valCh <- rv:
					case <-ctx.Done():
						return nil, ctx.Err()
Adin Schmahmann's avatar
Adin Schmahmann committed
348 349
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
350

Adin Schmahmann's avatar
Adin Schmahmann committed
351 352 353 354 355 356 357 358 359
				// For DHT query command
				routing.PublishQueryEvent(ctx, &routing.QueryEvent{
					Type:      routing.PeerResponse,
					ID:        p,
					Responses: peers,
				})

				return peers, err
			},
Adin Schmahmann's avatar
Adin Schmahmann committed
360
			func() bool {
361 362 363 364 365 366
				select {
				case <-stopQuery:
					return true
				default:
					return false
				}
Adin Schmahmann's avatar
Adin Schmahmann committed
367 368 369
			},
		)

370 371
		if err != nil {
			return
Adin Schmahmann's avatar
Adin Schmahmann committed
372
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
373
		lookupResCh <- lookupRes
Łukasz Magiera's avatar
Łukasz Magiera committed
374

375
		if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
376
			dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
Łukasz Magiera's avatar
Łukasz Magiera committed
377 378
		}
	}()
379

Adin Schmahmann's avatar
Adin Schmahmann committed
380
	return valCh, lookupResCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
381 382
}

Adin Schmahmann's avatar
Adin Schmahmann committed
383 384
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
	if lookupRes.completed {
385 386 387 388 389
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
	}
}

390 391 392
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
393

394
// Provide makes this node announce that it can provide a value for the given key
395
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
396 397
	if !dht.enableProviders {
		return routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
398 399
	} else if !key.Defined() {
		return fmt.Errorf("invalid cid: undefined")
400
	}
Steven Allen's avatar
Steven Allen committed
401 402
	logger.Debugw("finding provider", "cid", key)

403
	keyMH := key.Hash()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
404 405

	// add self locally
406
	dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
407 408 409
	if !brdcst {
		return nil
	}
410

411 412 413 414 415 416 417 418 419 420
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
421
			deadline = deadline.Add(-timeout / 10)
422 423 424 425 426 427 428 429 430 431
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

432
	var exceededDeadline bool
433
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
434 435 436 437 438 439 440 441 442 443 444
	switch err {
	case context.DeadlineExceeded:
		// If the _inner_ deadline has been exceeded but the _outer_
		// context is still fine, provide the value to the closest peers
		// we managed to find, even if they're not the _actual_ closest peers.
		if ctx.Err() != nil {
			return ctx.Err()
		}
		exceededDeadline = true
	case nil:
	default:
445
		return err
446 447
	}

448
	mes, err := dht.makeProvRecord(keyMH)
449 450 451 452
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
453
	wg := sync.WaitGroup{}
454
	for p := range peers {
Jeromy's avatar
Jeromy committed
455 456 457
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
458
			logger.Debugf("putProvider(%s, %s)", keyMH, p)
459
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
460
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
461
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
462 463
			}
		}(p)
464
	}
Jeromy's avatar
Jeromy committed
465
	wg.Wait()
466 467 468 469
	if exceededDeadline {
		return context.DeadlineExceeded
	}
	return ctx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
470
}
471
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
472
	pi := peer.AddrInfo{
473 474 475 476 477 478 479
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
Alan Shaw's avatar
Alan Shaw committed
480
		return nil, fmt.Errorf("no known addresses for self, cannot put provider")
481 482
	}

483
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
484
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
485 486
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
487

Brian Tiger Chow's avatar
Brian Tiger Chow committed
488
// FindProviders searches until the context expires.
489
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
490 491
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
Steven Allen's avatar
Steven Allen committed
492 493
	} else if !c.Defined() {
		return nil, fmt.Errorf("invalid cid: undefined")
494
	}
Steven Allen's avatar
Steven Allen committed
495

496
	var providers []peer.AddrInfo
497
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
498 499 500 501 502
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
503 504
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
505 506 507
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
508
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
Steven Allen's avatar
Steven Allen committed
509
	if !dht.enableProviders || !key.Defined() {
510
		peerOut := make(chan peer.AddrInfo)
511 512
		close(peerOut)
		return peerOut
513 514
	}

515 516 517 518 519 520
	chSize := count
	if count == 0 {
		chSize = 1
	}
	peerOut := make(chan peer.AddrInfo, chSize)

Adin Schmahmann's avatar
Adin Schmahmann committed
521
	keyMH := key.Hash()
522

Adin Schmahmann's avatar
Adin Schmahmann committed
523
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
524 525 526
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
527
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
Steven Allen's avatar
Steven Allen committed
528 529
	logger.Debugw("finding providers", "key", key)

Jeromy's avatar
Jeromy committed
530 531
	defer close(peerOut)

532 533 534 535 536 537 538 539
	findAll := count == 0
	var ps *peer.Set
	if findAll {
		ps = peer.NewSet()
	} else {
		ps = peer.NewLimitedSet(count)
	}

540
	provs := dht.ProviderManager.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
541
	for _, p := range provs {
542
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
543
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
544
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
545
			select {
Jeromy's avatar
Jeromy committed
546
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
547 548 549
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
550
		}
Jeromy's avatar
Jeromy committed
551

552
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
553
		// TODO: is this a DOS vector?
554
		if !findAll && ps.Size() >= count {
Jeromy's avatar
Jeromy committed
555 556 557 558
			return
		}
	}

Aarsh Shah's avatar
Aarsh Shah committed
559
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
Adin Schmahmann's avatar
Adin Schmahmann committed
560 561 562 563 564 565
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
Jeromy's avatar
Jeromy committed
566

Adin Schmahmann's avatar
Adin Schmahmann committed
567 568 569
			pmes, err := dht.findProvidersSingle(ctx, p, key)
			if err != nil {
				return nil, err
570
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
571 572 573 574 575 576 577

			logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
			provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
			logger.Debugf("%d provider entries decoded", len(provs))

			// Add unique providers from request, up to 'count'
			for _, prov := range provs {
578
				dht.maybeAddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
Adin Schmahmann's avatar
Adin Schmahmann committed
579 580 581 582 583 584 585 586 587 588
				logger.Debugf("got provider: %s", prov)
				if ps.TryAdd(prov.ID) {
					logger.Debugf("using provider: %s", prov)
					select {
					case peerOut <- *prov:
					case <-ctx.Done():
						logger.Debug("context timed out sending more providers")
						return nil, ctx.Err()
					}
				}
589
				if !findAll && ps.Size() >= count {
Adin Schmahmann's avatar
Adin Schmahmann committed
590 591
					logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
					return nil, nil
Jeromy's avatar
Jeromy committed
592
				}
593
			}
594

Adin Schmahmann's avatar
Adin Schmahmann committed
595 596 597 598 599 600 601 602 603 604
			// Give closer peers back to the query to be queried
			closer := pmes.GetCloserPeers()
			peers := pb.PBPeersToPeerInfos(closer)
			logger.Debugf("got closer peers: %d %s", len(peers), peers)

			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
605

Adin Schmahmann's avatar
Adin Schmahmann committed
606 607
			return peers, nil
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
608
		func() bool {
609
			return !findAll && ps.Size() >= count
Adin Schmahmann's avatar
Adin Schmahmann committed
610 611
		},
	)
Jeromy's avatar
Jeromy committed
612

613
	if err == nil && ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
614
		dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
Jeromy's avatar
Jeromy committed
615
	}
616 617
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
618
// FindPeer searches for a peer with given ID.
619
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Steven Allen's avatar
Steven Allen committed
620 621 622 623
	if err := id.Validate(); err != nil {
		return peer.AddrInfo{}, err
	}

Steven Allen's avatar
Steven Allen committed
624
	logger.Debugw("finding peer", "peer", id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
625

626
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
627
	if pi := dht.FindLocal(id); pi.ID != "" {
628
		return pi, nil
629 630
	}

Aarsh Shah's avatar
Aarsh Shah committed
631
	lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
Adin Schmahmann's avatar
Adin Schmahmann committed
632 633 634 635 636 637
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})
638

Adin Schmahmann's avatar
Adin Schmahmann committed
639 640 641 642
			pmes, err := dht.findPeerSingle(ctx, p, id)
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
643
			}
Adin Schmahmann's avatar
Adin Schmahmann committed
644
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
645

Adin Schmahmann's avatar
Adin Schmahmann committed
646 647 648 649 650 651
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
652

Adin Schmahmann's avatar
Adin Schmahmann committed
653 654
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
655
		func() bool {
Adin Schmahmann's avatar
Adin Schmahmann committed
656 657 658 659
			return dht.host.Network().Connectedness(id) == network.Connected
		},
	)

660 661 662 663
	if err != nil {
		return peer.AddrInfo{}, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
664 665 666 667 668 669
	dialedPeerDuringQuery := false
	for i, p := range lookupRes.peers {
		if p == id {
			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
			// server peer and is not identified as such is a bug.
670
			dialedPeerDuringQuery = (lookupRes.state[i] == qpeerset.PeerQueried || lookupRes.state[i] == qpeerset.PeerUnreachable || lookupRes.state[i] == qpeerset.PeerWaiting)
Adin Schmahmann's avatar
Adin Schmahmann committed
671 672
			break
		}
673 674
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
675 676 677 678
	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
	// to the peer.
	connectedness := dht.host.Network().Connectedness(id)
	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
Adin Schmahmann's avatar
Adin Schmahmann committed
679 680
		return dht.peerstore.PeerInfo(id), nil
	}
681 682

	return peer.AddrInfo{}, routing.ErrNotFound
683
}