routing.go 19.2 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11 12 13 14 15
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

16
	"github.com/ipfs/go-cid"
17
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
18
	logging "github.com/ipfs/go-log"
19
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
20 21
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
Adin Schmahmann's avatar
Adin Schmahmann committed
22
	"github.com/multiformats/go-multihash"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23 24
)

25 26 27 28 29 30
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that different query
// results will wait for the channel to drain.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31 32 33 34 35
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
36
// This is the top level "Store" operation of the DHT
37
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
38 39 40 41
	if !dht.enableValues {
		return routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
42
	eip := logger.EventBegin(ctx, "PutValue")
43 44 45 46 47 48 49
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Matt Joiner's avatar
Matt Joiner committed
50
	logger.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

75
	rec := record.MakePutRecord(key, value)
76
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
77
	err = dht.putLocal(key, rec)
78 79 80 81
	if err != nil {
		return err
	}

82
	pchan, err := dht.GetClosestPeers(ctx, key)
83 84 85
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86

87 88 89 90
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
91 92
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
93
			defer wg.Done()
94 95
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.Value,
Jeromy's avatar
Jeromy committed
96 97 98
				ID:   p,
			})

99
			err := dht.putValueToPeer(ctx, p, rec)
100
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
101
				logger.Debugf("failed putting value to peer: %s", err)
102 103 104 105
			}
		}(p)
	}
	wg.Wait()
106

107
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108 109
}

Steven Allen's avatar
Steven Allen committed
110 111 112 113 114 115
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
// GetValue searches for the value corresponding to given Key.
117
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
118 119 120 121
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

Matt Joiner's avatar
Matt Joiner committed
122
	eip := logger.EventBegin(ctx, "GetValue")
123 124 125 126 127 128 129
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
130

131
	// apply defaultQuorum if relevant
132
	var cfg routing.Options
133 134 135 136 137
	if err := cfg.Apply(opts...); err != nil {
		return nil, err
	}
	opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))

138 139 140 141
	responses, err := dht.SearchValue(ctx, key, opts...)
	if err != nil {
		return nil, err
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
142 143
	var best []byte

144 145
	for r := range responses {
		best = r
Łukasz Magiera's avatar
Łukasz Magiera committed
146 147
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
148 149 150 151
	if ctx.Err() != nil {
		return best, ctx.Err()
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
152 153 154
	if best == nil {
		return nil, routing.ErrNotFound
	}
Matt Joiner's avatar
Matt Joiner committed
155
	logger.Debugf("GetValue %v %v", key, best)
Łukasz Magiera's avatar
Łukasz Magiera committed
156 157 158
	return best, nil
}

159
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
160 161 162 163
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}

164
	var cfg routing.Options
Steven Allen's avatar
Steven Allen committed
165
	if err := cfg.Apply(opts...); err != nil {
166
		return nil, err
Steven Allen's avatar
Steven Allen committed
167 168 169 170
	}

	responsesNeeded := 0
	if !cfg.Offline {
171
		responsesNeeded = getQuorum(&cfg, -1)
Steven Allen's avatar
Steven Allen committed
172 173
	}

174 175 176 177 178 179
	valCh, err := dht.getValues(ctx, key, responsesNeeded)
	if err != nil {
		return nil, err
	}

	out := make(chan []byte)
Łukasz Magiera's avatar
Łukasz Magiera committed
180 181
	go func() {
		defer close(out)
182

Łukasz Magiera's avatar
Łukasz Magiera committed
183 184 185 186
		maxVals := responsesNeeded
		if maxVals < 0 {
			maxVals = defaultQuorum * 4 // we want some upper bound on how
			// much correctional entries we will send
187
		}
188 189 190

		// vals is used collect entries we got so far and send corrections to peers
		// when we exit this function
Łukasz Magiera's avatar
Łukasz Magiera committed
191
		vals := make([]RecvdVal, 0, maxVals)
192
		var best *RecvdVal
193

Łukasz Magiera's avatar
Łukasz Magiera committed
194
		defer func() {
195
			if len(vals) <= 1 || best == nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
196 197
				return
			}
198
			fixupRec := record.MakePutRecord(key, best.Val)
Łukasz Magiera's avatar
Łukasz Magiera committed
199 200
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
201
				if !bytes.Equal(v.Val, best.Val) {
Łukasz Magiera's avatar
Łukasz Magiera committed
202 203 204 205
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
Matt Joiner's avatar
Matt Joiner committed
206
								logger.Error("Error correcting local dht entry:", err)
Łukasz Magiera's avatar
Łukasz Magiera committed
207 208 209 210 211 212 213
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
Matt Joiner's avatar
Matt Joiner committed
214
							logger.Debug("Error correcting DHT entry: ", err)
Łukasz Magiera's avatar
Łukasz Magiera committed
215 216 217 218 219
						}
					}(v)
				}
			}
		}()
