query.go 7.84 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5 6
	"sync"

7 8
	u "github.com/ipfs/go-ipfs-util"
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
9
	todoctr "github.com/ipfs/go-todocounter"
10 11
	process "github.com/jbenet/goprocess"
	ctxproc "github.com/jbenet/goprocess/context"
Jeromy's avatar
Jeromy committed
12
	inet "github.com/libp2p/go-libp2p-net"
13 14 15 16
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
George Antoniadis's avatar
George Antoniadis committed
17 18
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

21
// maxQueryConcurrency is the concurrency value newQuery assigns to every
// dhtQuery it builds; the query runner uses it to size its rate-limiting
// semaphore. It is initialised from AlphaValue, which is declared elsewhere
// in this package.
var maxQueryConcurrency = AlphaValue
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
	dht         *IpfsDHT
25
	key         string    // the key we're querying for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
28 29 30
}

type dhtQueryResult struct {
Jeromy's avatar
Jeromy committed
31 32 33 34
	value         []byte             // GetValue
	peer          *pstore.PeerInfo   // FindPeer
	providerPeers []pstore.PeerInfo  // GetProviders
	closerPeers   []*pstore.PeerInfo // *
35
	success       bool
Jeromy's avatar
Jeromy committed
36

37 38
	finalSet   *pset.PeerSet
	queriedSet *pset.PeerSet
39 40 41
}

// constructs query
42
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
43 44
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
		dht:         dht,
46 47 48
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51 52 53 54 55
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
56
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
57 58

// Run runs the query at hand. pass in a list of peers to use first.
59
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62 63 64 65
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

66 67 68
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

69 70
	runner := newQueryRunner(q)
	return runner.Run(ctx, peers)
71 72 73
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74 75
	query          *dhtQuery        // query to run
	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
76
	peersQueried   *pset.PeerSet    // peers successfully connected to and queried
77
	peersDialed    *dialQueue       // peers we have dialed to
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
80

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
	result *dhtQueryResult // query result
82
	errs   u.MultiErr      // result errors. maybe should be a map[peer.ID]error
83

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84
	rateLimit chan struct{} // processing semaphore
Jeromy's avatar
Jeromy committed
85
	log       logging.EventLogger
86

87 88
	runCtx context.Context

89
	proc process.Process
90 91 92
	sync.RWMutex
}

93 94
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
	proc := process.WithParent(process.Background())
95
	ctx := ctxproc.OnClosingContext(proc)
96 97
	peersToQuery := queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key)))
	r := &dhtQueryRunner{
98 99
		query:          q,
		peersRemaining: todoctr.NewSyncCounter(),
100
		peersSeen:      pset.New(),
101
		peersQueried:   pset.New(),
102
		rateLimit:      make(chan struct{}, q.concurrency),
103
		peersToQuery:   peersToQuery,
104
		proc:           proc,
105
	}
106 107 108 109 110 111 112 113 114 115 116
	dq, err := newDialQueue(&dqParams{
		ctx:    ctx,
		target: q.key,
		in:     peersToQuery,
		dialFn: r.dialPeer,
		config: dqDefaultConfig(),
	})
	if err != nil {
		panic(err)
	}
	r.peersDialed = dq
117
	return r
118 119
}

120
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Matt Joiner's avatar
Matt Joiner committed
121
	r.log = logger
122
	r.runCtx = ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123

124
	if len(peers) == 0 {
Matt Joiner's avatar
Matt Joiner committed
125
		logger.Warning("Running query with no peers!")
126 127
		return nil, nil
	}
128

129 130 131
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132 133
	}

134 135
	// add all the peers we got first.
	for _, p := range peers {
136
		r.addPeerToQuery(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
137 138
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139
	// go do this thing.
140
	// do it as a child proc to make sure Run exits
141
	// ONLY AFTER spawn workers has exited.
142
	r.proc.Go(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
143 144 145 146

	// so workers are working.

	// wait until they're done.
147
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148

149 150 151
	// now, if the context finishes, close the proc.
	// we have to do it here because the logic before is setup, which
	// should run without closing the proc.
rht's avatar
rht committed
152
	ctxproc.CloseAfterContext(r.proc, ctx)
153

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154
	select {
155
	case <-r.peersRemaining.Done():
156
		r.proc.Close()
157 158 159
		r.RLock()
		defer r.RUnlock()

160 161 162 163
		err = routing.ErrNotFound

		// if every query to every peer failed, something must be very wrong.
		if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
Matt Joiner's avatar
Matt Joiner committed
164
			logger.Debugf("query errs: %s", r.errs)
165
			err = r.errs[0]
166 167
		}

168
	case <-r.proc.Closed():
169 170
		r.RLock()
		defer r.RUnlock()
171
		err = r.runCtx.Err()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
172
	}
173

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174 175
	if r.result != nil && r.result.success {
		return r.result, nil
176 177
	}

Jeromy's avatar
Jeromy committed
178
	return &dhtQueryResult{
179 180
		finalSet:   r.peersSeen,
		queriedSet: r.peersQueried,
Jeromy's avatar
Jeromy committed
181
	}, err
182 183
}

184
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
185
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
		r.log.Debug("addPeerToQuery skip self")
188 189 190
		return
	}

