routing.go 14.4 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
Jeromy's avatar
Jeromy committed
12
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
13 14
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
15 16 17 18
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
19 20 21
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
22 23
)

24 25 26 27 28 29
// asyncQueryBuffer is the capacity of the buffered result channels used by
// asynchronous queries (see FindPeersConnectedToPeer). The buffer lets several
// queries execute simultaneously, hand back their results and continue
// querying closer peers without waiting on the consumer. Once the buffer is
// full, further results block until the receiver drains the channel.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30 31 32 33 34
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
35
// This is the top level "Store" operation of the DHT
36 37 38 39 40 41 42 43 44
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err error) {
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
46
	sk, err := dht.getOwnPrivateKey()
Jeromy's avatar
Jeromy committed
47 48 49
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50

51 52 53 54 55
	sign, err := dht.Validator.IsSigned(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
56
	rec, err := record.MakePutRecord(sk, key, value, sign)
Jeromy's avatar
Jeromy committed
57
	if err != nil {
58
		log.Debug("creation of record failed!")
Jeromy's avatar
Jeromy committed
59 60 61
		return err
	}

Jeromy's avatar
Jeromy committed
62
	err = dht.putLocal(key, rec)
63 64 65 66
	if err != nil {
		return err
	}

67
	pchan, err := dht.GetClosestPeers(ctx, key)
68 69 70
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71

72 73 74 75
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
76 77
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
78
			defer wg.Done()
Jeromy's avatar
Jeromy committed
79 80 81 82 83
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

Jeromy's avatar
Jeromy committed
84
			err := dht.putValueToPeer(ctx, p, key, rec)
85
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
				log.Debugf("failed putting value to peer: %s", err)
87 88 89 90 91
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92 93 94
}

// GetValue searches for the value corresponding to given Key.
95 96 97 98 99 100 101 102 103
func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err error) {
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
104 105 106 107
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	vals, err := dht.GetValues(ctx, key, 16)
108 109 110 111
	if err != nil {
		return nil, err
	}

Steven Allen's avatar
Steven Allen committed
112
	recs := make([][]byte, 0, len(vals))
113
	for _, v := range vals {
114 115 116
		if v.Val != nil {
			recs = append(recs, v.Val)
		}
117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
	}

	i, err := dht.Selector.BestRecord(key, recs)
	if err != nil {
		return nil, err
	}

	best := recs[i]
	log.Debugf("GetValue %v %v", key, best)
	if best == nil {
		log.Errorf("GetValue yielded correct record with nil value.")
		return nil, routing.ErrNotFound
	}

	fixupRec, err := record.MakePutRecord(dht.peerstore.PrivKey(dht.self), key, best, true)
	if err != nil {
		// probably shouldnt actually 'error' here as we have found a value we like,
		// but this call failing probably isnt something we want to ignore
		return nil, err
	}

	for _, v := range vals {
		// if someone sent us a different 'less-valid' record, lets correct them
		if !bytes.Equal(v.Val, best) {
			go func(v routing.RecvdVal) {
142 143 144 145 146 147 148
				if v.From == dht.self {
					err := dht.putLocal(key, fixupRec)
					if err != nil {
						log.Error("Error correcting local dht entry:", err)
					}
					return
				}
Jeromy's avatar
Jeromy committed
149 150
				ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
				defer cancel()
151 152 153 154 155 156 157 158 159 160 161
				err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
				if err != nil {
					log.Error("Error correcting DHT entry: ", err)
				}
			}(v)
		}
	}

	return best, nil
}

