query.go 8.21 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
6 7
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
Adin Schmahmann's avatar
Adin Schmahmann committed
8 9
	"github.com/libp2p/go-libp2p-core/routing"
	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
10
	kb "github.com/libp2p/go-libp2p-kbucket"
11 12

	pstore "github.com/libp2p/go-libp2p-core/peerstore"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13 14
)

Steven Allen's avatar
Steven Allen committed
15 16 17
// ErrNoPeersQueried is returned when we failed to connect to any peers.
// It is a package-level sentinel value, so callers should test for it with
// errors.Is rather than by comparing message text.
// NOTE(review): nothing in the query code shown alongside this declaration
// returns it — confirm it is still used elsewhere in the package.
var ErrNoPeersQueried = errors.New("failed to query any peers")

Adin Schmahmann's avatar
Adin Schmahmann committed
18 19
type (
	// queryFn performs the actual request against a single remote peer. On
	// success it returns the peers that the remote peer reported; queryPeer
	// stores their addresses in the peerstore and adds their IDs to the
	// query's peer set. A non-nil error makes queryPeer drop the peer.
	queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)

	// stopFn inspects a query's current peer set and reports whether the
	// lookup is finished. When it returns true, queryPeer cancels the
	// query's context.
	stopFn func(*kpeerset.SortedPeerset) bool
)
20

Adin Schmahmann's avatar
Adin Schmahmann committed
21
type query struct {
Adin Schmahmann's avatar
Adin Schmahmann committed
22 23
	ctx    context.Context
	cancel context.CancelFunc
24

Adin Schmahmann's avatar
Adin Schmahmann committed
25
	dht *IpfsDHT
Jeromy's avatar
Jeromy committed
26

Adin Schmahmann's avatar
Adin Schmahmann committed
27 28
	localPeers           *kpeerset.SortedPeerset
	globallyQueriedPeers *peer.Set
Adin Schmahmann's avatar
Adin Schmahmann committed
29 30
	queryFn              queryFn
	stopFn               stopFn
31 32
}

Adin Schmahmann's avatar
Adin Schmahmann committed
33
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
Adin Schmahmann's avatar
Adin Schmahmann committed
34 35 36 37 38
	queryCtx, cancelQuery := context.WithCancel(ctx)

	numQueriesComplete := 0
	queryDone := make(chan struct{}, d)

39
	seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize)
40 41 42 43 44 45 46
	if len(seedPeers) == 0 {
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
			Type:  routing.QueryError,
			Extra: kb.ErrLookupFailure.Error(),
		})
		return nil, kb.ErrLookupFailure
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
47 48 49 50 51

	dht.rng.Shuffle(len(seedPeers), func(i, j int) {
		seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
	})

Adin Schmahmann's avatar
Adin Schmahmann committed
52
	queries := make([]*query, d)
