routing.go 17.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the capacity of the buffered result channels used by
// asynchronous queries (for example the channel returned by
// FindPeersConnectedToPeer). The buffer lets several in-flight queries
// deliver their results and carry on querying closer peers without waiting
// for the consumer. Once the buffer is full, further results block until the
// consumer drains the channel.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109 110 111
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	// Val is the raw record value. GetValues may emit an entry with a nil
	// Val when a peer answered with an invalid record.
	Val []byte
	// From is the peer that supplied Val; GetValues sets it to the local
	// peer ID for a value read from the local datastore.
	From peer.ID
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
113
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
114 115 116 117 118 119 120 121
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
122

Łukasz Magiera's avatar
Łukasz Magiera committed
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
	responses, errCh := dht.SearchValue(ctx, key, opts...)
	var best []byte

	loop:
	for {
		select {
		case r, ok := <-responses:
			if !ok {
				if errCh == nil {
					break loop
				}
				responses = nil
				continue
			}
			best = r

		case err, ok := <- errCh:
			if !ok {
				if responses == nil {
					break loop
				}
				errCh = nil
				continue
			}
			return nil, err
		}
	}

	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, <-chan error) {
Steven Allen's avatar
Steven Allen committed
159 160
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
161
		return nil, wrapErr(err)
Steven Allen's avatar
Steven Allen committed
162 163 164 165 166 167 168
	}

	responsesNeeded := 0
	if !cfg.Offline {
		responsesNeeded = getQuorum(&cfg)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
169 170 171 172 173
	out := make(chan []byte, responsesNeeded)
	outErr := make(chan error, 1)
	go func() {
		defer close(out)
		defer close(outErr)
174

Łukasz Magiera's avatar
Łukasz Magiera committed
175 176 177
		valCh, errCh := dht.GetValues(ctx, key, responsesNeeded)
		vals := make([]RecvdVal, 0, responsesNeeded)
		best := -1
178

Łukasz Magiera's avatar
Łukasz Magiera committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
		defer func() {
			if len(vals) <= 1 || best < 0 {
				return
			}
			fixupRec := record.MakePutRecord(key, vals[best].Val)
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
				if !bytes.Equal(v.Val, vals[best].Val) {
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
205

Łukasz Magiera's avatar
Łukasz Magiera committed
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
					if errCh == nil {
						return
					}
					// failed to find a good record
					if best < 0 {
						outErr <- routing.ErrNotFound
					}
					valCh = nil
					continue
				}

				vals = append(vals, v)
222

Łukasz Magiera's avatar
Łukasz Magiera committed
223 224 225 226 227 228
				if v.Val == nil {
					continue
				}
				// Select best value
				if best > -1 {
					i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
229
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
230 231 232 233 234 235 236 237 238 239 240
						continue //TODO: Do we want to do something with the error here?
					}
					if i != best && !bytes.Equal(v.Val, vals[best].Val) {
						best = i
						out <- v.Val
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
						best = len(vals) - 1
						out <- v.Val
241 242
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
243 244 245 246 247 248 249
			case err, ok := <-errCh:
				if !ok {
					if valCh == nil {
						return
					}
					errCh = nil
					continue
250
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
251 252 253 254 255 256
				outErr <- err
				return
			case <-ctx.Done():
				outErr <- ctx.Err()
				return
			}
257
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
258
	}()
259

Łukasz Magiera's avatar
Łukasz Magiera committed
260
	return out, outErr
261 262
}

Steven Allen's avatar
Steven Allen committed
263
// GetValues gets nvals values corresponding to the given key.
Łukasz Magiera's avatar
Łukasz Magiera committed
264
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, <-chan error) {
265
	eip := log.EventBegin(ctx, "GetValues")
Łukasz Magiera's avatar
Łukasz Magiera committed
266 267 268 269 270 271

	vals := make(chan RecvdVal, nvals)

	done := func(err error) (<-chan RecvdVal, <-chan error) {
		defer close(vals)

272 273 274 275 276
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
Łukasz Magiera's avatar
Łukasz Magiera committed
277 278
		return vals, wrapErr(err)
	}
279

280
	// If we have it local, don't bother doing an RPC!
281
	lrec, err := dht.getLocal(key)
282 283
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
284
		return done(err)
285 286
	}
	if lrec != nil {
287
		// TODO: this is tricky, we don't always want to trust our own value
288
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
289
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
290
		vals <- RecvdVal{
291 292
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
293
		}
294 295

		if nvals <= 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
296
			return done(nil)
297
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
298 299

		nvals--
300
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
301
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
302 303
	}

304
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
305
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
306
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
307
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
308
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
309
		return done(kb.ErrLookupFailure)
310 311
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
312 313 314
	var valslock sync.Mutex
	var got int

315
	// setup the Query
Jeromy's avatar
Jeromy committed
316
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
317
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
318
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
319 320 321 322
			Type: notif.SendingQuery,
			ID:   p,
		})

