routing.go 15.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	proto "github.com/gogo/protobuf/proto"
12
	cid "github.com/ipfs/go-cid"
13
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
14
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
15 16
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
17 18 19 20
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
21 22 23
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
24
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25 26
)

27 28 29 30 31 32
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that different query
// results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33 34 35 36 37
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
38
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
39
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
40 41 42 43 44 45 46 47
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
49

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

73
	rec := record.MakePutRecord(key, value)
74
	rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
Jeromy's avatar
Jeromy committed
75
	err = dht.putLocal(key, rec)
76 77 78 79
	if err != nil {
		return err
	}

80
	pchan, err := dht.GetClosestPeers(ctx, key)
81 82 83
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84

85 86 87 88
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
89 90
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
91
			defer wg.Done()
Jeromy's avatar
Jeromy committed
92 93 94 95 96
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

Jeromy's avatar
Jeromy committed
97
			err := dht.putValueToPeer(ctx, p, key, rec)
98
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
99
				log.Debugf("failed putting value to peer: %s", err)
100 101 102 103 104
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105 106
}

Steven Allen's avatar
Steven Allen committed
107 108 109 110 111 112
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
114
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
115 116 117 118 119 120 121 122
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
123

Steven Allen's avatar
Steven Allen committed
124 125 126 127 128 129 130 131 132 133 134
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}

	responsesNeeded := 0
	if !cfg.Offline {
		responsesNeeded = getQuorum(&cfg)
	}

	vals, err := dht.GetValues(ctx, key, responsesNeeded)
135 136 137 138
	if err != nil {
		return nil, err
	}

Steven Allen's avatar
Steven Allen committed
139
	recs := make([][]byte, 0, len(vals))
140
	for _, v := range vals {
141 142 143
		if v.Val != nil {
			recs = append(recs, v.Val)
		}
144
	}
145 146 147
	if len(recs) == 0 {
		return nil, routing.ErrNotFound
	}
148

149
	i, err := dht.Validator.Select(key, recs)
150 151 152 153 154 155 156 157 158 159 160
	if err != nil {
		return nil, err
	}

	best := recs[i]
	log.Debugf("GetValue %v %v", key, best)
	if best == nil {
		log.Errorf("GetValue yielded correct record with nil value.")
		return nil, routing.ErrNotFound
	}

161
	fixupRec := record.MakePutRecord(key, best)
162 163 164
	for _, v := range vals {
		// if someone sent us a different 'less-valid' record, lets correct them
		if !bytes.Equal(v.Val, best) {
Steven Allen's avatar
Steven Allen committed
165
			go func(v RecvdVal) {
166 167 168 169 170 171 172
				if v.From == dht.self {
					err := dht.putLocal(key, fixupRec)
					if err != nil {
						log.Error("Error correcting local dht entry:", err)
					}
					return
				}
Jeromy's avatar
Jeromy committed
173 174
				ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
				defer cancel()
175 176
				err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
				if err != nil {
Steven Allen's avatar
Steven Allen committed
177
					log.Debug("Error correcting DHT entry: ", err)
178 179 180 181 182 183 184 185
				}
			}(v)
		}
	}

	return best, nil
}

Steven Allen's avatar
Steven Allen committed
186 187
// GetValues gets nvals values corresponding to the given key.
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
188 189 190 191 192 193 194 195
	eip := log.EventBegin(ctx, "GetValues")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Steven Allen's avatar
Steven Allen committed
196
	vals := make([]RecvdVal, 0, nvals)
197 198
	var valslock sync.Mutex

199
	// If we have it local, don't bother doing an RPC!
200
	lrec, err := dht.getLocal(key)
201 202 203 204 205
	if err != nil {
		// something is wrong with the datastore.
		return nil, err
	}
	if lrec != nil {
206
		// TODO: this is tricky, we don't always want to trust our own value
207
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
		log.Debug("have it locally")
Steven Allen's avatar
Steven Allen committed
209
		vals = append(vals, RecvdVal{
210 211 212 213 214 215 216 217
			Val:  lrec.GetValue(),
			From: dht.self,
		})

		if nvals <= 1 {
			return vals, nil
		}
	} else if nvals == 0 {
218
		return nil, routing.ErrNotFound
Jeromy's avatar
Jeromy committed
219 220
	}

221
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
222
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
223
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
224
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
		log.Warning("No peers from routing table!")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226
		return nil, kb.ErrLookupFailure
227 228
	}

229
	// setup the Query
Jeromy's avatar
Jeromy committed
230
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
231
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
232
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
233 234 235 236
			Type: notif.SendingQuery,
			ID:   p,
		})

237
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
238 239 240 241 242 243 244 245 246
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
247
			return nil, err
248 249 250 251 252
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
253
		}
254

255 256
		res := &dhtQueryResult{closerPeers: peers}

257
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
258
			rv := RecvdVal{
259 260 261 262 263 264
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
			vals = append(vals, rv)

265
			// If we have collected enough records, we're done
266 267 268 269
			if len(vals) >= nvals {
				res.success = true
			}
			valslock.Unlock()
270 271
		}

Jeromy's avatar
Jeromy committed
272
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
273 274
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
275
			Responses: peers,
Jeromy's avatar
Jeromy committed
276 277
		})

278 279
		return res, nil
	})
280

281 282 283 284 285 286 287 288 289 290
	reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	_, err = query.Run(reqCtx, rtp)

	// We do have some values but we either ran out of peers to query or
	// searched for a whole minute.
	//
	// We'll just call this a success.
	if len(vals) > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
		err = nil
291
	}
292
	return vals, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
293 294
}

295 296 297
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
298

299
// Provide makes this node announce that it can provide a value for the given key
300 301 302 303 304 305 306 307
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
308 309

	// add self locally
