query.go 8.15 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
6 7
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
8
	pstore "github.com/libp2p/go-libp2p-core/peerstore"
Adin Schmahmann's avatar
Adin Schmahmann committed
9
	"github.com/libp2p/go-libp2p-core/routing"
10 11 12 13
	"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
	"math/big"
	"time"

Adin Schmahmann's avatar
Adin Schmahmann committed
14
	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
15
	kb "github.com/libp2p/go-libp2p-kbucket"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16 17
)

Steven Allen's avatar
Steven Allen committed
18 19 20
// ErrNoPeersQueried is returned when we failed to connect to any peers.
// It is a package-level sentinel value, so callers can test for it by
// comparison (for example with errors.Is).
var ErrNoPeersQueried = errors.New("failed to query any peers")

Adin Schmahmann's avatar
Adin Schmahmann committed
21 22
type (
	// queryFn sends the query RPC to a single peer and returns the peers
	// that peer reported back, or an error if the RPC failed.
	queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)

	// stopFn inspects a query's peer set and reports whether the stop
	// condition for the lookup has been met.
	stopFn func(*kpeerset.SortedPeerset) bool
)
23

24
// query represents a single disjoint query.
Adin Schmahmann's avatar
Adin Schmahmann committed
25
type query struct {
26 27 28
	// the query context.
	ctx context.Context
	// the cancellation function for the query context.
Adin Schmahmann's avatar
Adin Schmahmann committed
29
	cancel context.CancelFunc
30

Adin Schmahmann's avatar
Adin Schmahmann committed
31
	dht *IpfsDHT
Jeromy's avatar
Jeromy committed
32

33 34 35 36
	// localPeers is the set of peers that need to be queried or have already been queried for this query.
	localPeers *kpeerset.SortedPeerset

	// globallyQueriedPeers is the combined set of peers queried across ALL the disjoint queries.
Adin Schmahmann's avatar
Adin Schmahmann committed
37
	globallyQueriedPeers *peer.Set
38 39 40 41 42 43

	// the function that will be used to query a single peer.
	queryFn queryFn

	// stopFn is used to determine if we should stop the WHOLE disjoint query.
	stopFn stopFn
44 45
}

Adin Schmahmann's avatar
Adin Schmahmann committed
46
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
Adin Schmahmann's avatar
Adin Schmahmann committed
47 48 49 50 51
	queryCtx, cancelQuery := context.WithCancel(ctx)

	numQueriesComplete := 0
	queryDone := make(chan struct{}, d)

52
	// pick the K closest peers to the key in our Routing table and shuffle them.
53
	seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize)
54 55 56 57 58 59 60
	if len(seedPeers) == 0 {
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
			Type:  routing.QueryError,
			Extra: kb.ErrLookupFailure.Error(),
		})
		return nil, kb.ErrLookupFailure
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
61

62
	dht.rnglk.Lock()
Adin Schmahmann's avatar
Adin Schmahmann committed
63 64 65
	dht.rng.Shuffle(len(seedPeers), func(i, j int) {
		seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
	})
66
	dht.rnglk.Unlock()
Adin Schmahmann's avatar
Adin Schmahmann committed
67

68
	// create "d" disjoint queries
Adin Schmahmann's avatar
Adin Schmahmann committed
69
	queries := make([]*query, d)