162 163 164 165 166 167 168 169 170
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []routing.RecvdVal, err error) {
	eip := log.EventBegin(ctx, "GetValues")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Steven Allen's avatar
Steven Allen committed
171
	vals := make([]routing.RecvdVal, 0, nvals)
172 173
	var valslock sync.Mutex

Jeromy's avatar
Jeromy committed
174
	// If we have it local, dont bother doing an RPC!
175
	lrec, err := dht.getLocal(key)
Jeromy's avatar
Jeromy committed
176
	if err == nil {
177 178
		// TODO: this is tricky, we dont always want to trust our own value
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
179
		log.Debug("have it locally")
180 181 182 183 184 185 186 187 188 189
		vals = append(vals, routing.RecvdVal{
			Val:  lrec.GetValue(),
			From: dht.self,
		})

		if nvals <= 1 {
			return vals, nil
		}
	} else if nvals == 0 {
		return nil, err
Jeromy's avatar
Jeromy committed
190 191
	}

192
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
193
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
194
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
195
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196
		log.Warning("No peers from routing table!")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
197
		return nil, kb.ErrLookupFailure
198 199
	}

200
	// setup the Query
Jeromy's avatar
Jeromy committed
201
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
203
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
204 205 206 207
			Type: notif.SendingQuery,
			ID:   p,
		})

208
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
209 210 211 212 213 214 215 216 217
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
218
			return nil, err
219 220 221 222 223
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
224
		}
225

226 227
		res := &dhtQueryResult{closerPeers: peers}

228
		if rec.GetValue() != nil || err == errInvalidRecord {
229 230 231 232 233 234 235 236 237 238 239 240
			rv := routing.RecvdVal{
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
			vals = append(vals, rv)

			// If weve collected enough records, we're done
			if len(vals) >= nvals {
				res.success = true
			}
			valslock.Unlock()
241 242
		}

Jeromy's avatar
Jeromy committed
243
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
244 245
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
246
			Responses: peers,
Jeromy's avatar
Jeromy committed
247 248
		})

249 250
		return res, nil
	})
251

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
	// run it!
253 254 255 256 257
	_, err = query.Run(ctx, rtp)
	if len(vals) == 0 {
		if err != nil {
			return nil, err
		}
258 259
	}

260
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261 262 263 264 265
}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

266
// Provide makes this node announce that it can provide a value for the given key
267 268 269 270 271 272 273 274
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275 276

	// add self locally
277
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
278 279 280
	if !brdcst {
		return nil
	}
281

282
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
283 284
	if err != nil {
		return err
285 286
	}

287 288 289 290 291
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
292
	wg := sync.WaitGroup{}
293
	for p := range peers {
Jeromy's avatar
Jeromy committed
294 295 296
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
297
			log.Debugf("putProvider(%s, %s)", key, p)
298
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
299
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
300
				log.Debug(err)
Jeromy's avatar
Jeromy committed
301 302
			}
		}(p)
303
	}
Jeromy's avatar
Jeromy committed
304
	wg.Wait()
305
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306
}
307
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
308 309 310 311 312 313 314 315 316 317 318
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

319
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.KeyString(), 0)
320 321 322
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
323

Brian Tiger Chow's avatar
Brian Tiger Chow committed
324
// FindProviders searches until the context expires.
325
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
326
	var providers []pstore.PeerInfo
327
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
328 329 330 331 332
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
333 334 335
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
336 337
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
338
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
339 340 341 342
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

343 344
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
345 346
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
347
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
348 349
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
350
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
351
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
352
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
353
			select {
Jeromy's avatar
Jeromy committed
354
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
355 356 357
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
358
		}
Jeromy's avatar
Jeromy committed
359 360

		// If we have enough peers locally, dont bother with remote RPC
Jeromy's avatar
Jeromy committed
361
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
362
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
363 364 365 366 367
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
368
	parent := ctx
369
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
370
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
371 372 373
			Type: notif.SendingQuery,
			ID:   p,
		})
374
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
375 376 377 378
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
379
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
380
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
381
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
382 383 384

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
385 386 387
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
388
			log.Debugf("got provider: %s", prov)
389
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
390
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
391
				select {
Jeromy's avatar
Jeromy committed
392
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
393
				case <-ctx.Done():
394
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
395 396
					return nil, ctx.Err()
				}
