routing.go 17.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4
	"bytes"
Jeromy's avatar
Jeromy committed
5
	"context"
6
	"fmt"
7
	"runtime"
Jeromy's avatar
Jeromy committed
8
	"sync"
Jeromy's avatar
Jeromy committed
9
	"time"
10

11
	cid "github.com/ipfs/go-cid"
12
	u "github.com/ipfs/go-ipfs-util"
Jeromy's avatar
Jeromy committed
13
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
14 15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kb "github.com/libp2p/go-libp2p-kbucket"
16 17 18 19
	inet "github.com/libp2p/go-libp2p-net"
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
20 21 22
	record "github.com/libp2p/go-libp2p-record"
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Steven Allen's avatar
Steven Allen committed
23
	ropts "github.com/libp2p/go-libp2p-routing/options"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24 25
)

26 27 28 29 30 31
// asyncQueryBuffer is the capacity of the buffered result channels used by
// async queries (for example the peer channel returned by
// FindPeersConnectedToPeer). The buffer lets several concurrent query workers
// hand over their results and move on to closer peers without waiting for the
// consumer. Once the buffer is full, further results block until the channel
// is drained.
var asyncQueryBuffer = 10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32 33 34 35 36
// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get

// PutValue adds value corresponding to given Key.
37
// This is the top level "Store" operation of the DHT
Steven Allen's avatar
Steven Allen committed
38
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) (err error) {
39 40 41 42 43 44 45 46
	eip := log.EventBegin(ctx, "PutValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
	log.Debugf("PutValue %s", key)
Jeromy's avatar
Jeromy committed
48

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
	// don't even allow local users to put bad values.
	if err := dht.Validator.Validate(key, value); err != nil {
		return err
	}

	old, err := dht.getLocal(key)
	if err != nil {
		// Means something is wrong with the datastore.
		return err
	}

	// Check if we have an old value that's not the same as the new one.
	if old != nil && !bytes.Equal(old.GetValue(), value) {
		// Check to see if the new one is better.
		i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
		if err != nil {
			return err
		}
		if i != 0 {
			return fmt.Errorf("can't replace a newer value with an older value")
		}
	}

72
	rec := record.MakePutRecord(key, value)
73
	rec.TimeReceived = u.FormatRFC3339(time.Now())
Jeromy's avatar
Jeromy committed
74
	err = dht.putLocal(key, rec)
75 76 77 78
	if err != nil {
		return err
	}

79
	pchan, err := dht.GetClosestPeers(ctx, key)
80 81 82
	if err != nil {
		return err
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83

84 85 86 87
	wg := sync.WaitGroup{}
	for p := range pchan {
		wg.Add(1)
		go func(p peer.ID) {
Jeromy's avatar
Jeromy committed
88 89
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
90
			defer wg.Done()
Jeromy's avatar
Jeromy committed
91 92 93 94 95
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
				Type: notif.Value,
				ID:   p,
			})

96
			err := dht.putValueToPeer(ctx, p, rec)
97
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
98
				log.Debugf("failed putting value to peer: %s", err)
99 100 101 102 103
			}
		}(p)
	}
	wg.Wait()
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104 105
}

Steven Allen's avatar
Steven Allen committed
106 107 108 109
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
	Val  []byte
	From peer.ID
110
	Err  error
