routing.go 17.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the capacity of the buffered result channels handed out
// by asynchronous queries (in this file: the channel returned by
// FindPeersConnectedToPeer). The buffer lets several query workers deliver
// their results and carry on querying closer peers without waiting for the
// consumer. Once the buffer is full, further results block until the channel
// is drained.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109 110 111
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw record value. It can be nil: getValues also reports
	// peers whose record was rejected as invalid, using whatever value the
	// rejected record carried (presumably none — confirm in getValueOrPeers).
	Val  []byte
	// From is the peer that supplied Val; it is the local peer ID when the
	// value was read from the local datastore.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
113
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
114 115 116 117 118 119 120 121
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
122

123 124 125 126 127 128 129
	// apply defaultQuorum if relevant
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

130 131 132 133
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
134 135
	var best []byte

136 137
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
138 139
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
140 141 142 143
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
144 145 146 147 148 149 150
	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

151
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
Steven Allen's avatar
Steven Allen committed
152 153
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
154
		return nil, err
Steven Allen's avatar
Steven Allen committed
155 156 157 158
	}

	responsesNeeded := 0
	if !cfg.Offline {
159
		responsesNeeded = getQuorum(&cfg, -1)
Steven Allen's avatar
Steven Allen committed
160 161
	}

162 163 164 165 166 167
	valCh, err := dht.getValues(ctx, key, responsesNeeded)
	if err != nil {
		return nil, err
	}

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
168 169
	go func() {
		defer close(out)
170

Łukasz Magiera's avatar
Łukasz Magiera committed
171 172 173 174
		maxVals := responsesNeeded
		if maxVals < 0 {
			maxVals = defaultQuorum * 4 // we want some upper bound on how
			// much correctional entries we will send
175
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
176
		vals := make([]RecvdVal, 0, maxVals)
Łukasz Magiera's avatar
Łukasz Magiera committed
177
		best := -1
178

Łukasz Magiera's avatar
Łukasz Magiera committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
		defer func() {
			if len(vals) <= 1 || best < 0 {
				return
			}
			fixupRec := record.MakePutRecord(key, vals[best].Val)
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
				if !bytes.Equal(v.Val, vals[best].Val) {
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
205

Łukasz Magiera's avatar
Łukasz Magiera committed
206 207 208 209
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
210
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
211 212
				}

Łukasz Magiera's avatar
Łukasz Magiera committed
213 214 215 216 217 218 219
				i := len(vals)
				if len(vals) < maxVals {
					vals = append(vals, v)
				} else {
					i = (best + 1) % maxVals
					vals[i] = v
				}
220

Łukasz Magiera's avatar
Łukasz Magiera committed
221 222 223 224 225
				if v.Val == nil {
					continue
				}
				// Select best value
				if best > -1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
226
					sel, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
227
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
228 229
						log.Warning("Failed to select dht key: ", err)
						continue
Łukasz Magiera's avatar
Łukasz Magiera committed
230
					}
Łukasz Magiera's avatar
Łukasz Magiera committed
231
					if sel == 1 && !bytes.Equal(v.Val, vals[best].Val) {
Łukasz Magiera's avatar
Łukasz Magiera committed
232 233 234 235 236 237 238 239
						best = i
						out <- v.Val
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
						best = len(vals) - 1
						out <- v.Val
240 241
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
242 243 244
			case <-ctx.Done():
				return
			}
245
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
246
	}()
247

248
	return out, nil
249 250
}

Steven Allen's avatar
Steven Allen committed
251
// GetValues gets nvals values corresponding to the given key.
252 253 254 255 256 257 258 259 260 261 262
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
	valCh, err := dht.getValues(ctx, key, nvals)
	if err != nil {
		return nil, err
	}

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
263
	return out, ctx.Err()
264 265
}

266
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
267
	eip := log.EventBegin(ctx, "GetValues")
Łukasz Magiera's avatar
Łukasz Magiera committed
268

