lookup.go 2.74 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5 6
	"fmt"
	"strings"
Jeromy's avatar
Jeromy committed
7

8 9
	"github.com/libp2p/go-libp2p-core/peer"

Tomas Virgl's avatar
Tomas Virgl committed
10
	cid "github.com/ipfs/go-cid"
11
	logging "github.com/ipfs/go-log"
12
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
13 14
	kb "github.com/libp2p/go-libp2p-kbucket"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
15 16
)

Steven Allen's avatar
Steven Allen committed
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
// tryFormatLoggableKey renders a DHT key in a printable form for logging.
//
// A key that begins with '/' is split as "/<proto>/<payload>"; any other key
// is treated as the payload itself, with proto set to "provider". The payload
// bytes are cast to a CID and the result is returned as
// "/<proto>/<CID string>".
//
// It returns an empty string and a non-nil error when the key is empty, when
// a '/'-prefixed key has no second '/', or when the payload cannot be cast to
// a CID.
func tryFormatLoggableKey(k string) (string, error) {
	if len(k) == 0 {
		return "", fmt.Errorf("loggableKey is empty")
	}
	var proto, cstr string
	if k[0] == '/' {
		// it's a path (probably)
		protoEnd := strings.IndexByte(k[1:], '/')
		if protoEnd < 0 {
			// Return the zero value with the error, like every other
			// failure path in this function.
			return "", fmt.Errorf("loggableKey starts with '/' but is not a path: %x", k)
		}
		// protoEnd is an index into k[1:], hence the +1 / +2 offsets into k.
		proto = k[1 : protoEnd+1]
		cstr = k[protoEnd+2:]
	} else {
		proto = "provider"
		cstr = k
	}

	c, err := cid.Cast([]byte(cstr))
	if err != nil {
		return "", fmt.Errorf("loggableKey could not cast key to a CID: %x %v", k, err)
	}
	return fmt.Sprintf("/%s/%s", proto, c.String()), nil
}

42
func loggableKey(k string) logging.LoggableMap {
Steven Allen's avatar
Steven Allen committed
43
	newKey, err := tryFormatLoggableKey(k)
Tomas Virgl's avatar
Tomas Virgl committed
44
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
45
		logger.Debug(err)
Tomas Virgl's avatar
Tomas Virgl committed
46
	} else {
Steven Allen's avatar
Steven Allen committed
47
		k = newKey
Tomas Virgl's avatar
Tomas Virgl committed
48
	}
Steven Allen's avatar
Steven Allen committed
49

50 51 52 53 54
	return logging.LoggableMap{
		"key": k,
	}
}

55 56
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
57
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Matt Joiner's avatar
Matt Joiner committed
58
	e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
59
	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
60
	if len(tablepeers) == 0 {
61
		return nil, kb.ErrLookupFailure
62 63 64 65
	}

	out := make(chan peer.ID, KValue)

Jeromy's avatar
Jeromy committed
66 67 68
	// since the query doesnt actually pass our context down
	// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
	parent := ctx
69 70
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
		// For DHT query command
Jeromy's avatar
Jeromy committed
71
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
72
			Type: notif.SendingQuery,
73
			ID:   p,
74
		})
75

Dirk McCormick's avatar
Dirk McCormick committed
76
		pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
77
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
78
			logger.Debugf("error getting closer peers: %s", err)
79 80
			return nil, err
		}
Dirk McCormick's avatar
Dirk McCormick committed
81
		peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
82 83

		// For DHT query command
Jeromy's avatar
Jeromy committed
84
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
85
			Type:      notif.PeerResponse,
86
			ID:        p,
Dirk McCormick's avatar
Dirk McCormick committed
87
			Responses: peers,
88
		})
89

Dirk McCormick's avatar
Dirk McCormick committed
90
		return &dhtQueryResult{closerPeers: peers}, nil
91 92 93 94 95 96
	})

	go func() {
		defer close(out)
		defer e.Done()
		// run it!
Jeromy's avatar
Jeromy committed
97
		res, err := query.Run(ctx, tablepeers)
98
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
99
			logger.Debugf("closestPeers query run error: %s", err)
100
		}
Jeromy's avatar
Jeromy committed
101

102 103
		if res != nil && res.queriedSet != nil {
			sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
Jeromy's avatar
Jeromy committed
104 105 106 107 108 109 110 111
			if len(sorted) > KValue {
				sorted = sorted[:KValue]
			}

			for _, p := range sorted {
				out <- p
			}
		}
112 113 114 115
	}()

	return out, nil
}