323
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
324 325 326 327 328 329 330 331 332
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
333
			return nil, err
334 335 336 337 338
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
339
		}
340

341 342
		res := &dhtQueryResult{closerPeers: peers}

343
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
344
			rv := RecvdVal{
345 346 347 348
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
349 350
			vals <- rv
			got++
351

352
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
353
			if nvals == got {
354 355 356
				res.success = true
			}
			valslock.Unlock()
357 358
		}

Jeromy's avatar
Jeromy committed
359
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
360 361
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
362
			Responses: peers,
Jeromy's avatar
Jeromy committed
363 364
		})

365 366
		return res, nil
	})
367

Łukasz Magiera's avatar
Łukasz Magiera committed
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		if err != nil {
			errCh <- err
		}
		done(err)
	}()
388

Łukasz Magiera's avatar
Łukasz Magiera committed
389
	return vals, errCh
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
390 391
}

392 393 394
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
395

396
// Provide makes this node announce that it can provide a value for the given key
397 398 399 400 401 402 403 404
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
405 406

	// add self locally
407
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
408 409 410
	if !brdcst {
		return nil
	}
411

412
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
413 414
	if err != nil {
		return err
415 416
	}

417 418 419 420 421
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
422
	wg := sync.WaitGroup{}
423
	for p := range peers {
Jeromy's avatar
Jeromy committed
424 425 426
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
427
			log.Debugf("putProvider(%s, %s)", key, p)
428
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
429
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
430
				log.Debug(err)
Jeromy's avatar
Jeromy committed
431 432
			}
		}(p)
433
	}
Jeromy's avatar
Jeromy committed
434
	wg.Wait()
435
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
436
}
437
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
438 439 440 441 442 443 444 445 446 447 448
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

449
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
450 451 452
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
453

Brian Tiger Chow's avatar
Brian Tiger Chow committed
454
// FindProviders searches until the context expires.
455
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
456
	var providers []pstore.PeerInfo
457
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
458 459 460 461 462
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
463 464 465
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
466 467
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
468
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
469 470 471 472
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

473 474
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
475 476
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
477
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
478 479
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
480
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
481
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
482
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
483
			select {
Jeromy's avatar
Jeromy committed
484
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
485 486 487
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
488
		}
Jeromy's avatar
Jeromy committed
489

490
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
491
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
492
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
493 494 495 496 497
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
498
	parent := ctx
499
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
500
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
501 502 503
			Type: notif.SendingQuery,
			ID:   p,
		})
504
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
505 506 507 508
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
509
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
510
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
511
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
512 513 514

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
515 516 517
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
518
			log.Debugf("got provider: %s", prov)
519
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
520
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
521
				select {
Jeromy's avatar
Jeromy committed
522
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
523
				case <-ctx.Done():
524
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
525 526
					return nil, ctx.Err()
				}
527
			}
Jeromy's avatar
Jeromy committed
528
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
529
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
530
				return &dhtQueryResult{success: true}, nil
531 532 533
			}
		}

Jeromy's avatar
Jeromy committed
534 535
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
536
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
537
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
538

Jeromy's avatar
Jeromy committed
539
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
540 541
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
542
			Responses: clpeers,
