routing.go 17.5 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that different query
// results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109 110 111
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
113
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
114 115 116 117 118 119 120 121
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
122

123 124 125 126 127 128 129
	// apply defaultQuorum if relevant
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

130 131 132 133
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
134 135
	var best []byte

136 137
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
138 139
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
140 141 142 143
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
144 145 146 147 148 149 150
	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

151
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
Steven Allen's avatar
Steven Allen committed
152 153
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
154
		return nil, err
Steven Allen's avatar
Steven Allen committed
155 156 157 158
	}

	responsesNeeded := 0
	if !cfg.Offline {
159
		responsesNeeded = getQuorum(&cfg, -1)
Steven Allen's avatar
Steven Allen committed
160 161
	}

162 163 164 165 166 167
	valCh, err := dht.getValues(ctx, key, responsesNeeded)
	if err != nil {
		return nil, err
	}

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
168 169
	go func() {
		defer close(out)
170

Łukasz Magiera's avatar
Łukasz Magiera committed
171 172 173 174
		maxVals := responsesNeeded
		if maxVals < 0 {
			maxVals = defaultQuorum * 4 // we want some upper bound on how
			// much correctional entries we will send
175
		}
176 177 178

		// vals is used collect entries we got so far and send corrections to peers
		// when we exit this function
Łukasz Magiera's avatar
Łukasz Magiera committed
179
		vals := make([]RecvdVal, 0, maxVals)
180
		var best *RecvdVal
181

Łukasz Magiera's avatar
Łukasz Magiera committed
182
		defer func() {
183
			if len(vals) <= 1 || best == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
184 185
				return
			}
186
			fixupRec := record.MakePutRecord(key, best.Val)
Łukasz Magiera's avatar
Łukasz Magiera committed
187 188
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
189
				if !bytes.Equal(v.Val, best.Val) {
Łukasz Magiera's avatar
Łukasz Magiera committed
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
208

Łukasz Magiera's avatar
Łukasz Magiera committed
209 210 211 212
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
213
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
214 215
				}

Łukasz Magiera's avatar
Łukasz Magiera committed
216 217 218
				if len(vals) < maxVals {
					vals = append(vals, v)
				}
219

Łukasz Magiera's avatar
Łukasz Magiera committed
220 221 222 223
				if v.Val == nil {
					continue
				}
				// Select best value
224 225
				if best != nil {
					sel, err := dht.Validator.Select(key, [][]byte{best.Val, v.Val})
226
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
227 228
						log.Warning("Failed to select dht key: ", err)
						continue
Łukasz Magiera's avatar
Łukasz Magiera committed
229
					}
230 231
					if sel == 1 && !bytes.Equal(v.Val, best.Val) {
						best = &v
232
						select {
Steven Allen's avatar
go fmt  
Steven Allen committed
233 234 235
						case out <- v.Val:
						case <-ctx.Done():
							return
236
						}
Łukasz Magiera's avatar
Łukasz Magiera committed
237 238 239 240
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
241
						best = &v
242 243 244 245 246
						select {
						case out <- v.Val:
						case <-ctx.Done():
							return
						}
247 248
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
249 250 251
			case <-ctx.Done():
				return
			}
252
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
253
	}()
254

255
	return out, nil
256 257
}

Steven Allen's avatar
Steven Allen committed
258
// GetValues gets nvals values corresponding to the given key.
259
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
260 261 262 263 264
	eip := log.EventBegin(ctx, "GetValues")

	eip.Append(loggableKey(key))
	defer eip.Done()

265 266
	valCh, err := dht.getValues(ctx, key, nvals)
	if err != nil {
267
		eip.SetError(err)
268 269 270 271 272 273 274 275
		return nil, err
	}

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
276
	return out, ctx.Err()
277 278
}

279
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
280
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
281