Steven Allen's avatar
Steven Allen committed
111 112
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113
// GetValue searches for the value corresponding to given Key.
Steven Allen's avatar
Steven Allen committed
114
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Option) (_ []byte, err error) {
115 116 117 118 119 120 121 122
	eip := log.EventBegin(ctx, "GetValue")
	defer func() {
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Jeromy's avatar
Jeromy committed
123

Łukasz Magiera's avatar
Łukasz Magiera committed
124 125 126 127 128 129 130 131
	responses, errCh := dht.SearchValue(ctx, key, opts...)
	var best []byte

	for {
		select {
		case r, ok := <-responses:
			if !ok {
				responses = nil
132
				break
Łukasz Magiera's avatar
Łukasz Magiera committed
133 134
			}

135 136
			best = r
		case err, ok := <-errCh:
Łukasz Magiera's avatar
Łukasz Magiera committed
137 138
			if !ok {
				errCh = nil
139
				break
Łukasz Magiera's avatar
Łukasz Magiera committed
140
			}
141

Łukasz Magiera's avatar
Łukasz Magiera committed
142 143
			return nil, err
		}
144 145 146 147

		if errCh == nil && responses == nil {
			break
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
148 149 150 151 152 153 154 155 156 157
	}

	if best == nil {
		return nil, routing.ErrNotFound
	}
	log.Debugf("GetValue %v %v", key, best)
	return best, nil
}

func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, <-chan error) {
Steven Allen's avatar
Steven Allen committed
158 159
	var cfg ropts.Options
	if err := cfg.Apply(opts...); err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
160
		return nil, wrapErr(err)
Steven Allen's avatar
Steven Allen committed
161 162 163 164 165 166 167
	}

	responsesNeeded := 0
	if !cfg.Offline {
		responsesNeeded = getQuorum(&cfg)
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
168 169 170 171 172
	out := make(chan []byte, responsesNeeded)
	outErr := make(chan error, 1)
	go func() {
		defer close(out)
		defer close(outErr)
173

174
		valCh := dht.GetValues(ctx, key, responsesNeeded)
Łukasz Magiera's avatar
Łukasz Magiera committed
175 176
		vals := make([]RecvdVal, 0, responsesNeeded)
		best := -1
177

Łukasz Magiera's avatar
Łukasz Magiera committed
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
		defer func() {
			if len(vals) <= 1 || best < 0 {
				return
			}
			fixupRec := record.MakePutRecord(key, vals[best].Val)
			for _, v := range vals {
				// if someone sent us a different 'less-valid' record, lets correct them
				if !bytes.Equal(v.Val, vals[best].Val) {
					go func(v RecvdVal) {
						if v.From == dht.self {
							err := dht.putLocal(key, fixupRec)
							if err != nil {
								log.Error("Error correcting local dht entry:", err)
							}
							return
						}
						ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
						defer cancel()
						err := dht.putValueToPeer(ctx, v.From, fixupRec)
						if err != nil {
							log.Debug("Error correcting DHT entry: ", err)
						}
					}(v)
				}
			}
		}()
204

Łukasz Magiera's avatar
Łukasz Magiera committed
205 206 207 208 209 210 211 212
		for {
			select {
			case v, ok := <-valCh:
				if !ok {
					// failed to find a good record
					if best < 0 {
						outErr <- routing.ErrNotFound
					}
213 214 215 216 217
					return
				}
				if v.Err != nil {
					outErr <- v.Err
					return
Łukasz Magiera's avatar
Łukasz Magiera committed
218 219 220
				}

				vals = append(vals, v)
221

Łukasz Magiera's avatar
Łukasz Magiera committed
222 223 224 225 226 227
				if v.Val == nil {
					continue
				}
				// Select best value
				if best > -1 {
					i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
228
					if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
229 230 231 232 233 234 235 236 237 238 239
						continue //TODO: Do we want to do something with the error here?
					}
					if i != best && !bytes.Equal(v.Val, vals[best].Val) {
						best = i
						out <- v.Val
					}
				} else {
					// Output first valid value
					if err := dht.Validator.Validate(key, v.Val); err == nil {
						best = len(vals) - 1
						out <- v.Val
240 241
					}
				}
Łukasz Magiera's avatar
Łukasz Magiera committed
242 243 244 245
			case <-ctx.Done():
				outErr <- ctx.Err()
				return
			}
246
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
247
	}()
248

Łukasz Magiera's avatar
Łukasz Magiera committed
249
	return out, outErr
250 251
}

Steven Allen's avatar
Steven Allen committed
252
// GetValues gets nvals values corresponding to the given key.
253
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) <-chan RecvdVal {
254
	eip := log.EventBegin(ctx, "GetValues")
Łukasz Magiera's avatar
Łukasz Magiera committed
255

256 257
	// alloc 1 to have for sync errors
	vals := make(chan RecvdVal, 1)
Łukasz Magiera's avatar
Łukasz Magiera committed
258

259
	done := func(err error) <-chan RecvdVal {
Łukasz Magiera's avatar
Łukasz Magiera committed
260 261
		defer close(vals)

262 263 264
		eip.Append(loggableKey(key))
		if err != nil {
			eip.SetError(err)
265
			vals <- RecvdVal{Err: err}
266 267
		}
		eip.Done()
268
		return vals
Łukasz Magiera's avatar
Łukasz Magiera committed
269
	}
270

271
	// If we have it local, don't bother doing an RPC!
272
	lrec, err := dht.getLocal(key)
273 274
	if err != nil {
		// something is wrong with the datastore.
Łukasz Magiera's avatar
Łukasz Magiera committed
275
		return done(err)
276 277
	}
	if lrec != nil {
278
		// TODO: this is tricky, we don't always want to trust our own value
279
		// what if the authoritative source updated it?
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
280
		log.Debug("have it locally")
Łukasz Magiera's avatar
Łukasz Magiera committed
281
		vals <- RecvdVal{
282 283
			Val:  lrec.GetValue(),
			From: dht.self,
Łukasz Magiera's avatar
Łukasz Magiera committed
284
		}
285 286

		if nvals <= 1 {
Łukasz Magiera's avatar
Łukasz Magiera committed
287
			return done(nil)
288
		}
Łukasz Magiera's avatar
Łukasz Magiera committed
289 290

		nvals--
291
	} else if nvals == 0 {
Łukasz Magiera's avatar
Łukasz Magiera committed
292
		return done(routing.ErrNotFound)
Jeromy's avatar
Jeromy committed
293 294
	}

295
	// get closest peers in the routing table
Jeromy's avatar
Jeromy committed
296
	rtp := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
297
	log.Debugf("peers in rt: %d %s", len(rtp), rtp)
298
	if len(rtp) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
299
		log.Warning("No peers from routing table!")
Łukasz Magiera's avatar
Łukasz Magiera committed
300
		return done(kb.ErrLookupFailure)
301 302
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
303 304 305
	var valslock sync.Mutex
	var got int

306
	// setup the Query
Jeromy's avatar
Jeromy committed
307
	parent := ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
308
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
309
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
310 311 312 313
			Type: notif.SendingQuery,
			ID:   p,
		})

