routing.go 16.8 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that once the buffer is
// full, further query results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109 110 111
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw record value. It can be nil: getValues also reports
	// peers whose record was rejected as invalid, with a nil Val.
	Val  []byte
	// From is the ID of the peer that supplied Val (dht.self for a value
	// read from the local store).
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
113
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
114 115 116 117 118 119 120 121
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
122

123 124 125 126
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
127 128
	var best []byte

129 130
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
131 132 133 134 135 136 137 138 139
	}

	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

140
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
Steven Allen's avatar
Steven Allen committed
141 142
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
143
		return nil, err
Steven Allen's avatar
Steven Allen committed
144 145 146 147 148 149 150
	}

	responsesNeeded := 0
	if !cfg.Offline {
		responsesNeeded = getQuorum(&cfg)
	}

151 152 153 154 155 156
	valCh, err := dht.getValues(ctx, key, responsesNeeded)
	if err != nil {
		return nil, err
	}

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
157 158
	go func() {
		defer close(out)
159

Łukasz Magiera's avatar
Łukasz Magiera committed
160 161
		vals := make([]RecvdVal, 0, responsesNeeded)
		best := -1
162

Łukasz Magiera's avatar
Łukasz Magiera committed
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
		defer func() {
			if len(vals) <= 1 || best < 0 {
				return
			}
			fixupRec := record.MakePutRecord(key, vals[best].Val)
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
				if !bytes.Equal(v.Val, vals[best].Val) {
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
189

Łukasz Magiera's avatar
Łukasz Magiera committed
190 191 192 193
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
194
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
195 196 197
				}

				vals = append(vals, v)
198

Łukasz Magiera's avatar
Łukasz Magiera committed
199 200 201 202 203 204
				if v.Val == nil {
					continue
				}
				// Select best value
				if best > -1 {
					i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
205
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
206 207 208 209 210 211 212 213 214 215 216
						continue //TODO: Do we want to do something with the error here?
					}
					if i != best && !bytes.Equal(v.Val, vals[best].Val) {
						best = i
						out <- v.Val
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
						best = len(vals) - 1
						out <- v.Val
217 218
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
219 220 221
			case <-ctx.Done():
				return
			}
222
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
223
	}()
224

225
	return out, nil
226 227
}

Steven Allen's avatar
Steven Allen committed
228
// GetValues gets nvals values corresponding to the given key.
229 230 231 232 233 234 235 236 237 238 239 240 241 242
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
	valCh, err := dht.getValues(ctx, key, nvals)
	if err != nil {
		return nil, err
	}

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
	}

	return out, nil
}

243
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
244
	eip := log.EventBegin(ctx, "GetValues")
Łukasz Magiera's avatar
Łukasz Magiera committed
245

246
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
247

248
	done := func(err error) (<-chan RecvdVal, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
249 250
		defer close(vals)

251 252 253 254 255
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
256
		return vals, err
Łukasz Magiera's avatar
Łukasz Magiera committed
257
	}
258

259
	// If we have it local, don't bother doing an RPC!
260
	lrec, err := dht.getLocal(key)
261 262
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
263
		return done(err)
264 265
	}
	if lrec != nil {
266
		// TODO: this is tricky, we don't always want to trust our own value
267
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
268
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
269
		vals <- RecvdVal{
270 271
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
272
		}
273 274

		if nvals <= 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
275
			return done(nil)
276
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
277 278

		nvals--
279
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
280
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
281 282
	}

283
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
284
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
285
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
286
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
287
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
288
		return done(kb.ErrLookupFailure)
289 290
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
291 292 293
	var valslock sync.Mutex
	var got int

294
	// setup the Query
Jeromy's avatar
Jeromy committed
295
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
297
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
298 299 300 301
			Type: notif.SendingQuery,
			ID:   p,
		})

302
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
303 304 305 306 307 308 309 310 311
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
312
			return nil, err
313 314 315 316 317
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
318
		}
319

320 321
		res := &dhtQueryResult{closerPeers: peers}

322
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
323
			rv := RecvdVal{
324 325 326 327
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
328 329
			vals <- rv
			got++
330

331
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
332
			if nvals == got {
333 334 335
				res.success = true
			}
			valslock.Unlock()
336 337
		}

Jeromy's avatar
Jeromy committed
338
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
339 340
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
341
			Responses: peers,
Jeromy's avatar
Jeromy committed
342 343
		})

344 345
		return res, nil
	})
346

Łukasz Magiera's avatar
Łukasz Magiera committed
347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		done(err)
	}()
362

363
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
364 365
}

366 367 368
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
369

370
// Provide makes this node announce that it can provide a value for the given key
371 372 373 374 375 376 377 378
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
379 380

	// add self locally
381
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
382 383 384
	if !brdcst {
		return nil
	}
385

386
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
387 388
	if err != nil {
		return err
389 390
	}

391 392 393 394 395
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
396
	wg := sync.WaitGroup{}
397
	for p := range peers {
Jeromy's avatar
Jeromy committed
398 399 400
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
401
			log.Debugf("putProvider(%s, %s)", key, p)
402
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
403
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
404
				log.Debug(err)
Jeromy's avatar
Jeromy committed
405 406
			}
		}(p)
407
	}
Jeromy's avatar
Jeromy committed
408
	wg.Wait()
409
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
410
}
411
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
412 413 414 415 416 417 418 419 420 421 422
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

423
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
424 425 426
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
427

Brian Tiger Chow's avatar
Brian Tiger Chow committed
428
// FindProviders searches until the context expires.
429
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
430
	var providers []pstore.PeerInfo
431
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
432 433 434 435 436
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
437 438 439
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
440 441
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
442
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
443 444 445 446
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