543
		})
Jeromy's avatar
Jeromy committed
544 545 546
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
547
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
548 549
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
550
		log.Debugf("Query error: %s", err)
551 552 553 554 555 556 557 558 559 560
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
561 562 563 564
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
565
	}
566 567
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
568
// FindPeer searches for a peer with given ID.
569 570 571 572 573 574 575 576
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
577

578
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
579
	if pi := dht.FindLocal(id); pi.ID != "" {
580
		return pi, nil
581 582
	}

Jeromy's avatar
Jeromy committed
583
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
584
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
585
		return pstore.PeerInfo{}, kb.ErrLookupFailure
586
	}
587

Jeromy's avatar
Jeromy committed
588
	// Sanity...
589
	for _, p := range peers {
590
		if p == id {
591
			log.Debug("found target peer in list of closest peers...")
592
			return dht.peerstore.PeerInfo(p), nil
593
		}
594
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
595

Jeromy's avatar
Jeromy committed
596
	// setup the Query
Jeromy's avatar
Jeromy committed
597
	parent := ctx
598
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
599
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
600 601 602
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
603

604
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
605
		if err != nil {
606
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
607
		}
608

Jeromy's avatar
Jeromy committed
609
		closer := pmes.GetCloserPeers()
610
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
611

612
		// see if we got the peer here
613 614
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
615
				return &dhtQueryResult{
616
					peer:    npi,
Jeromy's avatar
Jeromy committed
617 618 619
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
620 621
		}

Jeromy's avatar
Jeromy committed
622
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
623
			Type:      notif.PeerResponse,
624
			ID:        p,
Jeromy's avatar
Jeromy committed
625
			Responses: clpeerInfos,
626 627
		})

628
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
629
	})
630

Jeromy's avatar
Jeromy committed
631
	// run it!
632
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
633
	if err != nil {
Jeromy's avatar
Jeromy committed
634
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
635 636
	}

637
	log.Debugf("FindPeer %v %v", id, result.success)
638
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
639
		return pstore.PeerInfo{}, routing.ErrNotFound
640
	}
Jeromy's avatar
Jeromy committed
641

Jeromy's avatar
Jeromy committed
642
	return *result.peer, nil
643 644
}

645
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
646
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
647

Jeromy's avatar
Jeromy committed
648
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
649
	peersSeen := make(map[peer.ID]struct{})
650
	var peersSeenMx sync.Mutex
651

Jeromy's avatar
Jeromy committed
652
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
653
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
654
		return nil, kb.ErrLookupFailure
655 656 657
	}

	// setup the Query
658
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
659

660
		pmes, err := dht.findPeerSingle(ctx, p, id)
661 662 663 664
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
665
		var clpeers []*pstore.PeerInfo
666 667
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
668
			pi := pb.PBPeerToPeerInfo(pbp)
669

670
			// skip peers already seen
671
			peersSeenMx.Lock()
672
			if _, found := peersSeen[pi.ID]; found {
673
				peersSeenMx.Unlock()
674 675
				continue
			}
676
			peersSeen[pi.ID] = struct{}{}
677
			peersSeenMx.Unlock()
678 679

			// if peer is connected, send it to our client.
680
			if pb.Connectedness(pbp.Connection) == inet.Connected {
681 682 683
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
684
				case peerchan <- pi:
685 686 687 688
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
689
			// TODO maybe query it?
690
			if pb.Connectedness(pbp.Connection) != inet.Connected {
691
				clpeers = append(clpeers, pi)
692 693 694 695 696 697 698 699 700
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
701
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
702
			log.Debug(err)
703 704 705 706 707 708 709 710
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}
Łukasz Magiera's avatar
Łukasz Magiera committed
711 712 713 714 715 716 717 718 719

// wrapErr returns an already-closed error channel. The channel yields err
// once if err is non-nil and nothing otherwise.
func wrapErr(err error) <-chan error {
	errs := make(chan error, 1)
	defer close(errs)

	if err == nil {
		return errs
	}
	errs <- err
	return errs
}