routing.go 17.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that once the buffer is
// full, further query results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109 110 111
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw record value; consumers in this file treat a nil Val
	// as "no usable value from this peer".
	Val  []byte
	// From is the ID of the peer that supplied Val (the local node's own ID
	// for values read from the local datastore).
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
113
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
114 115 116 117 118 119 120 121
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
122

123 124 125 126 127 128 129
	// apply defaultQuorum if relevant
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

130 131 132 133
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
134 135
	var best []byte

136 137
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
138 139 140 141 142 143 144 145 146
	}

	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

147
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
Steven Allen's avatar
Steven Allen committed
148 149
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
150
		return nil, err
Steven Allen's avatar
Steven Allen committed
151 152 153 154
	}

	responsesNeeded := 0
	if !cfg.Offline {
155
		responsesNeeded = getQuorum(&cfg, -1)
Steven Allen's avatar
Steven Allen committed
156 157
	}

158 159 160 161 162 163
	valCh, err := dht.getValues(ctx, key, responsesNeeded)
	if err != nil {
		return nil, err
	}

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
164 165
	go func() {
		defer close(out)
166

167 168 169
		if responsesNeeded < 0 {
			responsesNeeded = 0
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
170 171
		vals := make([]RecvdVal, 0, responsesNeeded)
		best := -1
172

Łukasz Magiera's avatar
Łukasz Magiera committed
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
		defer func() {
			if len(vals) <= 1 || best < 0 {
				return
			}
			fixupRec := record.MakePutRecord(key, vals[best].Val)
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
				if !bytes.Equal(v.Val, vals[best].Val) {
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
199

Łukasz Magiera's avatar
Łukasz Magiera committed
200 201 202 203
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
204
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
205 206 207
				}

				vals = append(vals, v)
208

Łukasz Magiera's avatar
Łukasz Magiera committed
209 210 211 212 213 214
				if v.Val == nil {
					continue
				}
				// Select best value
				if best > -1 {
					i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
215
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
216 217 218 219 220 221 222 223 224 225 226
						continue //TODO: Do we want to do something with the error here?
					}
					if i != best && !bytes.Equal(v.Val, vals[best].Val) {
						best = i
						out <- v.Val
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
						best = len(vals) - 1
						out <- v.Val
227 228
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
229 230 231
			case <-ctx.Done():
				return
			}
232
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
233
	}()
234

235
	return out, nil
236 237
}

Steven Allen's avatar
Steven Allen committed
238
// GetValues gets nvals values corresponding to the given key.
239 240 241 242 243 244 245 246 247 248 249 250 251 252
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
	valCh, err := dht.getValues(ctx, key, nvals)
	if err != nil {
		return nil, err
	}

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
	}

	return out, nil
}

253
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
254
	eip := log.EventBegin(ctx, "GetValues")
Łukasz Magiera's avatar
Łukasz Magiera committed
255

256
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
257

258
	done := func(err error) (<-chan RecvdVal, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
259 260
		defer close(vals)

261 262 263 264 265
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
266
		return vals, err
Łukasz Magiera's avatar
Łukasz Magiera committed
267
	}
268

269
	// If we have it local, don't bother doing an RPC!
270
	lrec, err := dht.getLocal(key)
271 272
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
273
		return done(err)
274 275
	}
	if lrec != nil {
276
		// TODO: this is tricky, we don't always want to trust our own value
277
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
278
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
279
		vals <- RecvdVal{
280 281
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
282
		}
283

284
		if nvals == 0 || nvals == 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
285
			return done(nil)
286
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
287 288

		nvals--
289
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
290
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
291 292
	}

293
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
294
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
295
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
296
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
297
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
298
		return done(kb.ErrLookupFailure)
299 300
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
301 302 303
	var valslock sync.Mutex
	var got int

304
	// setup the Query
Jeromy's avatar
Jeromy committed
305
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
306
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
307
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
308 309 310 311
			Type: notif.SendingQuery,
			ID:   p,
		})

312
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
313 314 315 316 317 318 319 320 321
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
322
			return nil, err
323 324 325 326 327
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
328
		}
329

330 331
		res := &dhtQueryResult{closerPeers: peers}

332
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
333
			rv := RecvdVal{
334 335 336 337
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
338 339
			vals <- rv
			got++
340

341
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
342
			if nvals == got {
343 344 345
				res.success = true
			}
			valslock.Unlock()
346 347
		}

Jeromy's avatar
Jeromy committed
348
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
349 350
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
351
			Responses: peers,
Jeromy's avatar
Jeromy committed
352 353
		})

354 355
		return res, nil
	})
356

Łukasz Magiera's avatar
Łukasz Magiera committed
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		done(err)
	}()
372

373
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
374 375
}

376 377 378
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
379

380
// Provide makes this node announce that it can provide a value for the given key
381 382 383 384 385 386 387 388
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
389 390

	// add self locally
391
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
392 393 394
	if !brdcst {
		return nil
	}
395

396
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
397 398
	if err != nil {
		return err
399 400
	}

401 402 403 404 405
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
406
	wg := sync.WaitGroup{}
407
	for p := range peers {
Jeromy's avatar
Jeromy committed
408 409 410
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
411
			log.Debugf("putProvider(%s, %s)", key, p)
412
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
413
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
414
				log.Debug(err)
Jeromy's avatar
Jeromy committed
415 416
			}
		}(p)