314
		rec, peers, err := dht.getValueOrPeers(ctx, p, key)
315 316 317 318 319 320 321 322 323
		switch err {
		case routing.ErrNotFound:
			// in this case, they responded with nothing,
			// still send a notification so listeners can know the
			// request has completed 'successfully'
			notif.PublishQueryEvent(parent, &notif.QueryEvent{
				Type: notif.PeerResponse,
				ID:   p,
			})
324
			return nil, err
325 326 327 328 329
		default:
			return nil, err

		case nil, errInvalidRecord:
			// in either of these cases, we want to keep going
330
		}
331

332 333
		res := &dhtQueryResult{closerPeers: peers}

334
		if rec.GetValue() != nil || err == errInvalidRecord {
Steven Allen's avatar
Steven Allen committed
335
			rv := RecvdVal{
336 337 338 339
				Val:  rec.GetValue(),
				From: p,
			}
			valslock.Lock()
Łukasz Magiera's avatar
Łukasz Magiera committed
340 341
			vals <- rv
			got++
342

343
			// If we have collected enough records, we're done
Łukasz Magiera's avatar
Łukasz Magiera committed
344
			if nvals == got {
345 346 347
				res.success = true
			}
			valslock.Unlock()
348 349
		}

Jeromy's avatar
Jeromy committed
350
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
Jeromy's avatar
Jeromy committed
351 352
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
353
			Responses: peers,
Jeromy's avatar
Jeromy committed
354 355
		})

356 357
		return res, nil
	})
358

Łukasz Magiera's avatar
Łukasz Magiera committed
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
	go func() {
		reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()

		_, err = query.Run(reqCtx, rtp)

		// We do have some values but we either ran out of peers to query or
		// searched for a whole minute.
		//
		// We'll just call this a success.
		if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
			err = nil
		}
		done(err)
	}()
