routing.go 16.9 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that different query
// results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109 110 111
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
113
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
114 115 116 117 118 119 120 121
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
122

123 124 125 126
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
127 128
	var best []byte

129 130
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
131 132 133 134 135 136 137 138 139
	}

	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

140
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
Steven Allen's avatar
Steven Allen committed
141 142
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
143
		return nil, err
Steven Allen's avatar
Steven Allen committed
144 145 146 147 148 149 150
	}

	responsesNeeded := 0
	if !cfg.Offline {
		responsesNeeded = getQuorum(&cfg)
	}

151 152 153 154 155 156
	valCh, err := dht.getValues(ctx, key, responsesNeeded)
	if err != nil {
		return nil, err
	}

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
157 158
	go func() {
		defer close(out)
159

160 161 162
		if responsesNeeded < 0 {
			responsesNeeded = 0
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
163 164
		vals := make([]RecvdVal, 0, responsesNeeded)
		best := -1
165

Łukasz Magiera's avatar
Łukasz Magiera committed
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
		defer func() {
			if len(vals) <= 1 || best < 0 {
				return
			}
			fixupRec := record.MakePutRecord(key, vals[best].Val)
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
				if !bytes.Equal(v.Val, vals[best].Val) {
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
192

Łukasz Magiera's avatar
Łukasz Magiera committed
193 194 195 196
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
197
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
198 199 200
				}

				vals = append(vals, v)
201

Łukasz Magiera's avatar
Łukasz Magiera committed
202 203 204 205 206 207
				if v.Val == nil {
					continue
				}
				// Select best value
				if best > -1 {
					i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
208
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
209 210 211 212 213 214 215 216 217 218 219
						continue //TODO: Do we want to do something with the error here?
					}
					if i != best && !bytes.Equal(v.Val, vals[best].Val) {
						best = i
						out <- v.Val
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
						best = len(vals) - 1
						out <- v.Val
220 221
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
222 223 224
			case <-ctx.Done():
				return
			}
225
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
226
	}()
227

228
	return out, nil
229 230
}

Steven Allen's avatar
Steven Allen committed
231
// GetValues gets nvals values corresponding to the given key.
232 233 234 235 236 237 238 239 240 241 242 243 244 245
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
	valCh, err := dht.getValues(ctx, key, nvals)
	if err != nil {
		return nil, err
	}

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
	}

	return out, nil
}

246
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
247
	eip := log.EventBegin(ctx, "GetValues")
Łukasz Magiera's avatar
Łukasz Magiera committed
248

249
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
250

251
	done := func(err error) (<-chan RecvdVal, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
252 253
		defer close(vals)

254 255 256 257 258
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
259
		return vals, err
Łukasz Magiera's avatar
Łukasz Magiera committed
260
	}
261

262
	// If we have it local, don't bother doing an RPC!
263
	lrec, err := dht.getLocal(key)
264 265
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
266
		return done(err)
267 268
	}
	if lrec != nil {
269
		// TODO: this is tricky, we don't always want to trust our own value
270
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
271
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
272
		vals <- RecvdVal{
273 274
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
275
		}
276

277
		if nvals == 0 || nvals == 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
278
			return done(nil)
279
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
280 281

		nvals--
282
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
283
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
284 285
	}

286
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
287
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
288
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
289
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
290
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
291
		return done(kb.ErrLookupFailure)
292 293
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
294 295 296
	var valslock sync.Mutex
	var got int

297
	// setup the Query
Jeromy's avatar
Jeromy committed
298
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
299
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
300
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
301 302 303 304
			Type: notif.SendingQuery,
			ID:   p,
		})

305
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
306 307 308 309 310 311 312 313 314
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
315
			return nil, err
316 317 318 319 320
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
321
		}
322

323 324
		res := &dhtQueryResult{closerPeers: peers}

325
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
326
			rv := RecvdVal{
327 328 329 330
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
331 332
			vals <- rv
			got++
333

334
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
335
			if nvals == got {
336 337 338
				res.success = true
			}
			valslock.Unlock()
339 340
		}

Jeromy's avatar
Jeromy committed
341
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
342 343
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
344
			Responses: peers,
Jeromy's avatar
Jeromy committed
345 346
		})

347 348
		return res, nil
	})
349

Łukasz Magiera's avatar
Łukasz Magiera committed
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		done(err)
	}()
365

366
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
367 368
}

369 370 371
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
372

373
// Provide makes this node announce that it can provide a value for the given key
374 375 376 377 378 379 380 381
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
382 383

	// add self locally
384
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
385 386 387
	if !brdcst {
		return nil
	}
388

389
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
390 391
	if err != nil {
		return err
392 393
	}

394 395 396 397 398
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
399
	wg := sync.WaitGroup{}
400
	for p := range peers {
Jeromy's avatar
Jeromy committed
401 402 403
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
404
			log.Debugf("putProvider(%s, %s)", key, p)
405
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
406
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
407
				log.Debug(err)
Jeromy's avatar
Jeromy committed
408 409
			}
		}(p)
410
	}
Jeromy's avatar
Jeromy committed
411
	wg.Wait()