417
	}
Jeromy's avatar
Jeromy committed
418
	wg.Wait()
419
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
420
}
421
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
422 423 424 425 426 427 428 429 430 431 432
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

433
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
434 435 436
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437

Brian Tiger Chow's avatar
Brian Tiger Chow committed
438
// FindProviders searches until the context expires.
439
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
440
	var providers []pstore.PeerInfo
441
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
442 443 444 445 446
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
447 448 449
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
450 451
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
452
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
453 454 455 456
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

457 458
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
459 460
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
461
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
462 463
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
464
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
465
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
466
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
467
			select {
Jeromy's avatar
Jeromy committed
468
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
469 470 471
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
472
		}
Jeromy's avatar
Jeromy committed
473

474
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
475
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
476
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
477 478 479 480 481
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
482
	parent := ctx
483
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
484
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
485 486 487
			Type: notif.SendingQuery,
			ID:   p,
		})
488
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
489 490 491 492
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
493
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
494
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
495
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
496 497 498

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
499 500 501
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
502
			log.Debugf("got provider: %s", prov)
503
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
504
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
505
				select {
Jeromy's avatar
Jeromy committed
506
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
507
				case <-ctx.Done():
508
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
509 510
					return nil, ctx.Err()
				}
511
			}
Jeromy's avatar
Jeromy committed
512
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
513
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
514
				return &dhtQueryResult{success: true}, nil
515 516 517
			}
		}

Jeromy's avatar
Jeromy committed
518 519
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
520
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
521
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
522

Jeromy's avatar
Jeromy committed
523
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
524 525
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
526
			Responses: clpeers,
527
		})
Jeromy's avatar
Jeromy committed
528 529 530
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
531
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
532 533
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
534
		log.Debugf("Query error: %s", err)
535 536 537 538 539 540 541 542 543 544
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
545 546 547 548
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
549
	}
550 551
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
552
// FindPeer searches for a peer with given ID.
553 554 555 556 557 558 559 560
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
561

562
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
563
	if pi := dht.FindLocal(id); pi.ID != "" {
564
		return pi, nil
565 566
	}

Jeromy's avatar
Jeromy committed
567
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
568
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
569
		return pstore.PeerInfo{}, kb.ErrLookupFailure
570
	}
571

Jeromy's avatar
Jeromy committed
572
	// Sanity...
573
	for _, p := range peers {
574
		if p == id {
575
			log.Debug("found target peer in list of closest peers...")
576
			return dht.peerstore.PeerInfo(p), nil
577
		}
578
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
579

Jeromy's avatar
Jeromy committed
580
	// setup the Query
Jeromy's avatar
Jeromy committed
581
	parent := ctx
582
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
583
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
584 585 586
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
587

588
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
589
		if err != nil {
590
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
591
		}
592

Jeromy's avatar
Jeromy committed
593
		closer := pmes.GetCloserPeers()
594
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
595

596
		// see if we got the peer here
597 598
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
599
				return &dhtQueryResult{
600
					peer:    npi,
Jeromy's avatar
Jeromy committed
601 602 603
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
604 605
		}

Jeromy's avatar
Jeromy committed
606
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
607
			Type:      notif.PeerResponse,
608
			ID:        p,
Jeromy's avatar
Jeromy committed
609
			Responses: clpeerInfos,
610 611
		})

612
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
613
	})
614

Jeromy's avatar
Jeromy committed
615
	// run it!
616
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
617
	if err != nil {
Jeromy's avatar
Jeromy committed
618
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
619 620
	}

621
	log.Debugf("FindPeer %v %v", id, result.success)
622
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
623
		return pstore.PeerInfo{}, routing.ErrNotFound
624
	}
Jeromy's avatar
Jeromy committed
625

Jeromy's avatar
Jeromy committed
626
	return *result.peer, nil
627 628
}

629
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
630
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
631

Jeromy's avatar
Jeromy committed
632
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
633
	peersSeen := make(map[peer.ID]struct{})
634
	var peersSeenMx sync.Mutex
635

Jeromy's avatar
Jeromy committed
636
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
637
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
638
		return nil, kb.ErrLookupFailure
639 640 641
	}

	// setup the Query
642
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
643

644
		pmes, err := dht.findPeerSingle(ctx, p, id)
645 646 647 648
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
649
		var clpeers []*pstore.PeerInfo
650 651
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
652
			pi := pb.PBPeerToPeerInfo(pbp)
653

654
			// skip peers already seen
655
			peersSeenMx.Lock()
656
			if _, found := peersSeen[pi.ID]; found {
657
				peersSeenMx.Unlock()
658 659
				continue
			}
660
			peersSeen[pi.ID] = struct{}{}
661
			peersSeenMx.Unlock()
662 663

			// if peer is connected, send it to our client.
664
			if pb.Connectedness(pbp.Connection) == inet.Connected {
665 666 667
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
668
				case peerchan <- pi:
669 670 671 672
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
673
			// TODO maybe query it?
674
			if pb.Connectedness(pbp.Connection) != inet.Connected {
675
				clpeers = append(clpeers, pi)
676 677 678 679 680 681 682 683 684
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
685
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
686
			log.Debug(err)
687 688 689 690 691 692 693 694
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}