374

375
	return vals
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
376 377
}

378 379 380
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
381

382
// Provide makes this node announce that it can provide a value for the given key
383 384 385 386 387 388 389 390
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
	eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
391 392

	// add self locally
393
	dht.providers.AddProvider(ctx, key, dht.self)
Jeromy's avatar
Jeromy committed
394 395 396
	if !brdcst {
		return nil
	}
397

398
	peers, err := dht.GetClosestPeers(ctx, key.KeyString())
399 400
	if err != nil {
		return err
401 402
	}

403 404 405 406 407
	mes, err := dht.makeProvRecord(key)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
408
	wg := sync.WaitGroup{}
409
	for p := range peers {
Jeromy's avatar
Jeromy committed
410 411 412
		wg.Add(1)
		go func(p peer.ID) {
			defer wg.Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
413
			log.Debugf("putProvider(%s, %s)", key, p)
414
			err := dht.sendMessage(ctx, p, mes)
Jeromy's avatar
Jeromy committed
415
			if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
416
				log.Debug(err)
Jeromy's avatar
Jeromy committed
417 418
			}
		}(p)
419
	}
Jeromy's avatar
Jeromy committed
420
	wg.Wait()
421
	return nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
422
}
423
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
424 425 426 427 428 429 430 431 432 433 434
	pi := pstore.PeerInfo{
		ID:    dht.self,
		Addrs: dht.host.Addrs(),
	}

	// // only share WAN-friendly addresses ??
	// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
	if len(pi.Addrs) < 1 {
		return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
	}

435
	pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0)
436 437 438
	pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]pstore.PeerInfo{pi})
	return pmes, nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
439

Brian Tiger Chow's avatar
Brian Tiger Chow committed
440
// FindProviders searches until the context expires.
441
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
Jeromy's avatar
Jeromy committed
442
	var providers []pstore.PeerInfo
443
	for p := range dht.FindProvidersAsync(ctx, c, KValue) {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
444 445 446 447 448
		providers = append(providers, p)
	}
	return providers, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449 450 451
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
452 453
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
	log.Event(ctx, "findProviders", key)
Jeromy's avatar
Jeromy committed
454
	peerOut := make(chan pstore.PeerInfo, count)
Jeromy's avatar
Jeromy committed
455 456 457 458
	go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
	return peerOut
}

459 460
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
	defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
Jeromy's avatar
Jeromy committed
461 462
	defer close(peerOut)

Jeromy's avatar
Jeromy committed
463
	ps := pset.NewLimited(count)
Jeromy's avatar
Jeromy committed
464 465
	provs := dht.providers.GetProviders(ctx, key)
	for _, p := range provs {
466
		// NOTE: Assuming that this list of peers is unique
Jeromy's avatar
Jeromy committed
467
		if ps.TryAdd(p) {
Jeromy's avatar
Jeromy committed
468
			pi := dht.peerstore.PeerInfo(p)
Jeromy's avatar
Jeromy committed
469
			select {
Jeromy's avatar
Jeromy committed
470
			case peerOut <- pi:
Jeromy's avatar
Jeromy committed
471 472 473
			case <-ctx.Done():
				return
			}
Jeromy's avatar
Jeromy committed
474
		}
Jeromy's avatar
Jeromy committed
475

476
		// If we have enough peers locally, don't bother with remote RPC
Jeromy's avatar
Jeromy committed
477
		// TODO: is this a DOS vector?
Jeromy's avatar
Jeromy committed
478
		if ps.Size() >= count {
Jeromy's avatar
Jeromy committed
479 480 481 482 483
			return
		}
	}

	// setup the Query
Jeromy's avatar
Jeromy committed
484
	parent := ctx
485
	query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
486
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
487 488 489
			Type: notif.SendingQuery,
			ID:   p,
		})
490
		pmes, err := dht.findProvidersSingle(ctx, p, key)
Jeromy's avatar
Jeromy committed
491 492 493 494
		if err != nil {
			return nil, err
		}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
495
		log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
