routing.go 16.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that once the buffer is
// full, further query results wait for the consumer to drain the channel.
//
// In this file it sizes the result channel of FindPeersConnectedToPeer.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109 110 111
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw record value; it is nil when the sending peer returned
	// an invalid record.
	Val  []byte
	// From is the peer that supplied Val (the local peer ID for values read
	// from the local datastore).
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
113
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
114 115 116 117 118 119 120 121
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
122

123 124 125 126
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
127 128
	var best []byte

129 130
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
131 132 133 134 135 136 137 138 139
	}

	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

140
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
Steven Allen's avatar
Steven Allen committed
141 142
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
143
		return nil, err
Steven Allen's avatar
Steven Allen committed
144 145 146 147 148 149 150
	}

	responsesNeeded := 0
	if !cfg.Offline {
		responsesNeeded = getQuorum(&cfg)
	}

151 152 153 154 155 156
	valCh, err := dht.getValues(ctx, key, responsesNeeded)
	if err != nil {
		return nil, err
	}

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
157 158
	go func() {
		defer close(out)
159

Łukasz Magiera's avatar
Łukasz Magiera committed
160 161
		vals := make([]RecvdVal, 0, responsesNeeded)
		best := -1
162

Łukasz Magiera's avatar
Łukasz Magiera committed
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
		defer func() {
			if len(vals) <= 1 || best < 0 {
				return
			}
			fixupRec := record.MakePutRecord(key, vals[best].Val)
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
				if !bytes.Equal(v.Val, vals[best].Val) {
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
189

Łukasz Magiera's avatar
Łukasz Magiera committed
190 191 192 193
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
194
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
195 196 197
				}

				vals = append(vals, v)
198

Łukasz Magiera's avatar
Łukasz Magiera committed
199 200 201 202 203 204
				if v.Val == nil {
					continue
				}
				// Select best value
				if best > -1 {
					i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
205
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
206 207 208 209 210 211 212 213 214 215 216
						continue //TODO: Do we want to do something with the error here?
					}
					if i != best && !bytes.Equal(v.Val, vals[best].Val) {
						best = i
						out <- v.Val
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
						best = len(vals) - 1
						out <- v.Val
217 218
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
219 220 221
			case <-ctx.Done():
				return
			}
222
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
223
	}()
224

225
	return out, nil
226 227
}

Steven Allen's avatar
Steven Allen committed
228
// GetValues gets nvals values corresponding to the given key.
229
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
230
	eip := log.EventBegin(ctx, "GetValues")
Łukasz Magiera's avatar
Łukasz Magiera committed
231

232
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
233

234
	done := func(err error) (<-chan RecvdVal, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
235 236
		defer close(vals)

237 238 239 240 241
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
242
		return vals, err
Łukasz Magiera's avatar
Łukasz Magiera committed
243
	}
244

245
	// If we have it local, don't bother doing an RPC!
246
	lrec, err := dht.getLocal(key)
247 248
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
249
		return done(err)
250 251
	}
	if lrec != nil {
252
		// TODO: this is tricky, we don't always want to trust our own value
253
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
254
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
255
		vals <- RecvdVal{
256 257
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
258
		}
259 260

		if nvals <= 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
261
			return done(nil)
262
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
263 264

		nvals--
265
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
266
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
267 268
	}

269
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
270
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
271
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
272
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
273
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
274
		return done(kb.ErrLookupFailure)
275 276
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
277 278 279
	var valslock sync.Mutex
	var got int

280
	// setup the Query
Jeromy's avatar
Jeromy committed
281
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
282
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
283
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
284 285 286 287
			Type: notif.SendingQuery,
			ID:   p,
		})

288
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
289 290 291 292 293 294 295 296 297
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
298
			return nil, err
299 300 301 302 303
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
304
		}
305

306 307
		res := &dhtQueryResult{closerPeers: peers}

308
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
309
			rv := RecvdVal{
310 311 312 313
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
314 315
			vals <- rv
			got++
316

317
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
318
			if nvals == got {
319 320 321
				res.success = true
			}
			valslock.Unlock()
322 323
		}

Jeromy's avatar
Jeromy committed
324
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
325 326
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
327
			Responses: peers,
Jeromy's avatar
Jeromy committed
328 329
		})

330 331
		return res, nil
	})
332

Łukasz Magiera's avatar
Łukasz Magiera committed
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		done(err)
	}()
348

349
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
350 351
}

352 353 354
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
355

356
// Provide makes this node announce that it can provide a value for the given key
357 358 359 360 361 362 363 364
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
365 366

	// add self locally
367
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
368 369 370
	if !brdcst {
		return nil
	}
371

372
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
373 374
	if err != nil {
		return err
375 376
	}

377 378 379 380 381
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
382
	wg := sync.WaitGroup{}
383
	for p := range peers {
Jeromy's avatar
Jeromy committed
384 385 386
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
387
			log.Debugf("putProvider(%s, %s)", key, p)
388
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
389
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
390
				log.Debug(err)
Jeromy's avatar
Jeromy committed
391 392
			}
		}(p)
393
	}
Jeromy's avatar
Jeromy committed
394
	wg.Wait()
