lookup.go 3.04 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5 6
	"fmt"
	"strings"
Michael Avila's avatar
Michael Avila committed
7
	"time"
Jeromy's avatar
Jeromy committed
8

9
	"github.com/libp2p/go-libp2p-core/peer"
10
	"github.com/libp2p/go-libp2p-core/routing"
11

12
	"github.com/ipfs/go-cid"
13
	logging "github.com/ipfs/go-log"
Adin Schmahmann's avatar
Adin Schmahmann committed
14
	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
15
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
16
	kb "github.com/libp2p/go-libp2p-kbucket"
Adin Schmahmann's avatar
Adin Schmahmann committed
17 18
	"github.com/multiformats/go-base32"
	"github.com/multiformats/go-multihash"
19 20
)

Steven Allen's avatar
Steven Allen committed
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
func tryFormatLoggableKey(k string) (string, error) {
	if len(k) == 0 {
		return "", fmt.Errorf("loggableKey is empty")
	}
	var proto, cstr string
	if k[0] == '/' {
		// it's a path (probably)
		protoEnd := strings.IndexByte(k[1:], '/')
		if protoEnd < 0 {
			return k, fmt.Errorf("loggableKey starts with '/' but is not a path: %x", k)
		}
		proto = k[1 : protoEnd+1]
		cstr = k[protoEnd+2:]
	} else {
		proto = "provider"
		cstr = k
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
39
	var encStr string
Steven Allen's avatar
Steven Allen committed
40
	c, err := cid.Cast([]byte(cstr))
Adin Schmahmann's avatar
Adin Schmahmann committed
41 42 43 44
	if err == nil {
		encStr = c.String()
	} else {
		encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
Steven Allen's avatar
Steven Allen committed
45
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
46 47

	return fmt.Sprintf("/%s/%s", proto, encStr), nil
Steven Allen's avatar
Steven Allen committed
48 49
}

50
func loggableKey(k string) logging.LoggableMap {
Steven Allen's avatar
Steven Allen committed
51
	newKey, err := tryFormatLoggableKey(k)
Tomas Virgl's avatar
Tomas Virgl committed
52
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
53
		logger.Debug(err)
Tomas Virgl's avatar
Tomas Virgl committed
54
	} else {
Steven Allen's avatar
Steven Allen committed
55
		k = newKey
Tomas Virgl's avatar
Tomas Virgl committed
56
	}
Steven Allen's avatar
Steven Allen committed
57

58 59 60 61 62
	return logging.LoggableMap{
		"key": k,
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
63 64 65 66 67 68
func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
	return logging.LoggableMap{
		"multihash": base32.RawStdEncoding.EncodeToString(mh),
	}
}

69 70
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
71 72 73
//
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
74
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
75
	//TODO: I can break the interface! return []peer.ID
Matt Joiner's avatar
Matt Joiner committed
76
	e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
Adin Schmahmann's avatar
Adin Schmahmann committed
77 78
	defer e.Done()

79
	queries, err := dht.runDisjointQueries(ctx, dht.d, key,
Adin Schmahmann's avatar
Adin Schmahmann committed
80 81 82 83 84 85 86 87 88 89 90 91 92
		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type: routing.SendingQuery,
				ID:   p,
			})

			pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
			if err != nil {
				logger.Debugf("error getting closer peers: %s", err)
				return nil, err
			}
			peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
93

Adin Schmahmann's avatar
Adin Schmahmann committed
94 95 96 97 98 99
			// For DHT query command
			routing.PublishQueryEvent(ctx, &routing.QueryEvent{
				Type:      routing.PeerResponse,
				ID:        p,
				Responses: peers,
			})
100

Adin Schmahmann's avatar
Adin Schmahmann committed
101 102 103 104
			return peers, err
		},
		func(peerset *kpeerset.SortedPeerset) bool { return false },
	)
Jeromy's avatar
Jeromy committed
105

106 107 108 109
	if err != nil {
		return nil, err
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
110 111
	out := make(chan peer.ID, dht.bucketSize)
	defer close(out)
112

Adin Schmahmann's avatar
Adin Schmahmann committed
113
	kadID := kb.ConvertKey(key)
114
	allPeers := kb.SortClosestPeers(queries[0].globallyQueriedPeers.Peers(), kadID)
Adin Schmahmann's avatar
Adin Schmahmann committed
115 116 117
	for i, p := range allPeers {
		if i == dht.bucketSize {
			break
Jeromy's avatar
Jeromy committed
118
		}
Adin Schmahmann's avatar
Adin Schmahmann committed
119 120 121
		out <- p
	}

122
	if ctx.Err() == nil {
Adin Schmahmann's avatar
Adin Schmahmann committed
123 124 125
		// refresh the cpl for this key as the query was successful
		dht.routingTable.ResetCplRefreshedAtForID(kadID, time.Now())
	}
126

127
	return out, ctx.Err()
128
}