269
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
270

271
	done := func(err error) (<-chan RecvdVal, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
272 273
		defer close(vals)

274 275 276 277 278
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
279
		return vals, err
Łukasz Magiera's avatar
Łukasz Magiera committed
280
	}
281

282
	// If we have it local, don't bother doing an RPC!
283
	lrec, err := dht.getLocal(key)
284 285
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
286
		return done(err)
287 288
	}
	if lrec != nil {
289
		// TODO: this is tricky, we don't always want to trust our own value
290
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
291
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
292
		vals <- RecvdVal{
293 294
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
295
		}
296

297
		if nvals == 0 || nvals == 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
298
			return done(nil)
299
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
300 301

		nvals--
302
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
303
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
304 305
	}

306
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
307
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
308
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
309
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
310
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
311
		return done(kb.ErrLookupFailure)
312 313
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
314 315 316
	var valslock sync.Mutex
	var got int

317
	// setup the Query
Jeromy's avatar
Jeromy committed
318
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
319
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
320
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
321 322 323 324
			Type: notif.SendingQuery,
			ID:   p,
		})

325
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
326 327 328 329 330 331 332 333 334
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
335
			return nil, err
336 337 338 339 340
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
341
		}
342

343 344
		res := &dhtQueryResult{closerPeers: peers}

345
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
346
			rv := RecvdVal{
347 348 349 350
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
351 352
			vals <- rv
			got++
353

354
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
355
			if nvals == got {
356 357 358
				res.success = true
			}
			valslock.Unlock()
359 360
		}

Jeromy's avatar
Jeromy committed
361
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
362 363
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
364
			Responses: peers,
Jeromy's avatar
Jeromy committed
365 366
		})

367 368
		return res, nil
	})
369

Łukasz Magiera's avatar
Łukasz Magiera committed
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		done(err)
	}()
385

386
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
387 388
}

389 390 391
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
392

393
// Provide makes this node announce that it can provide a value for the given key
394 395 396 397 398 399 400 401
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
402 403

	// add self locally
404
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
405 406 407
	if !brdcst {
		return nil
	}
408

409
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
410 411
	if err != nil {
		return err
412 413
	}

414 415 416 417 418
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
419
	wg := sync.WaitGroup{}
420
	for p := range peers {
Jeromy's avatar
Jeromy committed
421 422 423
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
424
			log.Debugf("putProvider(%s, %s)", key, p)
425
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
426
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
427
				log.Debug(err)
Jeromy's avatar
Jeromy committed
428 429
			}
		}(p)
430
	}
Jeromy's avatar
Jeromy committed
431
	wg.Wait()
432
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
433
}
434
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
435 436 437 438 439 440 441 442 443 444 445
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

446
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
447 448 449
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
450

Brian Tiger Chow's avatar
Brian Tiger Chow committed
451
// FindProviders searches until the context expires.
452
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
453
	var providers []pstore.PeerInfo
454
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
455 456 457 458 459
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
460 461 462
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
463 464
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
465
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
466 467 468 469
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

470 471
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
472 473
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
474
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
475 476
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
477
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
478
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
479
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
480
			select {
Jeromy's avatar
Jeromy committed
481
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
482 483 484
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
485
		}
Jeromy's avatar
Jeromy committed
486

487
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
488
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
489
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
490 491 492 493 494
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
495
	parent := ctx
496
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
497
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
498 499 500
			Type: notif.SendingQuery,
			ID:   p,
		})
501
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
502 503 504 505
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
506
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
507
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
508
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
509 510 511

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
512 513 514
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
515
			log.Debugf("got provider: %s", prov)
516
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
517
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
518
				select {
Jeromy's avatar
Jeromy committed
519
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
520
				case <-ctx.Done():
521
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
522 523
					return nil, ctx.Err()
				}
524
			}
Jeromy's avatar
Jeromy committed
525
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
526
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
527
				return &dhtQueryResult{success: true}, nil