395
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
396
}
397
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
398 399 400 401 402 403 404 405 406 407 408
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

409
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
410 411 412
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
413

Brian Tiger Chow's avatar
Brian Tiger Chow committed
414
// FindProviders searches until the context expires.
415
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
416
	var providers []pstore.PeerInfo
417
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
418 419 420 421 422
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
423 424 425
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
426 427
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
428
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
429 430 431 432
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

433 434
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
435 436
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
437
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
438 439
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
440
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
441
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
442
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
443
			select {
Jeromy's avatar
Jeromy committed
444
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
445 446 447
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
448
		}
Jeromy's avatar
Jeromy committed
449

450
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
451
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
452
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
453 454 455 456 457
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
458
	parent := ctx
459
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
460
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
461 462 463
			Type: notif.SendingQuery,
			ID:   p,
		})
464
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
465 466 467 468
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
469
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
470
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
471
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
472 473 474

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
475 476 477
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
478
			log.Debugf("got provider: %s", prov)
479
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
480
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
481
				select {
Jeromy's avatar
Jeromy committed
482
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
483
				case <-ctx.Done():
484
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
485 486
					return nil, ctx.Err()
				}
487
			}
Jeromy's avatar
Jeromy committed
488
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
489
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
490
				return &dhtQueryResult{success: true}, nil
491 492 493
			}
		}

Jeromy's avatar
Jeromy committed
494 495
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
496
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
498

Jeromy's avatar
Jeromy committed
499
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
500 501
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
502
			Responses: clpeers,
503
		})
Jeromy's avatar
Jeromy committed
504 505 506
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
507
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
508 509
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
510
		log.Debugf("Query error: %s", err)
511 512 513 514 515 516 517 518 519 520
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
521 522 523 524
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
525
	}
526 527
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
528
// FindPeer searches for a peer with given ID.
529 530 531 532 533 534 535 536
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
537

538
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
539
	if pi := dht.FindLocal(id); pi.ID != "" {
540
		return pi, nil
541 542
	}

Jeromy's avatar
Jeromy committed
543
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
544
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
545
		return pstore.PeerInfo{}, kb.ErrLookupFailure
546
	}
547

Jeromy's avatar
Jeromy committed
548
	// Sanity...
549
	for _, p := range peers {
550
		if p == id {
551
			log.Debug("found target peer in list of closest peers...")
552
			return dht.peerstore.PeerInfo(p), nil
553
		}
554
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
555

Jeromy's avatar
Jeromy committed
556
	// setup the Query
Jeromy's avatar
Jeromy committed
557
	parent := ctx
558
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
559
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
560 561 562
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
563

564
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
565
		if err != nil {
566
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
567
		}
568

Jeromy's avatar
Jeromy committed
569
		closer := pmes.GetCloserPeers()
570
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
571

572
		// see if we got the peer here
573 574
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
575
				return &dhtQueryResult{
576
					peer:    npi,
Jeromy's avatar
Jeromy committed
577 578 579
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
580 581
		}

Jeromy's avatar
Jeromy committed
582
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
583
			Type:      notif.PeerResponse,
584
			ID:        p,
Jeromy's avatar
Jeromy committed
585
			Responses: clpeerInfos,
586 587
		})

588
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
589
	})
590

Jeromy's avatar
Jeromy committed
591
	// run it!
592
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
593
	if err != nil {
Jeromy's avatar
Jeromy committed
594
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
595 596
	}

597
	log.Debugf("FindPeer %v %v", id, result.success)
598
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
599
		return pstore.PeerInfo{}, routing.ErrNotFound
600
	}
Jeromy's avatar
Jeromy committed
601

Jeromy's avatar
Jeromy committed
602
	return *result.peer, nil
603 604
}

605
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
606
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
607

Jeromy's avatar
Jeromy committed
608
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
609
	peersSeen := make(map[peer.ID]struct{})
610
	var peersSeenMx sync.Mutex
611

Jeromy's avatar
Jeromy committed
612
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
613
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
614
		return nil, kb.ErrLookupFailure
615 616 617
	}

	// setup the Query
618
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
619

620
		pmes, err := dht.findPeerSingle(ctx, p, id)
621 622 623 624
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
625
		var clpeers []*pstore.PeerInfo
626 627
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
628
			pi := pb.PBPeerToPeerInfo(pbp)
629

630
			// skip peers already seen
631
			peersSeenMx.Lock()
632
			if _, found := peersSeen[pi.ID]; found {
633
				peersSeenMx.Unlock()
634 635
				continue
			}
636
			peersSeen[pi.ID] = struct{}{}
637
			peersSeenMx.Unlock()
638 639

			// if peer is connected, send it to our client.
640
			if pb.Connectedness(pbp.Connection) == inet.Connected {
641 642 643
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
644
				case peerchan <- pi:
645 646 647 648
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
649
			// TODO maybe query it?
650
			if pb.Connectedness(pbp.Connection) != inet.Connected {
651
				clpeers = append(clpeers, pi)
652 653 654 655 656 657 658 659 660
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
661
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
662
			log.Debug(err)
663 664 665 666 667 668 669 670
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}