Adin Schmahmann's avatar
Adin Schmahmann committed
70 71
	peersQueried := peer.NewSet()
	for i := 0; i < d; i++ {
Adin Schmahmann's avatar
Adin Schmahmann committed
72
		query := &query{
Adin Schmahmann's avatar
Adin Schmahmann committed
73 74 75
			ctx:                  queryCtx,
			cancel:               cancelQuery,
			dht:                  dht,
76
			localPeers:           kpeerset.NewSortedPeerset(dht.bucketSize, target),
Adin Schmahmann's avatar
Adin Schmahmann committed
77 78 79 80 81 82
			globallyQueriedPeers: peersQueried,
			queryFn:              queryFn,
			stopFn:               stopFn,
		}

		queries[i] = query
83
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84

85
	// distribute the shuffled K closest peers as seeds among the "d" disjoint queries
Adin Schmahmann's avatar
Adin Schmahmann committed
86 87
	for i := 0; i < len(seedPeers); i++ {
		queries[i%d].localPeers.Add(seedPeers[i])
88 89
	}

90
	// start the "d"  disjoint queries
Adin Schmahmann's avatar
Adin Schmahmann committed
91 92 93 94 95 96
	for i := 0; i < d; i++ {
		query := queries[i]
		go func() {
			strictParallelismQuery(query)
			queryDone <- struct{}{}
		}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97 98
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
99
loop:
100
	// wait for all the "d" disjoint queries to complete before we return
Adin Schmahmann's avatar
Adin Schmahmann committed
101 102 103 104 105 106 107 108 109 110 111
	for {
		select {
		case <-queryDone:
			numQueriesComplete++
			if numQueriesComplete == d {
				break loop
			}
		case <-ctx.Done():
			break loop
		}
	}
112

113
	return queries, nil
114 115
}

116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
// TODO This function should be owned by the DHT as it dosen't really belong to "a query".
// scorePeerByDistanceAndLatency scores a peer using metrics such as connectendness of the peer, it's distance from the key
// and it's current known latency.
func (q query) scorePeerByDistanceAndLatency(p peer.ID, distanceFromKey *big.Int) interface{} {
	connectedness := q.dht.host.Network().Connectedness(p)
	latency := q.dht.host.Peerstore().LatencyEWMA(p)

	var c int64
	switch connectedness {
	case network.Connected:
		c = 1
	case network.CanConnect:
		c = 5
	case network.CannotConnect:
		c = 10000
	default:
		c = 20
	}

	l := int64(latency)
	if l <= 0 {
		l = int64(time.Second) * 10
	}

	res := big.NewInt(c)
	res.Mul(res, big.NewInt(l))
	res.Mul(res, distanceFromKey)

	return res
Adin Schmahmann's avatar
Adin Schmahmann committed
145
}
146

147 148
// strictParallelismQuery concurrently sends the query RPC to all eligible peers
// and waits for ALL the RPC's to complete before starting the next round of RPC's.
Adin Schmahmann's avatar
Adin Schmahmann committed
149
func strictParallelismQuery(q *query) {
150
	foundCloser := false
Adin Schmahmann's avatar
Adin Schmahmann committed
151
	for {
152 153 154 155 156 157 158 159 160 161 162
		// get the unqueried peers from among the K closest peers to the key sorted in ascending order
		// of their 'distance-latency` score.
		// We sort peers like this so that "better" peers are chosen to be in the α peers
		// which get queried from among the unqueried K  closet.
		peersToQuery := q.localPeers.UnqueriedFromKClosest(q.scorePeerByDistanceAndLatency,
			func(i1 peerheap.Item, i2 peerheap.Item) bool {
				return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1
			})

		// The lookup terminates when the initiator has queried and gotten responses from the k
		// closest nodes it has heard about.
Adin Schmahmann's avatar
Adin Schmahmann committed
163 164 165
		if len(peersToQuery) == 0 {
			return
		}
166

167 168
		// Of the k nodes the initiator has heard of closest to the target,
		// it picks α that it has not yet queried and resends the FIND NODE RPC to them.
169
		numQuery := q.dht.alpha
170 171 172 173 174

		// However, If a round of RPC's fails to return a node any closer than the closest already heard about,
		// the initiator resends the RPC'S to all of the k closest nodes it has
		// not already queried.
		if !foundCloser {
Adin Schmahmann's avatar
Adin Schmahmann committed
175 176
			numQuery = len(peersToQuery)
		} else if pqLen := len(peersToQuery); pqLen < numQuery {
177
			// if we don't have α peers, pick whatever number we have.
Adin Schmahmann's avatar
Adin Schmahmann committed
178 179
			numQuery = pqLen
		}
180 181

		// reset foundCloser to false for the next round of RPC's
182
		foundCloser = false
183

184
		queryResCh := make(chan *queryResult, numQuery)
Adin Schmahmann's avatar
Adin Schmahmann committed
185
		resultsReceived := 0
186

187
		// send RPC's to all the chosen peers concurrently
Adin Schmahmann's avatar
Adin Schmahmann committed
188 189
		for _, p := range peersToQuery[:numQuery] {
			go func(p peer.ID) {
190
				queryResCh <- q.queryPeer(p)
Adin Schmahmann's avatar
Adin Schmahmann committed
191 192 193 194
			}(p)
		}

	loop:
195
		// wait for all outstanding RPC's to complete before we start the next round.
Adin Schmahmann's avatar
Adin Schmahmann committed
196 197
		for {
			select {
198 199
			case res := <-queryResCh:
				foundCloser = foundCloser || res.foundCloserPeer
Adin Schmahmann's avatar
Adin Schmahmann committed
200 201 202 203 204 205 206 207
				resultsReceived++
				if resultsReceived == numQuery {
					break loop
				}
			case <-q.ctx.Done():
				return
			}
		}
208
	}
209 210
}

211
// queryResult is the outcome of querying a single peer.
type queryResult struct {
	// foundCloserPeer is true if the peer we're querying returns a peer
	// closer than the closest we've already heard about
	foundCloserPeer bool
}

217 218 219
// queryPeer queries a single peer.
func (q *query) queryPeer(p peer.ID) *queryResult {
	dialCtx, queryCtx := q.ctx, q.ctx
Adin Schmahmann's avatar
Adin Schmahmann committed
220

221
	// dial the peer
Adin Schmahmann's avatar
Adin Schmahmann committed
222 223
	if err := q.dht.dialPeer(dialCtx, p); err != nil {
		q.localPeers.Remove(p)
224
		return &queryResult{}
Adin Schmahmann's avatar
Adin Schmahmann committed
225
	}
226 227 228

	// add the peer to the global set of queried peers since the dial was successful
	// so that no other disjoint query tries sending an RPC to the same peer
Adin Schmahmann's avatar
Adin Schmahmann committed
229 230
	if !q.globallyQueriedPeers.TryAdd(p) {
		q.localPeers.Remove(p)
231
		return &queryResult{}
232 233
	}

234
	// did the dial fulfill the stop condition ?
235 236 237 238 239
	if q.stopFn(q.localPeers) {
		q.cancel()
		return &queryResult{}
	}

240
	// send query RPC to the remote peer
Adin Schmahmann's avatar
Adin Schmahmann committed
241 242 243
	newPeers, err := q.queryFn(queryCtx, p)
	if err != nil {
		q.localPeers.Remove(p)
244
		return &queryResult{}
245 246
	}

247
	// mark the peer as queried.
Adin Schmahmann's avatar
Adin Schmahmann committed
248
	q.localPeers.MarkQueried(p)
249

Adin Schmahmann's avatar
Adin Schmahmann committed
250 251
	if len(newPeers) == 0 {
		logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
	}
253

254
	foundCloserPeer := false
Adin Schmahmann's avatar
Adin Schmahmann committed
255 256 257 258 259
	for _, next := range newPeers {
		if next.ID == q.dht.self { // don't add self.
			logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
			continue
		}
260

Adin Schmahmann's avatar
Adin Schmahmann committed
261 262
		// add their addresses to the dialer's peerstore
		q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
263
		closer := q.localPeers.Add(next.ID)
264
		foundCloserPeer = foundCloserPeer || closer
265
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
266

267
	// did the successful query RPC fulfill the query stop condition ?
Adin Schmahmann's avatar
Adin Schmahmann committed
268 269 270
	if q.stopFn(q.localPeers) {
		q.cancel()
	}
271

272 273 274
	return &queryResult{
		foundCloserPeer: foundCloserPeer,
	}
275
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
276

Adin Schmahmann's avatar
Adin Schmahmann committed
277
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
278
	// short-circuit if we're already connected.
Adin Schmahmann's avatar
Adin Schmahmann committed
279
	if dht.host.Network().Connectedness(p) == network.Connected {
280 281 282
		return nil
	}

Matt Joiner's avatar
Matt Joiner committed
283
	logger.Debug("not connected. dialing.")
Adin Schmahmann's avatar
Adin Schmahmann committed
284
	routing.PublishQueryEvent(ctx, &routing.QueryEvent{
285
		Type: routing.DialingPeer,
286 287 288
		ID:   p,
	})

289
	pi := peer.AddrInfo{ID: p}
Adin Schmahmann's avatar
Adin Schmahmann committed
290
	if err := dht.host.Connect(ctx, pi); err != nil {
Matt Joiner's avatar
Matt Joiner committed
291
		logger.Debugf("error connecting: %s", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
292
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
293
			Type:  routing.QueryError,
294 295 296 297 298 299
			Extra: err.Error(),
			ID:    p,
		})

		return err
	}
Matt Joiner's avatar
Matt Joiner committed
300
	logger.Debugf("connected. dial success.")
301 302
	return nil
}