412
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
413
}
414
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
415 416 417 418 419 420 421 422 423 424 425
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

426
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
427 428 429
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
430

Brian Tiger Chow's avatar
Brian Tiger Chow committed
431
// FindProviders searches until the context expires.
432
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
433
	var providers []pstore.PeerInfo
434
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
435 436 437 438 439
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
440 441 442
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
443 444
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
445
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
446 447 448 449
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

450 451
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
452 453
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
454
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
455 456
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
457
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
458
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
459
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
460
			select {
Jeromy's avatar
Jeromy committed
461
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
462 463 464
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
465
		}
Jeromy's avatar
Jeromy committed
466

467
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
468
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
469
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
470 471 472 473 474
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
475
	parent := ctx
476
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
477
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
478 479 480
			Type: notif.SendingQuery,
			ID:   p,
		})
481
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
482 483 484 485
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
486
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
487
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
488
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
489 490 491

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
492 493 494
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
495
			log.Debugf("got provider: %s", prov)
496
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
498
				select {
Jeromy's avatar
Jeromy committed
499
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
500
				case <-ctx.Done():
501
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
502 503
					return nil, ctx.Err()
				}
504
			}
Jeromy's avatar
Jeromy committed
505
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
506
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
507
				return &dhtQueryResult{success: true}, nil
508 509 510
			}
		}

Jeromy's avatar
Jeromy committed
511 512
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
513
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
514
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
515

Jeromy's avatar
Jeromy committed
516
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
517 518
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
519
			Responses: clpeers,
520
		})
Jeromy's avatar
Jeromy committed
521 522 523
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
524
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
525 526
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527
		log.Debugf("Query error: %s", err)
528 529 530 531 532 533 534 535 536 537
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
538 539 540 541
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
542
	}
543 544
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
545
// FindPeer searches for a peer with given ID.
546 547 548 549 550 551 552 553
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
554

555
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
556
	if pi := dht.FindLocal(id); pi.ID != "" {
557
		return pi, nil
558 559
	}

Jeromy's avatar
Jeromy committed
560
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
561
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
562
		return pstore.PeerInfo{}, kb.ErrLookupFailure
563
	}
564

Jeromy's avatar
Jeromy committed
565
	// Sanity...
566
	for _, p := range peers {
567
		if p == id {
568
			log.Debug("found target peer in list of closest peers...")
569
			return dht.peerstore.PeerInfo(p), nil
570
		}
571
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
572

Jeromy's avatar
Jeromy committed
573
	// setup the Query
Jeromy's avatar
Jeromy committed
574
	parent := ctx
575
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
576
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
577 578 579
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
580

581
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
582
		if err != nil {
583
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
584
		}
585

Jeromy's avatar
Jeromy committed
586
		closer := pmes.GetCloserPeers()
587
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
588

589
		// see if we got the peer here
590 591
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
592
				return &dhtQueryResult{
593
					peer:    npi,
Jeromy's avatar
Jeromy committed
594 595 596
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
597 598
		}

Jeromy's avatar
Jeromy committed
599
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
600
			Type:      notif.PeerResponse,
601
			ID:        p,
Jeromy's avatar
Jeromy committed
602
			Responses: clpeerInfos,
603 604
		})

605
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
606
	})
607

Jeromy's avatar
Jeromy committed
608
	// run it!
609
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
610
	if err != nil {
Jeromy's avatar
Jeromy committed
611
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
612 613
	}

614
	log.Debugf("FindPeer %v %v", id, result.success)
615
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
616
		return pstore.PeerInfo{}, routing.ErrNotFound
617
	}
Jeromy's avatar
Jeromy committed
618

Jeromy's avatar
Jeromy committed
619
	return *result.peer, nil
620 621
}

622
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
623
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
624

Jeromy's avatar
Jeromy committed
625
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
626
	peersSeen := make(map[peer.ID]struct{})
627
	var peersSeenMx sync.Mutex
628

Jeromy's avatar
Jeromy committed
629
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
630
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
631
		return nil, kb.ErrLookupFailure
632 633 634
	}

	// setup the Query
635
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
636

637
		pmes, err := dht.findPeerSingle(ctx, p, id)
638 639 640 641
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
642
		var clpeers []*pstore.PeerInfo
643 644
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
645
			pi := pb.PBPeerToPeerInfo(pbp)
646

647
			// skip peers already seen
648
			peersSeenMx.Lock()
649
			if _, found := peersSeen[pi.ID]; found {
650
				peersSeenMx.Unlock()
651 652
				continue
			}
653
			peersSeen[pi.ID] = struct{}{}
654
			peersSeenMx.Unlock()
655 656

			// if peer is connected, send it to our client.
657
			if pb.Connectedness(pbp.Connection) == inet.Connected {
658 659 660
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
661
				case peerchan <- pi:
662 663 664 665
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
666
			// TODO maybe query it?
667
			if pb.Connectedness(pbp.Connection) != inet.Connected {
668
				clpeers = append(clpeers, pi)
669 670 671 672 673 674 675 676 677
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
678
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
679
			log.Debug(err)
680 681 682 683 684 685 686 687
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}