lookup.go 1.72 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5
	"fmt"
Michael Avila's avatar
Michael Avila committed
6
	"time"
Jeromy's avatar
Jeromy committed
7

8
	"github.com/libp2p/go-libp2p-core/peer"
9
	"github.com/libp2p/go-libp2p-core/routing"
10

11
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
12
	kb "github.com/libp2p/go-libp2p-kbucket"
13 14
)

Alan Shaw's avatar
Alan Shaw committed
15 16
// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of
// the K closest peers to the given key.
17 18 19
//
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
20
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Steven Allen's avatar
Steven Allen committed
21 22 23
	if key == "" {
		return nil, fmt.Errorf("can't lookup empty key")
	}
24
	//TODO: I can break the interface! return []peer.ID
Aarsh Shah's avatar
Aarsh Shah committed
25
	lookupRes, err := dht.runLookupWithFollowup(ctx, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
26 27 28 29 30 31 32 33 34 35 36 37 38
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})

			pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
			}
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
39

Adin Schmahmann's avatar
Adin Schmahmann committed
40 41 42 43 44 45
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
46

Adin Schmahmann's avatar
Adin Schmahmann committed
47 48
			return peers, err
		},
Adin Schmahmann's avatar
Adin Schmahmann committed
49
		func() bool { return false },
Adin Schmahmann's avatar
Adin Schmahmann committed
50
	)
Jeromy's avatar
Jeromy committed
51

52 53 54 55
	if err != nil {
		return nil, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
56 57
	out := make(chan peer.ID, dht.bucketSize)
	defer close(out)
58

Adin Schmahmann's avatar
Adin Schmahmann committed
59
	for _, p := range lookupRes.peers {
Adin Schmahmann's avatar
Adin Schmahmann committed
60 61 62
		out <- p
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
63
	if ctx.Err() == nil && lookupRes.completed {
Adin Schmahmann's avatar
Adin Schmahmann committed
64
		// refresh the cpl for this key as the query was successful
Adin Schmahmann's avatar
Adin Schmahmann committed
65
		dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
Adin Schmahmann's avatar
Adin Schmahmann committed
66
	}
67

68
	return out, ctx.Err()
69
}