lookup.go 3.26 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5 6
	"fmt"
	"strings"
Michael Avila's avatar
Michael Avila committed
7
	"time"
Jeromy's avatar
Jeromy committed
8

9
	"github.com/libp2p/go-libp2p-core/peer"
10
	"github.com/libp2p/go-libp2p-core/routing"
11

12
	"github.com/ipfs/go-cid"
13
	logging "github.com/ipfs/go-log"
14
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
George Antoniadis's avatar
George Antoniadis committed
15
	kb "github.com/libp2p/go-libp2p-kbucket"
Adin Schmahmann's avatar
Adin Schmahmann committed
16 17
	"github.com/multiformats/go-base32"
	"github.com/multiformats/go-multihash"
18 19
)

Steven Allen's avatar
Steven Allen committed
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
func tryFormatLoggableKey(k string) (string, error) {
	if len(k) == 0 {
		return "", fmt.Errorf("loggableKey is empty")
	}
	var proto, cstr string
	if k[0] == '/' {
		// it's a path (probably)
		protoEnd := strings.IndexByte(k[1:], '/')
		if protoEnd < 0 {
			return k, fmt.Errorf("loggableKey starts with '/' but is not a path: %x", k)
		}
		proto = k[1 : protoEnd+1]
		cstr = k[protoEnd+2:]
	} else {
		proto = "provider"
		cstr = k
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
38
	var encStr string
Steven Allen's avatar
Steven Allen committed
39
	c, err := cid.Cast([]byte(cstr))
Adin Schmahmann's avatar
Adin Schmahmann committed
40 41 42 43
	if err == nil {
		encStr = c.String()
	} else {
		encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
Steven Allen's avatar
Steven Allen committed
44
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
45 46

	return fmt.Sprintf("/%s/%s", proto, encStr), nil
Steven Allen's avatar
Steven Allen committed
47 48
}

49
func loggableKey(k string) logging.LoggableMap {
Steven Allen's avatar
Steven Allen committed
50
	newKey, err := tryFormatLoggableKey(k)
Tomas Virgl's avatar
Tomas Virgl committed
51
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
52
		logger.Debug(err)
Tomas Virgl's avatar
Tomas Virgl committed
53
	} else {
Steven Allen's avatar
Steven Allen committed
54
		k = newKey
Tomas Virgl's avatar
Tomas Virgl committed
55
	}
Steven Allen's avatar
Steven Allen committed
56

57 58 59 60 61
	return logging.LoggableMap{
		"key": k,
	}
}

Adin Schmahmann's avatar
Adin Schmahmann committed
62 63 64 65 66 67
// multihashLoggableKey wraps mh in a logging.LoggableMap under the
// "multihash" entry, encoded with base32.RawStdEncoding.
func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
	encoded := base32.RawStdEncoding.EncodeToString(mh)
	return logging.LoggableMap{"multihash": encoded}
}

68 69
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
70
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Matt Joiner's avatar
Matt Joiner committed
71
	e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
72
	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
73
	if len(tablepeers) == 0 {
74
		return nil, kb.ErrLookupFailure
75 76
	}

77
	out := make(chan peer.ID, dht.bucketSize)
78

Jeromy's avatar
Jeromy committed
79 80 81
	// since the query doesnt actually pass our context down
	// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
	parent := ctx
82 83
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
		// For DHT query command
84 85
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type: routing.SendingQuery,
86
			ID:   p,
87
		})
88

Dirk McCormick's avatar
Dirk McCormick committed
89
		pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
90
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
91
			logger.Debugf("error getting closer peers: %s", err)
92 93
			return nil, err
		}
Dirk McCormick's avatar
Dirk McCormick committed
94
		peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
95 96

		// For DHT query command
97 98
		routing.PublishQueryEvent(parent, &routing.QueryEvent{
			Type:      routing.PeerResponse,
99
			ID:        p,
Dirk McCormick's avatar
Dirk McCormick committed
100
			Responses: peers,
101
		})
102

Dirk McCormick's avatar
Dirk McCormick committed
103
		return &dhtQueryResult{closerPeers: peers}, nil
104 105 106 107 108
	})

	go func() {
		defer close(out)
		defer e.Done()
Michael Avila's avatar
Michael Avila committed
109 110
		timedCtx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()
111
		// run it!
Michael Avila's avatar
Michael Avila committed
112
		res, err := query.Run(timedCtx, tablepeers)
113
		if err != nil {
Matt Joiner's avatar
Matt Joiner committed
114
			logger.Debugf("closestPeers query run error: %s", err)
115
		}
Jeromy's avatar
Jeromy committed
116

117
		if res != nil && res.queriedSet != nil {
Aarsh Shah's avatar
Aarsh Shah committed
118 119
			// refresh the cpl for this key as the query was successful
			dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(key), time.Now())
120

121
			sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
122 123 124
			l := len(sorted)
			if l > dht.bucketSize {
				sorted = sorted[:dht.bucketSize]
Jeromy's avatar
Jeromy committed
125 126 127 128 129 130
			}

			for _, p := range sorted {
				out <- p
			}
		}
131 132 133 134
	}()

	return out, nil
}