496
		provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
497
		log.Debugf("%d provider entries decoded", len(provs))
Jeromy's avatar
Jeromy committed
498 499 500

		// Add unique providers from request, up to 'count'
		for _, prov := range provs {
501 502 503
			if prov.ID != dht.self {
				dht.peerstore.AddAddrs(prov.ID, prov.Addrs, pstore.TempAddrTTL)
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
504
			log.Debugf("got provider: %s", prov)
505
			if ps.TryAdd(prov.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
506
				log.Debugf("using provider: %s", prov)
Jeromy's avatar
Jeromy committed
507
				select {
Jeromy's avatar
Jeromy committed
508
				case peerOut <- *prov:
Jeromy's avatar
Jeromy committed
509
				case <-ctx.Done():
510
					log.Debug("context timed out sending more providers")
Jeromy's avatar
Jeromy committed
511 512
					return nil, ctx.Err()
				}
513
			}
Jeromy's avatar
Jeromy committed
514
			if ps.Size() >= count {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
515
				log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
Jeromy's avatar
Jeromy committed
516
				return &dhtQueryResult{success: true}, nil
517 518 519
			}
		}

Jeromy's avatar
Jeromy committed
520 521
		// Give closer peers back to the query to be queried
		closer := pmes.GetCloserPeers()
522
		clpeers := pb.PBPeersToPeerInfos(closer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
523
		log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
524

Jeromy's avatar
Jeromy committed
525
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
526 527
			Type:      notif.PeerResponse,
			ID:        p,
Jeromy's avatar
Jeromy committed
528
			Responses: clpeers,
529
		})
Jeromy's avatar
Jeromy committed
530 531 532
		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

Jeromy's avatar
Jeromy committed
533
	peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue)
Jeromy's avatar
Jeromy committed
534 535
	_, err := query.Run(ctx, peers)
	if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
536
		log.Debugf("Query error: %s", err)
537 538 539 540 541 542 543 544 545 546
		// Special handling for issue: https://github.com/ipfs/go-ipfs/issues/3032
		if fmt.Sprint(err) == "<nil>" {
			log.Error("reproduced bug 3032:")
			log.Errorf("Errors type information: %#v", err)
			log.Errorf("go version: %s", runtime.Version())
			log.Error("please report this information to: https://github.com/ipfs/go-ipfs/issues/3032")

			// replace problematic error with something that won't crash the daemon
			err = fmt.Errorf("<nil>")
		}
547 548 549 550
		notif.PublishQueryEvent(ctx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
		})
Jeromy's avatar
Jeromy committed
551
	}
552 553
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
554
// FindPeer searches for a peer with given ID.
555 556 557 558 559 560 561 562
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
	eip := log.EventBegin(ctx, "FindPeer", id)
	defer func() {
		if err != nil {
			eip.SetError(err)
		}
		eip.Done()
	}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
563

564
	// Check if were already connected to them
Jeromy's avatar
Jeromy committed
565
	if pi := dht.FindLocal(id); pi.ID != "" {
566
		return pi, nil
567 568
	}

Jeromy's avatar
Jeromy committed
569
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
570
	if len(peers) == 0 {
Jeromy's avatar
Jeromy committed
571
		return pstore.PeerInfo{}, kb.ErrLookupFailure
572
	}
573

Jeromy's avatar
Jeromy committed
574
	// Sanity...
575
	for _, p := range peers {
576
		if p == id {
577
			log.Debug("found target peer in list of closest peers...")
578
			return dht.peerstore.PeerInfo(p), nil
579
		}
580
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
581

Jeromy's avatar
Jeromy committed
582
	// setup the Query
Jeromy's avatar
Jeromy committed
583
	parent := ctx
584
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
Jeromy's avatar
Jeromy committed
585
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
586 587 588
			Type: notif.SendingQuery,
			ID:   p,
		})
Jeromy's avatar
Jeromy committed
589

590
		pmes, err := dht.findPeerSingle(ctx, p, id)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
591
		if err != nil {
592
			return nil, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
593
		}
594

