lookup.go 3 KB
Newer Older
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4 5
	"context"

Tomas Virgl's avatar
Tomas Virgl committed
6
	cid "github.com/ipfs/go-cid"
7
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
8
	kb "github.com/libp2p/go-libp2p-kbucket"
9 10
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
George Antoniadis's avatar
George Antoniadis committed
11
	notif "github.com/libp2p/go-libp2p-routing/notifications"
12 13
)

14
// Required in order for proper JSON marshaling
Jeromy's avatar
Jeromy committed
15 16
func pointerizePeerInfos(pis []pstore.PeerInfo) []*pstore.PeerInfo {
	out := make([]*pstore.PeerInfo, len(pis))
17 18 19 20 21 22 23
	for i, p := range pis {
		np := p
		out[i] = &np
	}
	return out
}

Jeromy's avatar
Jeromy committed
24 25 26 27 28 29 30 31
func toPeerInfos(ps []peer.ID) []*pstore.PeerInfo {
	out := make([]*pstore.PeerInfo, len(ps))
	for i, p := range ps {
		out[i] = &pstore.PeerInfo{ID: p}
	}
	return out
}

32
func loggableKey(k string) logging.LoggableMap {
Tomas Virgl's avatar
Tomas Virgl committed
33 34
	cid, err := cid.Cast([]byte(k))
	if err != nil {
Jeromy's avatar
Jeromy committed
35
		log.Errorf("loggableKey could not cast key: %x %v", k, err)
Tomas Virgl's avatar
Tomas Virgl committed
36
	} else {
Tomas Virgl's avatar
Tomas Virgl committed
37 38
		k = cid.String()
	}
39 40 41 42 43
	return logging.LoggableMap{
		"key": k,
	}
}

44 45
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
46 47
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
	e := log.EventBegin(ctx, "getClosestPeers", loggableKey(key))
48
	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
49
	if len(tablepeers) == 0 {
50
		return nil, kb.ErrLookupFailure
51 52 53 54
	}

	out := make(chan peer.ID, KValue)

Jeromy's avatar
Jeromy committed
55 56 57
	// since the query doesnt actually pass our context down
	// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
	parent := ctx
58 59
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
		// For DHT query command
Jeromy's avatar
Jeromy committed
60
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
61
			Type: notif.SendingQuery,
62
			ID:   p,
63
		})
64 65 66

		closer, err := dht.closerPeersSingle(ctx, key, p)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
			log.Debugf("error getting closer peers: %s", err)
68 69 70
			return nil, err
		}

Jeromy's avatar
Jeromy committed
71
		peerinfos := toPeerInfos(closer)
72 73

		// For DHT query command
Jeromy's avatar
Jeromy committed
74
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
75
			Type:      notif.PeerResponse,
76
			ID:        p,
Jeromy's avatar
Jeromy committed
77
			Responses: peerinfos, // todo: remove need for this pointerize thing
78
		})
79

Jeromy's avatar
Jeromy committed
80
		return &dhtQueryResult{closerPeers: peerinfos}, nil
81 82 83 84 85 86
	})

	go func() {
		defer close(out)
		defer e.Done()
		// run it!
Jeromy's avatar
Jeromy committed
87
		res, err := query.Run(ctx, tablepeers)
88 89 90
		if err != nil {
			log.Debugf("closestPeers query run error: %s", err)
		}
Jeromy's avatar
Jeromy committed
91 92 93 94 95 96 97 98 99 100 101

		if res != nil && res.finalSet != nil {
			sorted := kb.SortClosestPeers(res.finalSet.Peers(), kb.ConvertKey(key))
			if len(sorted) > KValue {
				sorted = sorted[:KValue]
			}

			for _, p := range sorted {
				out <- p
			}
		}
102 103 104 105 106
	}()

	return out, nil
}

107
func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key string, p peer.ID) ([]peer.ID, error) {
108 109 110 111 112 113 114 115 116
	pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
	if err != nil {
		return nil, err
	}

	var out []peer.ID
	for _, pbp := range pmes.GetCloserPeers() {
		pid := peer.ID(pbp.GetId())
		if pid != dht.self { // dont add self
Jeromy's avatar
Jeromy committed
117
			dht.peerstore.AddAddrs(pid, pbp.Addresses(), pstore.TempAddrTTL)
118 119 120 121 122
			out = append(out, pid)
		}
	}
	return out, nil
}