220

Łukasz Magiera's avatar
Łukasz Magiera committed
221 222 223 224
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
225
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
226 227
				}

Łukasz Magiera's avatar
Łukasz Magiera committed
228 229 230
				if len(vals) < maxVals {
					vals = append(vals, v)
				}
231

Łukasz Magiera's avatar
Łukasz Magiera committed
232 233 234 235
				if v.Val == nil {
					continue
				}
				// Select best value
236
				if best != nil {
Steven Allen's avatar
Steven Allen committed
237 238 239
					if bytes.Equal(best.Val, v.Val) {
						continue
					}
240
					sel, err := dht.Validator.Select(key, [][]byte{best.Val, v.Val})
241
					if err != nil {
Matt Joiner's avatar
Matt Joiner committed
242
						logger.Warning("Failed to select dht key: ", err)
Łukasz Magiera's avatar
Łukasz Magiera committed
243
						continue
Łukasz Magiera's avatar
Łukasz Magiera committed
244
					}
Steven Allen's avatar
Steven Allen committed
245 246
					if sel != 1 {
						continue
247 248
					}
				}
Steven Allen's avatar
Steven Allen committed
249 250 251 252 253 254
				best = &v
				select {
				case out <- v.Val:
				case <-ctx.Done():
					return
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
255 256 257
			case <-ctx.Done():
				return
			}
258
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
259
	}()
260

261
	return out, nil
262 263
}

Steven Allen's avatar
Steven Allen committed
264
// GetValues gets nvals values corresponding to the given key.
265
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
266 267 268
	if !dht.enableValues {
		return nil, routing.ErrNotSupported
	}
269

270
	eip := logger.EventBegin(ctx, "GetValues")
271 272 273
	eip.Append(loggableKey(key))
	defer eip.Done()

274 275
	valCh, err := dht.getValues(ctx, key, nvals)
	if err != nil {
276
		eip.SetError(err)
277 278 279 280 281 282 283 284
		return nil, err
	}

	out := make([]RecvdVal, 0, nvals)
	for val := range valCh {
		out = append(out, val)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
285
	return out, ctx.Err()
286 287
}

288
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
289
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
290

291
	done := func(err error) (<-chan RecvdVal, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
292
		defer close(vals)
293
		return vals, err
Łukasz Magiera's avatar
Łukasz Magiera committed
294
	}
295

296
	// If we have it local, don't bother doing an RPC!
297
	lrec, err := dht.getLocal(key)
298 299
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
300
		return done(err)
301 302
	}
	if lrec != nil {
303
		// TODO: this is tricky, we don't always want to trust our own value
304
		// what if the authoritative source updated it?
Matt Joiner's avatar
Matt Joiner committed
305
		logger.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
306
		vals <- RecvdVal{
307 308
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
309
		}
310

311
		if nvals == 0 || nvals == 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
312
			return done(nil)
313
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
314 315

		nvals--
316
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
317
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
318 319
	}

320
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
321
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
Matt Joiner's avatar
Matt Joiner committed
322
	logger.Debugf("peers in rt: %d %s", len(rtp), rtp)