191
	if !r.peersSeen.TryAdd(next) {
192 193 194
		return
	}

195 196 197 198 199
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.AddingPeer,
		ID:   next,
	})

200 201 202
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
203
	case <-r.proc.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
204
	}
205 206
}

207
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
208 209 210 211 212
	for {
		select {
		case <-r.peersRemaining.Done():
			return

213
		case <-r.proc.Closing():
214 215
			return

Jeromy's avatar
Jeromy committed
216
		case <-r.rateLimit:
217
			ch := r.peersDialed.Consume()
Jeromy's avatar
Jeromy committed
218
			select {
219 220 221
			case p, ok := <-ch:
				if !ok {
					// this signals context cancellation.
222 223
					return
				}
Jeromy's avatar
Jeromy committed
224 225 226 227 228 229 230 231 232
				// do it as a child func to make sure Run exits
				// ONLY AFTER spawn workers has exited.
				proc.Go(func(proc process.Process) {
					r.queryPeer(proc, p)
				})
			case <-r.proc.Closing():
				return
			case <-r.peersRemaining.Done():
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233
			}
234 235 236
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
237

238 239 240 241 242 243
func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
	// short-circuit if we're already connected.
	if r.query.dht.host.Network().Connectedness(p) == inet.Connected {
		return nil
	}

Matt Joiner's avatar
Matt Joiner committed
244
	logger.Debug("not connected. dialing.")
245 246 247 248 249 250 251
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.DialingPeer,
		ID:   p,
	})

	pi := pstore.PeerInfo{ID: p}
	if err := r.query.dht.host.Connect(ctx, pi); err != nil {
Matt Joiner's avatar
Matt Joiner committed
252
		logger.Debugf("error connecting: %s", err)
253 254 255 256 257 258 259 260 261
		notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
			Type:  notif.QueryError,
			Extra: err.Error(),
			ID:    p,
		})

		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()
262 263 264

		// This peer is dropping out of the race.
		r.peersRemaining.Decrement(1)
265 266
		return err
	}
Matt Joiner's avatar
Matt Joiner committed
267
	logger.Debugf("connected. dial success.")
268 269 270
	return nil
}

271
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272 273
	// ok let's do this!

274
	// create a context from our proc.
275
	ctx := ctxproc.OnClosingContext(proc)
276

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277 278
	// make sure we do this when we exit
	defer func() {
279
		// signal we're done processing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
280 281 282 283
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

284
	// finally, run the query against this peer
285
	res, err := r.query.qfunc(ctx, p)
286

287 288
	r.peersQueried.Add(p)

289
	if err != nil {
Matt Joiner's avatar
Matt Joiner committed
290
		logger.Debugf("ERROR worker for: %v %v", p, err)
291 292 293 294 295
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Matt Joiner's avatar
Matt Joiner committed
296
		logger.Debugf("SUCCESS worker for: %v %s", p, res)
297 298 299
		r.Lock()
		r.result = res
		r.Unlock()
300
		go r.proc.Close() // signal to everyone that we're done.
301
		// must be async, as we're one of the children, and Close blocks.
302

303
	} else if len(res.closerPeers) > 0 {
Matt Joiner's avatar
Matt Joiner committed
304
		logger.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
305
		for _, next := range res.closerPeers {
306
			if next.ID == r.query.dht.self { // don't add self.
Matt Joiner's avatar
Matt Joiner committed
307
				logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
308 309 310
				continue
			}

311
			// add their addresses to the dialer's peerstore
Jeromy's avatar
Jeromy committed
312
			r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
313
			r.addPeerToQuery(next.ID)
Matt Joiner's avatar
Matt Joiner committed
314
			logger.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
315
		}
316
	} else {
Matt Joiner's avatar
Matt Joiner committed
317
		logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
318 319
	}
}