282
	done := func(err error) (<-chan RecvdVal, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
283
		defer close(vals)
284
		return vals, err
Łukasz Magiera's avatar
Łukasz Magiera committed
285
	}
286

287
	// If we have it local, don't bother doing an RPC!
288
	lrec, err := dht.getLocal(key)
289 290
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
291
		return done(err)
292 293
	}
	if lrec != nil {
294
		// TODO: this is tricky, we don't always want to trust our own value
295
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
297
		vals <- RecvdVal{
298 299
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
300
		}
301

302
		if nvals == 0 || nvals == 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
303
			return done(nil)
304
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
305 306

		nvals--
307
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
308
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
309 310
	}

311
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
312
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
313
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
314
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
315
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
316
		return done(kb.ErrLookupFailure)
317 318
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
319 320 321
	var valslock sync.Mutex
	var got int

322
	// setup the Query
Jeromy's avatar
Jeromy committed
323
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
324
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
325
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
326 327 328 329
			Type: notif.SendingQuery,
			ID:   p,
		})

330
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
331 332 333 334 335 336 337 338 339
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
340
			return nil, err
341 342 343 344 345
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
346
		}
347

348 349
		res := &dhtQueryResult{closerPeers: peers}

350
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
351
			rv := RecvdVal{
352 353 354 355
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
356 357 358 359 360 361
			select {
			case vals <- rv:
			case <-ctx.Done():
				valslock.Unlock()
				return nil, ctx.Err()
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
362
			got++
363

364
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
365
			if nvals == got {
366 367 368
				res.success = true
			}
			valslock.Unlock()
369 370
		}

Jeromy's avatar
Jeromy committed
371
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
372 373
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
374
			Responses: peers,
Jeromy's avatar
Jeromy committed
375 376
		})

377 378
		return res, nil
	})
379

Łukasz Magiera's avatar
Łukasz Magiera committed
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		done(err)
	}()
395

396
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
397 398
}

399 400 401
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
402

403
// Provide makes this node announce that it can provide a value for the given key
404 405 406 407 408 409 410 411
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
412 413

	// add self locally
414
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
415 416 417
	if !brdcst {
		return nil
	}
418

419
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
420 421
	if err != nil {
		return err
422 423
	}

424 425 426 427 428
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
429
	wg := sync.WaitGroup{}
430
	for p := range peers {
Jeromy's avatar
Jeromy committed
431 432 433
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
434
			log.Debugf("putProvider(%s, %s)", key, p)
435
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
436
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437
				log.Debug(err)
Jeromy's avatar
Jeromy committed
438 439
			}
		}(p)
440
	}
Jeromy's avatar
Jeromy committed
441
	wg.Wait()
442
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
443
}
444
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
445 446 447 448 449 450 451 452 453 454 455
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

456
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
457 458 459
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
460

Brian Tiger Chow's avatar
Brian Tiger Chow committed
461
// FindProviders searches until the context expires.
462
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
463
	var providers []pstore.PeerInfo
464
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
465 466 467 468 469
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
470 471 472
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
473 474
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
475
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
476 477 478 479
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

480 481
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
482 483
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
484
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
485 486
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
487
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
488
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
489
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
490
			select {
Jeromy's avatar
Jeromy committed
491
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
492 493 494
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
495
		}
Jeromy's avatar
Jeromy committed
496

497
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
498
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
499
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
500 501 502 503 504
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
505
	parent := ctx
506
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
507
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
508 509 510
			Type: notif.SendingQuery,
			ID:   p,
		})
511
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
512 513 514 515
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
516
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
517
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
518
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
519 520 521

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
522 523 524
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
525
			log.Debugf("got provider: %s", prov)
526
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
528
				select {
Jeromy's avatar
Jeromy committed
529
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
530
				case <-ctx.Done():
531
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
532 533
					return nil, ctx.Err()
				}
534
			}
Jeromy's avatar
Jeromy committed
535
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
536
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
537
				return &dhtQueryResult{success: true}, nil