323
	if len(rtp) == 0 {
Matt Joiner's avatar
Matt Joiner committed
324
		logger.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
325
		return done(kb.ErrLookupFailure)
326 327
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
328 329 330
	var valslock sync.Mutex
	var got int

331
	// setup the Query
Jeromy's avatar
Jeromy committed
332
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
333
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
334 335
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type: routing.SendingQuery,
Jeromy's avatar
Jeromy committed
336 337 338
			ID:   p,
		})

339
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
340 341 342 343 344
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
345 346
			routing.PublishQueryEvent(parent, &routing.QueryEvent{
				Type: routing.PeerResponse,
347 348
				ID:   p,
			})
349
			return nil, err
350 351 352 353 354
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
355
		}
356

357 358
		res := &dhtQueryResult{closerPeers: peers}

359
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
360
			rv := RecvdVal{
361 362 363 364
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
365 366 367 368 369 370
			select {
			case vals <- rv:
			case <-ctx.Done():
				valslock.Unlock()
				return nil, ctx.Err()
			}
Łukasz Magiera's avatar
Łukasz Magiera committed
371
			got++
372

373
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
374
			if nvals == got {
375 376 377
				res.success = true
			}
			valslock.Unlock()
378 379
		}

380 381
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type:      routing.PeerResponse,
Jeromy's avatar
Jeromy committed
382
			ID:        p,
Jeromy's avatar
Jeromy committed
383
			Responses: peers,
Jeromy's avatar
Jeromy committed
384 385
		})

386 387
		return res, nil
	})
388

Łukasz Magiera's avatar
Łukasz Magiera committed
389 390 391 392 393 394 395 396 397 398 399
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
Aarsh Shah's avatar
Aarsh Shah committed
400 401
			// refresh the cpl for this key as the query was successful
			dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
Łukasz Magiera's avatar
Łukasz Magiera committed
402 403 404 405
			err = nil
		}
		done(err)
	}()
406

407
	return vals, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
408 409
}

410 411 412
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
413

414
// Provide makes this node announce that it can provide a value for the given key
415
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
416 417 418
	if !dht.enableProviders {
		return routing.ErrNotSupported
	}
419
	keyMH := key.Hash()
Adin Schmahmann's avatar
Adin Schmahmann committed
420
	eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst})
421 422 423 424 425 426
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
427 428

	// add self locally
429
	dht.providers.AddProvider(ctx, keyMH, dht.self)
Jeromy's avatar
Jeromy committed
430 431 432
	if !brdcst {
		return nil
	}
433

434 435 436 437 438 439 440 441 442 443
	closerCtx := ctx
	if deadline, ok := ctx.Deadline(); ok {
		now := time.Now()
		timeout := deadline.Sub(now)

		if timeout < 0 {
			// timed out
			return context.DeadlineExceeded
		} else if timeout < 10*time.Second {
			// Reserve 10% for the final put.
444
			deadline = deadline.Add(-timeout / 10)
445 446 447 448 449 450 451 452 453 454
		} else {
			// Otherwise, reserve a second (we'll already be
			// connected so this should be fast).
			deadline = deadline.Add(-time.Second)
		}
		var cancel context.CancelFunc
		closerCtx, cancel = context.WithDeadline(ctx, deadline)
		defer cancel()
	}

455
	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
456 457
	if err != nil {
		return err
458 459
	}

460
	mes, err := dht.makeProvRecord(keyMH)
461 462 463 464
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
465
	wg := sync.WaitGroup{}
466
	for p := range peers {
Jeromy's avatar
Jeromy committed
467 468 469
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
470
			logger.Debugf("putProvider(%s, %s)", keyMH, p)
471
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
472
			if err != nil {
Matt Joiner's avatar
Matt Joiner committed
473
				logger.Debug(err)
Jeromy's avatar
Jeromy committed
474 475
			}
		}(p)
476
	}
Jeromy's avatar
Jeromy committed
477
	wg.Wait()