Jeromy's avatar
Jeromy committed
595
		closer := pmes.GetCloserPeers()
596
		clpeerInfos := pb.PBPeersToPeerInfos(closer)
597

598
		// see if we got the peer here
599 600
		for _, npi := range clpeerInfos {
			if npi.ID == id {
Jeromy's avatar
Jeromy committed
601
				return &dhtQueryResult{
602
					peer:    npi,
Jeromy's avatar
Jeromy committed
603 604 605
					success: true,
				}, nil
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
606 607
		}

Jeromy's avatar
Jeromy committed
608
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
609
			Type:      notif.PeerResponse,
610
			ID:        p,
Jeromy's avatar
Jeromy committed
611
			Responses: clpeerInfos,
612 613
		})

614
		return &dhtQueryResult{closerPeers: clpeerInfos}, nil
615
	})
616

Jeromy's avatar
Jeromy committed
617
	// run it!
618
	result, err := query.Run(ctx, peers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
619
	if err != nil {
Jeromy's avatar
Jeromy committed
620
		return pstore.PeerInfo{}, err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
621 622
	}

623
	log.Debugf("FindPeer %v %v", id, result.success)
624
	if result.peer.ID == "" {
Jeromy's avatar
Jeromy committed
625
		return pstore.PeerInfo{}, routing.ErrNotFound
626
	}
Jeromy's avatar
Jeromy committed
627

Jeromy's avatar
Jeromy committed
628
	return *result.peer, nil
629 630
}

631
// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
Jeromy's avatar
Jeromy committed
632
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan *pstore.PeerInfo, error) {
633

Jeromy's avatar
Jeromy committed
634
	peerchan := make(chan *pstore.PeerInfo, asyncQueryBuffer)
Jeromy's avatar
Jeromy committed
635
	peersSeen := make(map[peer.ID]struct{})
636
	var peersSeenMx sync.Mutex
637

Jeromy's avatar
Jeromy committed
638
	peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
639
	if len(peers) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
640
		return nil, kb.ErrLookupFailure
641 642 643
	}

	// setup the Query
644
	query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
645

646
		pmes, err := dht.findPeerSingle(ctx, p, id)
647 648 649 650
		if err != nil {
			return nil, err
		}

Jeromy's avatar
Jeromy committed
651
		var clpeers []*pstore.PeerInfo
652 653
		closer := pmes.GetCloserPeers()
		for _, pbp := range closer {
654
			pi := pb.PBPeerToPeerInfo(pbp)
655

656
			// skip peers already seen
657
			peersSeenMx.Lock()
658
			if _, found := peersSeen[pi.ID]; found {
659
				peersSeenMx.Unlock()
660 661
				continue
			}
662
			peersSeen[pi.ID] = struct{}{}
663
			peersSeenMx.Unlock()
664 665

			// if peer is connected, send it to our client.
666
			if pb.Connectedness(pbp.Connection) == inet.Connected {
667 668 669
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
670
				case peerchan <- pi:
671 672 673 674
				}
			}

			// if peer is the peer we're looking for, don't bother querying it.
675
			// TODO maybe query it?
676
			if pb.Connectedness(pbp.Connection) != inet.Connected {
677
				clpeers = append(clpeers, pi)
678 679 680 681 682 683 684 685 686
			}
		}

		return &dhtQueryResult{closerPeers: clpeers}, nil
	})

	// run it! run it asynchronously to gen peers as results are found.
	// this does no error checking
	go func() {
687
		if _, err := query.Run(ctx, peers); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
688
			log.Debug(err)
689 690 691 692 693 694 695 696
		}

		// close the peerchan channel when done.
		close(peerchan)
	}()

	return peerchan, nil
}
Łukasz Magiera's avatar
Łukasz Magiera committed
697 698 699 700 701 702 703 704 705

// wrapErr returns an already-closed error channel. If err is non-nil it is
// buffered in the channel so that a single receive yields it; otherwise the
// channel is empty.
func wrapErr(err error) <-chan error {
	errs := make(chan error, 1)
	defer close(errs)

	if err == nil {
		return errs
	}
	errs <- err
	return errs
}