447 448
// findProvidersAsyncRoutine is the worker behind FindProvidersAsync. It sends
// up to count distinct providers of key on peerOut and closes peerOut when it
// returns.
//
// Providers known to the local provider store are emitted first. If those do
// not reach count, a query is run starting from the AlphaValue nearest peers
// in the routing table; providers reported by queried peers are emitted as
// they arrive. A query failure is logged and published as a QueryError
// event; it is not returned to the caller.
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
	defer close(peerOut)

	// ps de-duplicates providers across the local pass and all query workers.
	ps := pset.NewLimited(count)
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
		// NOTE: Assuming that this list of peers is unique
		if ps.TryAdd(p) {
			pi := dht.peerstore.PeerInfo(p)
			select {
			case peerOut <- pi:
			case <-ctx.Done():
				return
			}
		}

		// If we have enough peers locally, don't bother with remote RPC
		// TODO: is this a DOS vector?
		if ps.Size() >= count {
			return
		}
	}

	// setup the Query
	// parent keeps the caller's context for publishing events; the closure
	// below shadows ctx with the per-request context handed to it by the query.
	parent := ctx
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
			Type: notif.SendingQuery,
			ID:   p,
		})
		pmes, err := dht.findProvidersSingle(ctx, p, key)
		if err != nil {
			return nil, err
		}

		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
		log.Debugf("%d provider entries decoded", len(provs))

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
			// Remember the addresses of remote providers (with a temporary
			// TTL) so they can be reached later.
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
			log.Debugf("got provider: %s", prov)
			if ps.TryAdd(prov.ID) {
				log.Debugf("using provider: %s", prov)
				select {
				case peerOut <- *prov:
				case <-ctx.Done():
					log.Debug("context timed out sending more providers")
					return nil, ctx.Err()
				}
			}
			// Enough providers collected: report success for this query step.
			if ps.Size() >= count {
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
				return &dhtQueryResult{success: true}, nil
			}
		}

		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
		clpeers := pb.PBPeersToPeerInfos(closer)
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)

		notif.PublishQueryEvent(parent, &notif.QueryEvent{
			Type:      notif.PeerResponse,
			ID:        p,
			Responses: clpeers,
		})
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
	_, err := query.Run(ctx, peers)
	if err != nil {
		log.Debugf("Query error: %s", err)
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		// A non-nil error that prints as "<nil>" is logged with diagnostics
		// and swapped for a plain error before err.Error() is called below.
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
	}
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
542
// FindPeer searches for a peer with given ID.
543 544 545 546 547 548 549 550
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
551

552
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
553
	if pi := dht.FindLocal(id); pi.ID != "" {
554
		return pi, nil
555 556
	}

Jeromy's avatar
Jeromy committed
557
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
558
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
559
		return pstore.PeerInfo{}, kb.ErrLookupFailure
560
	}
561

Jeromy's avatar
Jeromy committed
562
	// Sanity...
563
	for _, p := range peers {
564
		if p == id {
565
			log.Debug("found target peer in list of closest peers...")
566
			return dht.peerstore.PeerInfo(p), nil
567
		}
568
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
569

Jeromy's avatar
Jeromy committed
570
	// setup the Query
Jeromy's avatar
Jeromy committed
571
	parent := ctx
572
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
573
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
574 575 576
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
577

578
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
579
		if err != nil {
580
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
581
		}
582

Jeromy's avatar
Jeromy committed
583
		closer := pmes.GetCloserPeers()
584
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
585

586
		// see if we got the peer here
587 588
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
589
				return &dhtQueryResult{
590
					peer:    npi,
Jeromy's avatar
Jeromy committed
591 592 593
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
594 595
		}

Jeromy's avatar
Jeromy committed
596
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
597
			Type:      notif.PeerResponse,
598
			ID:        p,
Jeromy's avatar
Jeromy committed
599
			Responses: clpeerInfos,
600 601
		})

602
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
603
	})
604

Jeromy's avatar
Jeromy committed
605
	// run it!
606
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
607
	if err != nil {
Jeromy's avatar
Jeromy committed
608
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
609 610
	}

611
	log.Debugf("FindPeer %v %v", id, result.success)
612
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
613
		return pstore.PeerInfo{}, routing.ErrNotFound
614
	}
Jeromy's avatar
Jeromy committed
615

Jeromy's avatar
Jeromy committed
616
	return *result.peer, nil
617 618
}

619
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
620
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
621

Jeromy's avatar
Jeromy committed
622
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
623
	peersSeen := make(map[peer.ID]struct{})
624
	var peersSeenMx sync.Mutex
625

Jeromy's avatar
Jeromy committed
626
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
627
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
628
		return nil, kb.ErrLookupFailure
629 630 631
	}

	// setup the Query
632
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
633

634
		pmes, err := dht.findPeerSingle(ctx, p, id)
635 636 637 638
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
639
		var clpeers []*pstore.PeerInfo
640 641
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
642
			pi := pb.PBPeerToPeerInfo(pbp)
643

644
			// skip peers already seen
645
			peersSeenMx.Lock()
646
			if _, found := peersSeen[pi.ID]; found {
647
				peersSeenMx.Unlock()
648 649
				continue
			}
650
			peersSeen[pi.ID] = struct{}{}
651
			peersSeenMx.Unlock()
652 653

			// if peer is connected, send it to our client.
654
			if pb.Connectedness(pbp.Connection) == inet.Connected {
655 656 657
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
658
				case peerchan <- pi:
659 660 661 662
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
663
			// TODO maybe query it?
664
			if pb.Connectedness(pbp.Connection) != inet.Connected {
665
				clpeers = append(clpeers, pi)
666 667 668 669 670 671 672 673 674
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
675
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
676
			log.Debug(err)
677 678 679 680 681 682 683 684
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}