478
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
479
}
480
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
481
	pi := peer.AddrInfo{
482 483 484 485 486 487 488 489 490 491
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

492
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
493
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
494 495
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
496

Brian Tiger Chow's avatar
Brian Tiger Chow committed
497
// FindProviders searches until the context expires.
498
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
499 500 501
	if !dht.enableProviders {
		return nil, routing.ErrNotSupported
	}
502
	var providers []peer.AddrInfo
503
	for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
504 505 506 507 508
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
509 510 511
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
512
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
513
	peerOut := make(chan peer.AddrInfo, count)
514
	if !dht.enableProviders {
515 516
		close(peerOut)
		return peerOut
517 518
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
519 520
	keyMH := key.Hash()
	logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH))
521

Adin Schmahmann's avatar
Adin Schmahmann committed
522
	go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
Jeromy's avatar
Jeromy committed
523 524 525
	return peerOut
}

Adin Schmahmann's avatar
Adin Schmahmann committed
526 527
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
	defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done()
Jeromy's avatar
Jeromy committed
528 529
	defer close(peerOut)

530
	ps := peer.NewLimitedSet(count)
Adin Schmahmann's avatar
Adin Schmahmann committed
531
	provs := dht.providers.GetProviders(ctx, key)
Jeromy's avatar
Jeromy committed
532
	for _, p := range provs {
533
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
534
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
535
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
536
			select {
Jeromy's avatar
Jeromy committed
537
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
538 539 540
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
541
		}
Jeromy's avatar
Jeromy committed
542

543
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
544
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
545
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
546 547 548 549
			return
		}
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
550
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue)
551
	if len(peers) == 0 {
552 553
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
			Type:  routing.QueryError,
554 555 556 557 558
			Extra: kb.ErrLookupFailure.Error(),
		})
		return
	}

Jeromy's avatar
Jeromy committed
559
	// setup the Query
Jeromy's avatar
Jeromy committed
560
	parent := ctx
Adin Schmahmann's avatar
Adin Schmahmann committed
561
	query := dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
562 563
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type: routing.SendingQuery,
564 565
			ID:   p,
		})
566
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
567 568 569 570
		if err != nil {
			return nil, err
		}

Matt Joiner's avatar
Matt Joiner committed
571
		logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
572
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Matt Joiner's avatar
Matt Joiner committed
573
		logger.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
574 575 576

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
577
			if prov.ID != dht.self {
578
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
579
			}
Matt Joiner's avatar
Matt Joiner committed
580
			logger.Debugf("got provider: %s", prov)
581
			if ps.TryAdd(prov.ID) {
Matt Joiner's avatar
Matt Joiner committed
582
				logger.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
583
				select {
Jeromy's avatar
Jeromy committed
584
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
585
				case <-ctx.Done():
Matt Joiner's avatar
Matt Joiner committed
586
					logger.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
587 588
					return nil, ctx.Err()
				}
589
			}
Jeromy's avatar
Jeromy committed
590
			if ps.Size() >= count {
Matt Joiner's avatar
Matt Joiner committed
591
				logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
592
				return &dhtQueryResult{success: true}, nil
593 594 595
			}
		}

Jeromy's avatar
Jeromy committed
596 597
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
598
		clpeers := pb.PBPeersToPeerInfos(closer)
Matt Joiner's avatar
Matt Joiner committed
599
		logger.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
600

601 602
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type:      routing.PeerResponse,
603
			ID:        p,
Jeromy's avatar
Jeromy committed
604
			Responses: clpeers,
605
		})
Jeromy's avatar
Jeromy committed
606 607 608 609 610
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	_, err := query.Run(ctx, peers)
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
611
		logger.Debugf("Query error: %s", err)
612 613
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
Matt Joiner's avatar
Matt Joiner committed
614 615 616 617
			logger.Error("reproduced bug 3032:")
			logger.Errorf("Errors type information: %#v", err)
			logger.Errorf("go version: %s", runtime.Version())
			logger.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")
618 619 620 621

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
622 623
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
			Type:  routing.QueryError,
624 625
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
626
	}
627

Aarsh Shah's avatar
Aarsh Shah committed
628
	// refresh the cpl for this key after the query is run
Adin Schmahmann's avatar
Adin Schmahmann committed
629
	dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now())
