query.go 7.84 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
6 7
	"sync"

8
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
9
	todoctr "github.com/ipfs/go-todocounter"
10 11
	process "github.com/jbenet/goprocess"
	ctxproc "github.com/jbenet/goprocess/context"
Jeromy's avatar
Jeromy committed
12
	inet "github.com/libp2p/go-libp2p-net"
13 14 15 16
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
George Antoniadis's avatar
George Antoniadis committed
17 18
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

Steven Allen's avatar
Steven Allen committed
21 22 23
// ErrNoPeersQueried is returned when we failed to connect to any peers.
var ErrNoPeersQueried = errors.New("failed to query any peers")

24
var maxQueryConcurrency = AlphaValue
25

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
	dht         *IpfsDHT
28
	key         string    // the key we're querying for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29 30
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
31 32 33
}

type dhtQueryResult struct {
34 35 36
	peer        *pstore.PeerInfo   // FindPeer
	closerPeers []*pstore.PeerInfo // *
	success     bool
Jeromy's avatar
Jeromy committed
37

38 39
	finalSet   *pset.PeerSet
	queriedSet *pset.PeerSet
40 41 42
}

// constructs query
43
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
44 45
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46
		dht:         dht,
47 48 49
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50 51 52 53 54 55 56
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
57
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
58 59

// Run runs the query at hand. pass in a list of peers to use first.
60
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
61 62 63 64 65 66
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

67 68 69
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

70 71
	runner := newQueryRunner(q)
	return runner.Run(ctx, peers)
72 73 74
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75 76
	query          *dhtQuery        // query to run
	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
77
	peersQueried   *pset.PeerSet    // peers successfully connected to and queried
78
	peersDialed    *dialQueue       // peers we have dialed to
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
81

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82
	result *dhtQueryResult // query result
83

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84
	rateLimit chan struct{} // processing semaphore
Jeromy's avatar
Jeromy committed
85
	log       logging.EventLogger
86

87 88
	runCtx context.Context

89
	proc process.Process
90 91 92
	sync.RWMutex
}

93 94
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
	proc := process.WithParent(process.Background())
95
	ctx := ctxproc.OnClosingContext(proc)
96 97
	peersToQuery := queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key)))
	r := &dhtQueryRunner{
98 99
		query:          q,
		peersRemaining: todoctr.NewSyncCounter(),
100
		peersSeen:      pset.New(),
101
		peersQueried:   pset.New(),
102
		rateLimit:      make(chan struct{}, q.concurrency),
103
		peersToQuery:   peersToQuery,
104
		proc:           proc,
105
	}
106 107 108 109 110 111 112 113 114 115 116
	dq, err := newDialQueue(&dqParams{
		ctx:    ctx,
		target: q.key,
		in:     peersToQuery,
		dialFn: r.dialPeer,
		config: dqDefaultConfig(),
	})
	if err != nil {
		panic(err)
	}
	r.peersDialed = dq
117
	return r
118 119
}

120
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Matt Joiner's avatar
Matt Joiner committed
121
	r.log = logger
122
	r.runCtx = ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123

124
	if len(peers) == 0 {
Matt Joiner's avatar
Matt Joiner committed
125
		logger.Warning("Running query with no peers!")
126 127
		return nil, nil
	}
128

129 130 131
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133
	}

134 135
	// add all the peers we got first.
	for _, p := range peers {
136
		r.addPeerToQuery(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137 138
	}

139 140 141 142 143
	// start the dial queue only after we've added the initial set of peers.
	// this is to avoid race conditions that could cause the peersRemaining todoctr
	// to be done too early if the initial dial fails before others make it into the queue.
	r.peersDialed.Start()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
	// go do this thing.
145
	// do it as a child proc to make sure Run exits
146
	// ONLY AFTER spawn workers has exited.
147
	r.proc.Go(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148 149

	// wait until they're done.
150
	var err error
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
151

152 153 154
	// now, if the context finishes, close the proc.
	// we have to do it here because the logic before is setup, which
	// should run without closing the proc.
rht's avatar
rht committed
155
	ctxproc.CloseAfterContext(r.proc, ctx)
156

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	select {
158
	case <-r.peersRemaining.Done():
159
		r.proc.Close()
Steven Allen's avatar
Steven Allen committed
160 161 162 163
		if r.peersQueried.Size() == 0 {
			err = ErrNoPeersQueried
		} else {
			err = routing.ErrNotFound
164 165
		}

166
	case <-r.proc.Closed():
167
		err = r.runCtx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
168
	}
169

Steven Allen's avatar
Steven Allen committed
170 171 172
	r.RLock()
	defer r.RUnlock()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173 174
	if r.result != nil && r.result.success {
		return r.result, nil
175 176
	}

Jeromy's avatar
Jeromy committed
177
	return &dhtQueryResult{
178 179
		finalSet:   r.peersSeen,
		queriedSet: r.peersQueried,
Jeromy's avatar
Jeromy committed
180
	}, err
181 182
}

183
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
184
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
		r.log.Debug("addPeerToQuery skip self")
187 188 189
		return
	}