Adin Schmahmann's avatar
Adin Schmahmann committed
53 54 55

	peersQueried := peer.NewSet()
	for i := 0; i < d; i++ {
Adin Schmahmann's avatar
Adin Schmahmann committed
56
		query := &query{
Adin Schmahmann's avatar
Adin Schmahmann committed
57 58 59
			ctx:                  queryCtx,
			cancel:               cancelQuery,
			dht:                  dht,
60
			localPeers:           kpeerset.NewSortedPeerset(dht.bucketSize, target, dht.sortPeers),
Adin Schmahmann's avatar
Adin Schmahmann committed
61 62 63 64 65 66
			globallyQueriedPeers: peersQueried,
			queryFn:              queryFn,
			stopFn:               stopFn,
		}

		queries[i] = query
67
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68

Adin Schmahmann's avatar
Adin Schmahmann committed
69 70
	for i := 0; i < len(seedPeers); i++ {
		queries[i%d].localPeers.Add(seedPeers[i])
71 72
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
73 74 75 76 77 78
	for i := 0; i < d; i++ {
		query := queries[i]
		go func() {
			strictParallelismQuery(query)
			queryDone <- struct{}{}
		}()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
81 82 83 84 85 86 87 88 89 90 91 92
loop:
	for {
		select {
		case <-queryDone:
			numQueriesComplete++
			if numQueriesComplete == d {
				break loop
			}
		case <-ctx.Done():
			break loop
		}
	}
93

94
	return queries, nil
95 96
}

Adin Schmahmann's avatar
Adin Schmahmann committed
97 98 99
// sortPeers orders peers with kpeerset.PeersSortedByLatency, supplying the
// host's network and the DHT's peerstore. runDisjointQueries passes it to
// kpeerset.NewSortedPeerset as the sorting function of each query's peer set.
func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePeers {
	hostNetwork := dht.host.Network()
	sorted := kpeerset.PeersSortedByLatency(peers, hostNetwork, dht.peerstore)
	return sorted
}
100

Adin Schmahmann's avatar
Adin Schmahmann committed
101
func strictParallelismQuery(q *query) {
Adin Schmahmann's avatar
Adin Schmahmann committed
102 103 104 105 106 107 108 109 110
	/*
		start with K closest peers (some queried already some not)
		take best alpha (sorted by some metric)
		query those alpha
		once they complete:
			if the alpha requests did not add any new peers to top K, repeat with unqueried top K
			else repeat
	*/

111
	foundCloser := false
Adin Schmahmann's avatar
Adin Schmahmann committed
112 113
	for {
		peersToQuery := q.localPeers.KUnqueried()
114

Adin Schmahmann's avatar
Adin Schmahmann committed
115 116 117
		if len(peersToQuery) == 0 {
			return
		}
118

119
		// TODO: Is it finding a closer peer if it's closer than one we know about or one we have queried?
120
		numQuery := q.dht.alpha
121
		if foundCloser {
Adin Schmahmann's avatar
Adin Schmahmann committed
122 123 124 125
			numQuery = len(peersToQuery)
		} else if pqLen := len(peersToQuery); pqLen < numQuery {
			numQuery = pqLen
		}
126
		foundCloser = false
127

128
		queryResCh := make(chan *queryResult, numQuery)
Adin Schmahmann's avatar
Adin Schmahmann committed
129
		resultsReceived := 0
130

Adin Schmahmann's avatar
Adin Schmahmann committed
131 132 133 134 135 136 137 138 139
		for _, p := range peersToQuery[:numQuery] {
			go func(p peer.ID) {
				queryResCh <- q.queryPeer(q.ctx, p)
			}(p)
		}

	loop:
		for {
			select {
140 141
			case res := <-queryResCh:
				foundCloser = foundCloser || res.foundCloserPeer
Adin Schmahmann's avatar
Adin Schmahmann committed
142 143 144 145 146 147 148 149
				resultsReceived++
				if resultsReceived == numQuery {
					break loop
				}
			case <-q.ctx.Done():
				return
			}
		}
150
	}
151 152
}

Adin Schmahmann's avatar
Adin Schmahmann committed
153
func simpleQuery(q *query) {
Adin Schmahmann's avatar
Adin Schmahmann committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
	/*
		start with K closest peers (some queried already some not)
		take best alpha (sorted by some metric)
		query those alpha
		   - if a query fails then take the next one
		once they complete:
			if the alpha requests did not add any new peers to top K, repeat with unqueried top K
			else repeat
	*/

	var lastPeers []peer.ID
	for {
		peersToQuery := q.localPeers.KUnqueried()

		if len(peersToQuery) == 0 {
			return
		}
171

172
		numQuery := q.dht.alpha
Adin Schmahmann's avatar
Adin Schmahmann committed
173 174 175 176 177
		if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
			numQuery = len(peersToQuery)
		} else if pqLen := len(peersToQuery); pqLen < numQuery {
			numQuery = pqLen
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178

Adin Schmahmann's avatar
Adin Schmahmann committed
179 180 181 182
		peersToQueryCh := make(chan peer.ID, numQuery)
		for _, p := range peersToQuery[:numQuery] {
			peersToQueryCh <- p
		}
183
		queryResCh := make(chan *queryResult, numQuery)
Adin Schmahmann's avatar
Adin Schmahmann committed
184
		queriesSucceeded, queriesSent := 0, numQuery
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185

Adin Schmahmann's avatar
Adin Schmahmann committed
186 187 188 189 190 191 192
	dialPeers:
		for {
			select {
			case p := <-peersToQueryCh:
				go func() {
					queryResCh <- q.queryPeer(q.ctx, p)
				}()
193 194
			case res := <-queryResCh:
				if res.success {
Adin Schmahmann's avatar
Adin Schmahmann committed
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
					queriesSucceeded++
					if queriesSucceeded == numQuery {
						break dialPeers
					}
				} else {
					queriesSent++
					if queriesSent >= len(peersToQuery) {
						break dialPeers
					}
					peersToQueryCh <- peersToQuery[queriesSent]
				}
			case <-q.ctx.Done():
				return
			}
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
211 212
}

Adin Schmahmann's avatar
Adin Schmahmann committed
213
func boundedDialQuery(q *query) {
Adin Schmahmann's avatar
Adin Schmahmann committed
214 215 216 217 218 219 220 221 222 223 224 225
	/*
		start with K closest peers (some queried already some not)
		take best alpha (sorted by some metric)
		query those alpha
		-- if queried peer falls out of top K we've heard of + top alpha we've received responses from
			+ others like percentage of way through the timeout, their reputation, etc.
			1) Cancel dial 2) Cancel query but not dial 3) Continue with query
	*/

	var lastPeers []peer.ID
	for {
		peersToQuery := q.localPeers.KUnqueried()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226

Adin Schmahmann's avatar
Adin Schmahmann committed
227 228
		if len(peersToQuery) == 0 {
			return
229 230
		}

231
		numQuery := q.dht.alpha
Adin Schmahmann's avatar
Adin Schmahmann committed
232 233 234
		if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
			numQuery = len(peersToQuery)
		}
235

Adin Schmahmann's avatar
Adin Schmahmann committed
236 237 238 239
		peersToQueryCh := make(chan peer.ID, numQuery)
		for _, p := range peersToQuery[:numQuery] {
			peersToQueryCh <- p
		}
240
		queryResCh := make(chan *queryResult, numQuery)
Adin Schmahmann's avatar
Adin Schmahmann committed
241
		queriesSucceeded, queriesSent := 0, 0
Steven Allen's avatar
Steven Allen committed
242

Adin Schmahmann's avatar
Adin Schmahmann committed
243 244 245 246 247 248
		for {
			select {
			case p := <-peersToQueryCh:
				go func() {
					queryResCh <- q.queryPeer(q.ctx, p)
				}()
249 250
			case res := <-queryResCh:
				if res.success {
Adin Schmahmann's avatar
Adin Schmahmann committed
251 252 253 254 255 256 257 258 259 260 261 262
					queriesSucceeded++
				} else {
					queriesSent++
					if queriesSent >= len(peersToQuery) {
						return
					}
					peersToQueryCh <- peersToQuery[queriesSent]
				}
			case <-q.ctx.Done():
				return
			}
		}
263 264 265
	}
}

266 267 268 269 270
// queryResult is what queryPeer reports back for a single peer. The zero
// value is what queryPeer returns whenever the peer was not queried
// successfully.
type queryResult struct {
	// success is true only when the peer was dialed and answered the query.
	success         bool
	// foundCloserPeer is true when adding one of the peers from the answer
	// to the query's peer set returned true.
	foundCloserPeer bool
}

Adin Schmahmann's avatar
Adin Schmahmann committed
271
func (q *query) queryPeer(ctx context.Context, p peer.ID) *queryResult {
Adin Schmahmann's avatar
Adin Schmahmann committed
272 273 274 275
	dialCtx, queryCtx := ctx, ctx

	if err := q.dht.dialPeer(dialCtx, p); err != nil {
		q.localPeers.Remove(p)
276
		return &queryResult{}
Adin Schmahmann's avatar
Adin Schmahmann committed
277 278 279
	}
	if !q.globallyQueriedPeers.TryAdd(p) {
		q.localPeers.Remove(p)
280
		return &queryResult{}
281 282
	}

283 284 285 286 287
	if q.stopFn(q.localPeers) {
		q.cancel()
		return &queryResult{}
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
288 289 290
	newPeers, err := q.queryFn(queryCtx, p)
	if err != nil {
		q.localPeers.Remove(p)
291
		return &queryResult{}
292 293
	}

Adin Schmahmann's avatar
Adin Schmahmann committed
294
	q.localPeers.MarkQueried(p)
295

Adin Schmahmann's avatar
Adin Schmahmann committed
296 297
	if len(newPeers) == 0 {
		logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
298
	}
299

Adin Schmahmann's avatar
Adin Schmahmann committed
300 301 302 303 304
	for _, next := range newPeers {
		if next.ID == q.dht.self { // don't add self.
			logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
			continue
		}
305

Adin Schmahmann's avatar
Adin Schmahmann committed
306 307 308
		// add their addresses to the dialer's peerstore
		q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
	}
309

310
	foundCloserPeer := false
Adin Schmahmann's avatar
Adin Schmahmann committed
311
	for _, np := range newPeers {
312 313
		closer := q.localPeers.Add(np.ID)
		foundCloserPeer = foundCloserPeer || closer
314
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
315 316 317 318

	if q.stopFn(q.localPeers) {
		q.cancel()
	}
319 320 321 322
	return &queryResult{
		success:         true,
		foundCloserPeer: foundCloserPeer,
	}
323
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
324

Adin Schmahmann's avatar
Adin Schmahmann committed
325
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
326
	// short-circuit if we're already connected.
Adin Schmahmann's avatar
Adin Schmahmann committed
327
	if dht.host.Network().Connectedness(p) == network.Connected {
328 329 330
		return nil
	}

Matt Joiner's avatar
Matt Joiner committed
331
	logger.Debug("not connected. dialing.")
Adin Schmahmann's avatar
Adin Schmahmann committed
332
	routing.PublishQueryEvent(ctx, &routing.QueryEvent{
333
		Type: routing.DialingPeer,
334 335 336
		ID:   p,
	})

337
	pi := peer.AddrInfo{ID: p}
Adin Schmahmann's avatar
Adin Schmahmann committed
338
	if err := dht.host.Connect(ctx, pi); err != nil {
Matt Joiner's avatar
Matt Joiner committed
339
		logger.Debugf("error connecting: %s", err)
Adin Schmahmann's avatar
Adin Schmahmann committed
340
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
341
			Type:  routing.QueryError,
342 343 344 345 346 347
			Extra: err.Error(),
			ID:    p,
		})

		return err
	}
Matt Joiner's avatar
Matt Joiner committed
348
	logger.Debugf("connected. dial success.")
349 350 351
	return nil
}

Adin Schmahmann's avatar
Adin Schmahmann committed
352 353 354 355 356 357 358 359 360
// Equal tells whether a and b contain the same elements.
// A nil argument is equivalent to an empty slice.
func peerSlicesEqual(a, b []peer.ID) bool {
	if len(a) != len(b) {
		return false
	}
	for i, v := range a {
		if v != b[i] {
			return false
361
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362
	}
Adin Schmahmann's avatar
Adin Schmahmann committed
363
	return true
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
364
}