630 631
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
632
// FindPeer searches for a peer with given ID.
633
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
Matt Joiner's avatar
Matt Joiner committed
634
	eip := logger.EventBegin(ctx, "FindPeer", id)
635 636 637 638 639 640
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
641

642
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
643
	if pi := dht.FindLocal(id); pi.ID != "" {
644
		return pi, nil
645 646
	}

Jeromy's avatar
Jeromy committed
647
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
648
	if len(peers) == 0 {
649
		return peer.AddrInfo{}, kb.ErrLookupFailure
650
	}
651

Jeromy's avatar
Jeromy committed
652
	// Sanity...
653
	for _, p := range peers {
654
		if p == id {
Matt Joiner's avatar
Matt Joiner committed
655
			logger.Debug("found target peer in list of closest peers...")
656
			return dht.peerstore.PeerInfo(p), nil
657
		}
658
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
659

Jeromy's avatar
Jeromy committed
660
	// setup the Query
Jeromy's avatar
Jeromy committed
661
	parent := ctx
662
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
663 664
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type: routing.SendingQuery,
665 666
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
667

668
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
669
		if err != nil {
670
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
671
		}
672

Jeromy's avatar
Jeromy committed
673
		closer := pmes.GetCloserPeers()
674
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
675

676
		// see if we got the peer here
677 678
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
679
				return &dhtQueryResult{
680
					peer:    npi,
Jeromy's avatar
Jeromy committed
681 682 683
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
684 685
		}

686 687
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type:      routing.PeerResponse,
688
			ID:        p,
Jeromy's avatar
Jeromy committed
689
			Responses: clpeerInfos,
690 691
		})

692
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
693
	})
694

Jeromy's avatar
Jeromy committed
695
	// run it!
696
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
697
	if err != nil {
698
		return peer.AddrInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
699 700
	}

Aarsh Shah's avatar
Aarsh Shah committed
701 702
	// refresh the cpl for this key since the lookup was successful
	dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())
703

Matt Joiner's avatar
Matt Joiner committed
704
	logger.Debugf("FindPeer %v %v", id, result.success)
705
	if result.peer.ID == "" {
706
		return peer.AddrInfo{}, routing.ErrNotFound
707
	}
Jeromy's avatar
Jeromy committed
708

Jeromy's avatar
Jeromy committed
709
	return *result.peer, nil
710 711
}

712
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
713
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *peer.AddrInfo, error) {
714

715
	peerchan := make(chan *peer.AddrInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
716
	peersSeen := make(map[peer.ID]struct{})
717
	var peersSeenMx sync.Mutex
718

Jeromy's avatar
Jeromy committed
719
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
720
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
721
		return nil, kb.ErrLookupFailure
722 723 724
	}

	// setup the Query
725
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
726

727
		pmes, err := dht.findPeerSingle(ctx, p, id)
728 729 730 731
		if err != nil {
			return nil, err
		}

732
		var clpeers []*peer.AddrInfo
733 734
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
735
			pi := pb.PBPeerToPeerInfo(pbp)
736

737
			// skip peers already seen
738
			peersSeenMx.Lock()
739
			if _, found := peersSeen[pi.ID]; found {
740
				peersSeenMx.Unlock()
741 742
				continue
			}
743
			peersSeen[pi.ID] = struct{}{}
744
			peersSeenMx.Unlock()
745 746

			// if peer is connected, send it to our client.
747
			if pb.Connectedness(pbp.Connection) == network.Connected {
748 749 750
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
751
				case peerchan <- pi:
752 753 754 755
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
756
			// TODO maybe query it?
757
			if pb.Connectedness(pbp.Connection) != network.Connected {
758
				clpeers = append(clpeers, pi)
759 760 761 762 763 764 765 766 767
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
768
		if _, err := query.Run(ctx, peers); err != nil {
Matt Joiner's avatar
Matt Joiner committed
769
			logger.Debug(err)
770 771
		}

Aarsh Shah's avatar
Aarsh Shah committed
772 773
		// refresh the cpl for this key
		dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertPeerID(id), time.Now())
774

775 776 777 778 779 780
		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}