190
	if !r.peersSeen.TryAdd(next) {
191 192 193
		return
	}

194 195 196 197 198
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.AddingPeer,
		ID:   next,
	})

199 200 201
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
202
	case <-r.proc.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
203
	}
204 205
}

206
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
207 208 209 210 211
	for {
		select {
		case <-r.peersRemaining.Done():
			return

212
		case <-r.proc.Closing():
213 214
			return

Jeromy's avatar
Jeromy committed
215
		case <-r.rateLimit:
216
			ch := r.peersDialed.Consume()
Jeromy's avatar
Jeromy committed
217
			select {
218 219 220
			case p, ok := <-ch:
				if !ok {
					// this signals context cancellation.
221 222
					return
				}
Jeromy's avatar
Jeromy committed
223 224 225 226 227 228 229 230 231
				// do it as a child func to make sure Run exits
				// ONLY AFTER spawn workers has exited.
				proc.Go(func(proc process.Process) {
					r.queryPeer(proc, p)
				})
			case <-r.proc.Closing():
				return
			case <-r.peersRemaining.Done():
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232
			}
233 234 235
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236

237 238 239 240 241 242
func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
	// short-circuit if we're already connected.
	if r.query.dht.host.Network().Connectedness(p) == inet.Connected {
		return nil
	}

Matt Joiner's avatar
Matt Joiner committed
243
	logger.Debug("not connected. dialing.")
244 245 246 247 248 249 250
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.DialingPeer,
		ID:   p,
	})

	pi := pstore.PeerInfo{ID: p}
	if err := r.query.dht.host.Connect(ctx, pi); err != nil {
Matt Joiner's avatar
Matt Joiner committed
251
		logger.Debugf("error connecting: %s", err)
252 253 254 255 256 257
		notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
			ID:    p,
		})

258 259
		// This peer is dropping out of the race.
		r.peersRemaining.Decrement(1)
260 261
		return err
	}
Matt Joiner's avatar
Matt Joiner committed
262
	logger.Debugf("connected. dial success.")
263 264 265
	return nil
}

266
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267 268
	// ok let's do this!

269
	// create a context from our proc.
270
	ctx := ctxproc.OnClosingContext(proc)
271

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272 273
	// make sure we do this when we exit
	defer func() {
274
		// signal we're done processing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
275 276 277 278
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

279
	// finally, run the query against this peer
280
	res, err := r.query.qfunc(ctx, p)
281

282 283
	r.peersQueried.Add(p)

284
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
285
		logger.Debugf("ERROR worker for: %v %v", p, err)
286
	} else if res.success {
Matt Joiner's avatar
Matt Joiner committed
287
		logger.Debugf("SUCCESS worker for: %v %s", p, res)
288 289 290
		r.Lock()
		r.result = res
		r.Unlock()
291 292 293
		if res.peer != nil {
			r.query.dht.peerstore.AddAddrs(res.peer.ID, res.peer.Addrs, pstore.TempAddrTTL)
		}
294
		go r.proc.Close() // signal to everyone that we're done.
295
		// must be async, as we're one of the children, and Close blocks.
296

297
	} else if len(res.closerPeers) > 0 {
Matt Joiner's avatar
Matt Joiner committed
298
		logger.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
299
		for _, next := range res.closerPeers {
300
			if next.ID == r.query.dht.self { // don't add self.
Matt Joiner's avatar
Matt Joiner committed
301
				logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
302 303 304
				continue
			}

305
			// add their addresses to the dialer's peerstore
Jeromy's avatar
Jeromy committed
306
			r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
307
			r.addPeerToQuery(next.ID)
Matt Joiner's avatar
Matt Joiner committed
308
			logger.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
309
		}
310
	} else {
Matt Joiner's avatar
Matt Joiner committed
311
		logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
312 313
	}
}