538 539 540
			}
		}

Jeromy's avatar
Jeromy committed
541 542
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
543
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
544
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
545

Jeromy's avatar
Jeromy committed
546
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
547 548
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
549
			Responses: clpeers,
550
		})
Jeromy's avatar
Jeromy committed
551 552 553
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
554
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
555 556
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
557
		log.Debugf("Query error: %s", err)
558 559 560 561 562 563 564 565 566 567
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
568 569 570 571
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
572
	}
573 574
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
575
// FindPeer searches for a peer with given ID.
576 577 578 579 580 581 582 583
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
584

585
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
586
	if pi := dht.FindLocal(id); pi.ID != "" {
587
		return pi, nil
588 589
	}

Jeromy's avatar
Jeromy committed
590
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
591
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
592
		return pstore.PeerInfo{}, kb.ErrLookupFailure
593
	}
594

Jeromy's avatar
Jeromy committed
595
	// Sanity...
596
	for _, p := range peers {
597
		if p == id {
598
			log.Debug("found target peer in list of closest peers...")
599
			return dht.peerstore.PeerInfo(p), nil
600
		}
601
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
602

Jeromy's avatar
Jeromy committed
603
	// setup the Query
Jeromy's avatar
Jeromy committed
604
	parent := ctx
605
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
606
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
607 608 609
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
610

611
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
612
		if err != nil {
613
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
614
		}
615

Jeromy's avatar
Jeromy committed
616
		closer := pmes.GetCloserPeers()
617
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
618

619
		// see if we got the peer here
620 621
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
622
				return &dhtQueryResult{
623
					peer:    npi,
Jeromy's avatar
Jeromy committed
624 625 626
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
627 628
		}

Jeromy's avatar
Jeromy committed
629
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
630
			Type:      notif.PeerResponse,
631
			ID:        p,
Jeromy's avatar
Jeromy committed
632
			Responses: clpeerInfos,
633 634
		})

635
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
636
	})
637

Jeromy's avatar
Jeromy committed
638
	// run it!
639
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
640
	if err != nil {
Jeromy's avatar
Jeromy committed
641
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
642 643
	}

644
	log.Debugf("FindPeer %v %v", id, result.success)
645
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
646
		return pstore.PeerInfo{}, routing.ErrNotFound
647
	}
Jeromy's avatar
Jeromy committed
648

Jeromy's avatar
Jeromy committed
649
	return *result.peer, nil
650 651
}

652
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
653
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
654

Jeromy's avatar
Jeromy committed
655
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
656
	peersSeen := make(map[peer.ID]struct{})
657
	var peersSeenMx sync.Mutex
658

Jeromy's avatar
Jeromy committed
659
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
660
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
661
		return nil, kb.ErrLookupFailure
662 663 664
	}

	// setup the Query
665
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
666

667
		pmes, err := dht.findPeerSingle(ctx, p, id)
668 669 670 671
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
672
		var clpeers []*pstore.PeerInfo
673 674
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
675
			pi := pb.PBPeerToPeerInfo(pbp)
676

677
			// skip peers already seen
678
			peersSeenMx.Lock()
679
			if _, found := peersSeen[pi.ID]; found {
680
				peersSeenMx.Unlock()
681 682
				continue
			}
683
			peersSeen[pi.ID] = struct{}{}
684
			peersSeenMx.Unlock()
685 686

			// if peer is connected, send it to our client.
687
			if pb.Connectedness(pbp.Connection) == inet.Connected {
688 689 690
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
691
				case peerchan <- pi:
692 693 694 695
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
696
			// TODO maybe query it?
697
			if pb.Connectedness(pbp.Connection) != inet.Connected {
698
				clpeers = append(clpeers, pi)
699 700 701 702 703 704 705 706 707
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
708
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
709
			log.Debug(err)
710 711 712 713 714 715 716 717
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}