lookup.go 2.87 KB
Newer Older
1 2 3
package dht

import (
4
	peer "github.com/ipfs/go-libp2p-peer"
George Antoniadis's avatar
George Antoniadis committed
5
	pset "github.com/ipfs/go-libp2p-peer/peerset"
6
	pstore "github.com/ipfs/go-libp2p-peerstore"
7
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
8 9
	kb "github.com/libp2p/go-libp2p-kbucket"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
10
	context "golang.org/x/net/context"
11 12
)

13
// Required in order for proper JSON marshaling
Jeromy's avatar
Jeromy committed
14 15
func pointerizePeerInfos(pis []pstore.PeerInfo) []*pstore.PeerInfo {
	out := make([]*pstore.PeerInfo, len(pis))
16 17 18 19 20 21 22
	for i, p := range pis {
		np := p
		out[i] = &np
	}
	return out
}

23 24 25 26 27 28
func loggableKey(k string) logging.LoggableMap {
	return logging.LoggableMap{
		"key": k,
	}
}

29 30
// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
31 32
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
	e := log.EventBegin(ctx, "getClosestPeers", loggableKey(key))
Jeromy's avatar
Jeromy committed
33
	tablepeers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)
34
	if len(tablepeers) == 0 {
35
		return nil, kb.ErrLookupFailure
36 37 38 39 40 41 42 43 44 45 46 47 48 49
	}

	out := make(chan peer.ID, KValue)
	peerset := pset.NewLimited(KValue)

	for _, p := range tablepeers {
		select {
		case out <- p:
		case <-ctx.Done():
			return nil, ctx.Err()
		}
		peerset.Add(p)
	}

Jeromy's avatar
Jeromy committed
50 51 52
	// since the query doesnt actually pass our context down
	// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
	parent := ctx
53 54
	query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
		// For DHT query command
Jeromy's avatar
Jeromy committed
55
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
56
			Type: notif.SendingQuery,
57
			ID:   p,
58
		})
59 60 61

		closer, err := dht.closerPeersSingle(ctx, key, p)
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
			log.Debugf("error getting closer peers: %s", err)
63 64 65
			return nil, err
		}

Jeromy's avatar
Jeromy committed
66
		var filtered []pstore.PeerInfo
67 68 69 70 71 72 73 74 75 76 77 78
		for _, clp := range closer {
			if kb.Closer(clp, dht.self, key) && peerset.TryAdd(clp) {
				select {
				case out <- clp:
				case <-ctx.Done():
					return nil, ctx.Err()
				}
				filtered = append(filtered, dht.peerstore.PeerInfo(clp))
			}
		}

		// For DHT query command
Jeromy's avatar
Jeromy committed
79
		notif.PublishQueryEvent(parent, &notif.QueryEvent{
80
			Type:      notif.PeerResponse,
81 82
			ID:        p,
			Responses: pointerizePeerInfos(filtered),
83
		})
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100

		return &dhtQueryResult{closerPeers: filtered}, nil
	})

	go func() {
		defer close(out)
		defer e.Done()
		// run it!
		_, err := query.Run(ctx, tablepeers)
		if err != nil {
			log.Debugf("closestPeers query run error: %s", err)
		}
	}()

	return out, nil
}

101
func (dht *IpfsDHT) closerPeersSingle(ctx context.Context, key string, p peer.ID) ([]peer.ID, error) {
102 103 104 105 106 107 108 109 110
	pmes, err := dht.findPeerSingle(ctx, p, peer.ID(key))
	if err != nil {
		return nil, err
	}

	var out []peer.ID
	for _, pbp := range pmes.GetCloserPeers() {
		pid := peer.ID(pbp.GetId())
		if pid != dht.self { // dont add self
Jeromy's avatar
Jeromy committed
111
			dht.peerstore.AddAddrs(pid, pbp.Addresses(), pstore.TempAddrTTL)
112 113 114 115 116
			out = append(out, pid)
		}
	}
	return out, nil
}