lookup.go 2.74 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5 6
	"fmt"
	"strings"
Jeromy's avatar
Jeromy committed
7

Tomas Virgl's avatar
Tomas Virgl committed
8
	cid "github.com/ipfs/go-cid"
9
	logging "github.com/ipfs/go-log"
10
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
11
	kb "github.com/libp2p/go-libp2p-kbucket"
12
	peer "github.com/libp2p/go-libp2p-peer"
George Antoniadis's avatar
George Antoniadis committed
13
	notif "github.com/libp2p/go-libp2p-routing/notifications"
14 15
)

Steven Allen's avatar
Steven Allen committed
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
func tryFormatLoggableKey(k string) (string, error) {
	if len(k) == 0 {
		return "", fmt.Errorf("loggableKey is empty")
	}
	var proto, cstr string
	if k[0] == '/' {
		// it's a path (probably)
		protoEnd := strings.IndexByte(k[1:], '/')
		if protoEnd < 0 {
			return k, fmt.Errorf("loggableKey starts with '/' but is not a path: %x", k)
		}
		proto = k[1 : protoEnd+1]
		cstr = k[protoEnd+2:]
	} else {
		proto = "provider"
		cstr = k
	}

	c, err := cid.Cast([]byte(cstr))
	if err != nil {
		return "", fmt.Errorf("loggableKey could not cast key to a CID: %x %v", k, err)
	}
	return fmt.Sprintf("/%s/%s", proto, c.String()), nil
}

41
func loggableKey(k string) logging.LoggableMap {
Steven Allen's avatar
Steven Allen committed
42
	newKey, err := tryFormatLoggableKey(k)
Tomas Virgl's avatar
Tomas Virgl committed
43
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
44
		logger.Debug(err)
Tomas Virgl's avatar
Tomas Virgl committed
45
	} else {
Steven Allen's avatar
Steven Allen committed
46
		k = newKey
Tomas Virgl's avatar
Tomas Virgl committed
47
	}
Steven Allen's avatar
Steven Allen committed
48

49 50 51 52 53
	return logging.LoggableMap{
		"key": k,
	}
}

54 55
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
56
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Matt Joiner's avatar
Matt Joiner committed
57
	e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
58
	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
59
	if len(tablepeers) == 0 {
60
		return nil, kb.ErrLookupFailure
61 62 63 64
	}

	out := make(chan peer.ID, KValue)

Jeromy's avatar
Jeromy committed
65 66 67
	// since the query doesnt actually pass our context down
	// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
	parent := ctx
68 69
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
		// For DHT query command
Jeromy's avatar
Jeromy committed
70
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
71
			Type: notif.SendingQuery,
72
			ID:   p,
73
		})
74

Dirk McCormick's avatar
Dirk McCormick committed
75
		pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
76
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
77
			logger.Debugf("error getting closer peers: %s", err)
78 79
			return nil, err
		}
Dirk McCormick's avatar
Dirk McCormick committed
80
		peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
81 82

		// For DHT query command
Jeromy's avatar
Jeromy committed
83
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
84
			Type:      notif.PeerResponse,
85
			ID:        p,
Dirk McCormick's avatar
Dirk McCormick committed
86
			Responses: peers,
87
		})
88

Dirk McCormick's avatar
Dirk McCormick committed
89
		return &dhtQueryResult{closerPeers: peers}, nil
90 91 92 93 94 95
	})

	go func() {
		defer close(out)
		defer e.Done()
		// run it!
Jeromy's avatar
Jeromy committed
96
		res, err := query.Run(ctx, tablepeers)
97
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
98
			logger.Debugf("closestPeers query run error: %s", err)
99
		}
Jeromy's avatar
Jeromy committed
100

101 102
		if res != nil && res.queriedSet != nil {
			sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
Jeromy's avatar
Jeromy committed
103 104 105 106 107 108 109 110
			if len(sorted) > KValue {
				sorted = sorted[:KValue]
			}

			for _, p := range sorted {
				out <- p
			}
		}
111 112 113 114
	}()

	return out, nil
}