528 529 530
			}
		}

Jeromy's avatar
Jeromy committed
531 532
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
533
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
534
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
535

Jeromy's avatar
Jeromy committed
536
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
537 538
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
539
			Responses: clpeers,
540
		})
Jeromy's avatar
Jeromy committed
541 542 543
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
544
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
545 546
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
547
		log.Debugf("Query error: %s", err)
548 549 550 551 552 553 554 555 556 557
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
558 559 560 561
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
562
	}
563 564
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
565
// FindPeer searches for a peer with given ID.
566 567 568 569 570 571 572 573
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
574

575
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
576
	if pi := dht.FindLocal(id); pi.ID != "" {
577
		return pi, nil
578 579
	}

Jeromy's avatar
Jeromy committed
580
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
581
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
582
		return pstore.PeerInfo{}, kb.ErrLookupFailure
583
	}
584

Jeromy's avatar
Jeromy committed
585
	// Sanity...
586
	for _, p := range peers {
587
		if p == id {
588
			log.Debug("found target peer in list of closest peers...")
589
			return dht.peerstore.PeerInfo(p), nil
590
		}
591
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
592

Jeromy's avatar
Jeromy committed
593
	// setup the Query
Jeromy's avatar
Jeromy committed
594
	parent := ctx
595
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
596
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
597 598 599
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
600

601
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
602
		if err != nil {
603
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
604
		}
605

Jeromy's avatar
Jeromy committed
606
		closer := pmes.GetCloserPeers()
607
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
608

609
		// see if we got the peer here
610 611
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
612
				return &dhtQueryResult{
613
					peer:    npi,
Jeromy's avatar
Jeromy committed
614 615 616
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
617 618
		}

Jeromy's avatar
Jeromy committed
619
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
620
			Type:      notif.PeerResponse,
621
			ID:        p,
Jeromy's avatar
Jeromy committed
622
			Responses: clpeerInfos,
623 624
		})

625
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
626
	})
627

Jeromy's avatar
Jeromy committed
628
	// run it!
629
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
630
	if err != nil {
Jeromy's avatar
Jeromy committed
631
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
632 633
	}

634
	log.Debugf("FindPeer %v %v", id, result.success)
635
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
636
		return pstore.PeerInfo{}, routing.ErrNotFound
637
	}
Jeromy's avatar
Jeromy committed
638

Jeromy's avatar
Jeromy committed
639
	return *result.peer, nil
640 641
}

642
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
643
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
644

Jeromy's avatar
Jeromy committed
645
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
646
	peersSeen := make(map[peer.ID]struct{})
647
	var peersSeenMx sync.Mutex
648

Jeromy's avatar
Jeromy committed
649
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
650
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
651
		return nil, kb.ErrLookupFailure
652 653 654
	}

	// setup the Query
655
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
656

657
		pmes, err := dht.findPeerSingle(ctx, p, id)
658 659 660 661
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
662
		var clpeers []*pstore.PeerInfo
663 664
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
665
			pi := pb.PBPeerToPeerInfo(pbp)
666

667
			// skip peers already seen
668
			peersSeenMx.Lock()
669
			if _, found := peersSeen[pi.ID]; found {
670
				peersSeenMx.Unlock()
671 672
				continue
			}
673
			peersSeen[pi.ID] = struct{}{}
674
			peersSeenMx.Unlock()
675 676

			// if peer is connected, send it to our client.
677
			if pb.Connectedness(pbp.Connection) == inet.Connected {
678 679 680
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
681
				case peerchan <- pi:
682 683 684 685
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
686
			// TODO maybe query it?
687
			if pb.Connectedness(pbp.Connection) != inet.Connected {
688
				clpeers = append(clpeers, pi)
689 690 691 692 693 694 695 696 697
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
698
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
699
			log.Debug(err)
700 701 702 703 704 705 706 707
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}