310
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
311 312 313
	if !brdcst {
		return nil
	}
314

315
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
316 317
	if err != nil {
		return err
318 319
	}

320 321 322 323 324
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
325
	wg := sync.WaitGroup{}
326
	for p := range peers {
Jeromy's avatar
Jeromy committed
327 328 329
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
330
			log.Debugf("putProvider(%s, %s)", key, p)
331
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
332
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
333
				log.Debug(err)
Jeromy's avatar
Jeromy committed
334 335
			}
		}(p)
336
	}
Jeromy's avatar
Jeromy committed
337
	wg.Wait()
338
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
339
}
340
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
341 342 343 344 345 346 347 348 349 350 351
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

352
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.KeyString(), 0)
353 354 355
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
356

Brian Tiger Chow's avatar
Brian Tiger Chow committed
357
// FindProviders searches until the context expires.
358
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
359
	var providers []pstore.PeerInfo
360
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
361 362 363 364 365
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
366 367 368
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
369 370
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
371
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
372 373 374 375
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

376 377
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
378 379
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
380
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
381 382
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
383
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
384
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
385
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
386
			select {
Jeromy's avatar
Jeromy committed
387
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
388 389 390
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
391
		}
Jeromy's avatar
Jeromy committed
392

393
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
394
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
395
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
396 397 398 399 400
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
401
	parent := ctx
402
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
403
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
404 405 406
			Type: notif.SendingQuery,
			ID:   p,
		})
407
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
408 409 410 411
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
412
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
413
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
414
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
415 416 417

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
418 419 420
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
421
			log.Debugf("got provider: %s", prov)
422
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
423
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
424
				select {
Jeromy's avatar
Jeromy committed
425
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
426
				case <-ctx.Done():
427
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
428 429
					return nil, ctx.Err()
				}
430
			}
Jeromy's avatar
Jeromy committed
431
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
432
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
433
				return &dhtQueryResult{success: true}, nil
434 435 436
			}
		}

Jeromy's avatar
Jeromy committed
437 438
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
439
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
440
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
441

Jeromy's avatar
Jeromy committed
442
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
443 444
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
445
			Responses: clpeers,
446
		})
Jeromy's avatar
Jeromy committed
447 448 449
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
450
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
451 452
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
453
		log.Debugf("Query error: %s", err)
454 455 456 457 458 459 460 461 462 463
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
464 465 466 467
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
468
	}
469 470
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
471
// FindPeer searches for a peer with given ID.
472 473 474 475 476 477 478 479
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
480

481
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
482
	if pi := dht.FindLocal(id); pi.ID != "" {
483
		return pi, nil
484 485
	}

Jeromy's avatar
Jeromy committed
486
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
487
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
488
		return pstore.PeerInfo{}, kb.ErrLookupFailure
489
	}
490

Jeromy's avatar
Jeromy committed
491
	// Sanity...
492
	for _, p := range peers {
493
		if p == id {
494
			log.Debug("found target peer in list of closest peers...")
495
			return dht.peerstore.PeerInfo(p), nil
496
		}
497
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
498

Jeromy's avatar
Jeromy committed
499
	// setup the Query
Jeromy's avatar
Jeromy committed
500
	parent := ctx
501
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
502
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
503 504 505
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
506

507
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
508
		if err != nil {
509
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
510
		}
511

Jeromy's avatar
Jeromy committed
512
		closer := pmes.GetCloserPeers()
513
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
514

515
		// see if we got the peer here
516 517
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
518
				return &dhtQueryResult{
519
					peer:    npi,
Jeromy's avatar
Jeromy committed
520 521 522
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
523 524
		}

Jeromy's avatar
Jeromy committed
525
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
526
			Type:      notif.PeerResponse,
527
			ID:        p,
Jeromy's avatar
Jeromy committed
528
			Responses: clpeerInfos,
529 530
		})

531
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
532
	})
533

Jeromy's avatar
Jeromy committed
534
	// run it!
535
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
536
	if err != nil {
Jeromy's avatar
Jeromy committed
537
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
538 539
	}

540
	log.Debugf("FindPeer %v %v", id, result.success)
541
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
542
		return pstore.PeerInfo{}, routing.ErrNotFound
543
	}
Jeromy's avatar
Jeromy committed
544

Jeromy's avatar
Jeromy committed
545
	return *result.peer, nil
546 547
}

548
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
549
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
550

Jeromy's avatar
Jeromy committed
551
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
552
	peersSeen := make(map[peer.ID]struct{})
553
	var peersSeenMx sync.Mutex
554

Jeromy's avatar
Jeromy committed
555
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
556
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
557
		return nil, kb.ErrLookupFailure
558 559 560
	}

	// setup the Query
561
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
562

563
		pmes, err := dht.findPeerSingle(ctx, p, id)
564 565 566 567
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
568
		var clpeers []*pstore.PeerInfo
569 570
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
571
			pi := pb.PBPeerToPeerInfo(pbp)
572

573
			// skip peers already seen
574
			peersSeenMx.Lock()
575
			if _, found := peersSeen[pi.ID]; found {
576
				peersSeenMx.Unlock()
577 578
				continue
			}
579
			peersSeen[pi.ID] = struct{}{}
580
			peersSeenMx.Unlock()
581 582 583 584 585 586

			// if peer is connected, send it to our client.
			if pb.Connectedness(*pbp.Connection) == inet.Connected {
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
587
				case peerchan <- pi:
588 589 590 591
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
592
			// TODO maybe query it?
593
			if pb.Connectedness(*pbp.Connection) != inet.Connected {
594
				clpeers = append(clpeers, pi)
595 596 597 598 599 600 601 602 603
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
604
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
605
			log.Debug(err)
606 607 608 609 610 611 612 613
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}