397
			}
Jeromy's avatar
Jeromy committed
398
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
399
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
400
				return &dhtQueryResult{success: true}, nil
401 402 403
			}
		}

Jeromy's avatar
Jeromy committed
404 405
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
406
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
407
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
408

Jeromy's avatar
Jeromy committed
409
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
410 411
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
412
			Responses: clpeers,
413
		})
Jeromy's avatar
Jeromy committed
414 415 416
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
417
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
418 419
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
420
		log.Debugf("Query error: %s", err)
421 422 423 424 425 426 427 428 429 430
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
431 432 433 434
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
435
	}
436 437
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
438
// FindPeer searches for a peer with given ID.
439 440 441 442 443 444 445 446
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447

448
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
449
	if pi := dht.FindLocal(id); pi.ID != "" {
450
		return pi, nil
451 452
	}

Jeromy's avatar
Jeromy committed
453
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
454
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
455
		return pstore.PeerInfo{}, kb.ErrLookupFailure
456
	}
457

Jeromy's avatar
Jeromy committed
458
	// Sanity...
459
	for _, p := range peers {
460
		if p == id {
461
			log.Debug("found target peer in list of closest peers...")
462
			return dht.peerstore.PeerInfo(p), nil
463
		}
464
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
465

Jeromy's avatar
Jeromy committed
466
	// setup the Query
Jeromy's avatar
Jeromy committed
467
	parent := ctx
468
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
469
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
470 471 472
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
473

474
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
475
		if err != nil {
476
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
477
		}
478

Jeromy's avatar
Jeromy committed
479
		closer := pmes.GetCloserPeers()
480
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
481

482
		// see if we got the peer here
483 484
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
485
				return &dhtQueryResult{
486
					peer:    npi,
Jeromy's avatar
Jeromy committed
487 488 489
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
490 491
		}

Jeromy's avatar
Jeromy committed
492
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
493
			Type:      notif.PeerResponse,
494
			ID:        p,
Jeromy's avatar
Jeromy committed
495
			Responses: clpeerInfos,
496 497
		})

498
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
499
	})
500

Jeromy's avatar
Jeromy committed
501
	// run it!
502
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
503
	if err != nil {
Jeromy's avatar
Jeromy committed
504
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
505 506
	}

507
	log.Debugf("FindPeer %v %v", id, result.success)
508
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
509
		return pstore.PeerInfo{}, routing.ErrNotFound
510
	}
Jeromy's avatar
Jeromy committed
511

Jeromy's avatar
Jeromy committed
512
	return *result.peer, nil
513 514
}

515
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
516
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
517

Jeromy's avatar
Jeromy committed
518
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
519
	peersSeen := make(map[peer.ID]struct{})
520

Jeromy's avatar
Jeromy committed
521
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
522
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
523
		return nil, kb.ErrLookupFailure
524 525 526
	}

	// setup the Query
527
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
528

529
		pmes, err := dht.findPeerSingle(ctx, p, id)
530 531 532 533
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
534
		var clpeers []*pstore.PeerInfo
535 536
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
537
			pi := pb.PBPeerToPeerInfo(pbp)
538

539 540
			// skip peers already seen
			if _, found := peersSeen[pi.ID]; found {
541 542
				continue
			}
543
			peersSeen[pi.ID] = struct{}{}
544 545 546 547 548 549

			// if peer is connected, send it to our client.
			if pb.Connectedness(*pbp.Connection) == inet.Connected {
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
550
				case peerchan <- pi:
551 552 553 554
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
555
			// TODO maybe query it?
556
			if pb.Connectedness(*pbp.Connection) != inet.Connected {
557
				clpeers = append(clpeers, pi)
558 559 560 561 562 563 564 565 566
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
567
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
568
			log.Debug(err)
569 570 571 572 573 574 575 576
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}