query.go 7.66 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
Jeromy's avatar
Jeromy committed
4
	"context"
5 6
	"sync"

7 8
	u "github.com/ipfs/go-ipfs-util"
	logging "github.com/ipfs/go-log"
George Antoniadis's avatar
George Antoniadis committed
9
	todoctr "github.com/ipfs/go-todocounter"
10 11
	process "github.com/jbenet/goprocess"
	ctxproc "github.com/jbenet/goprocess/context"
Jeromy's avatar
Jeromy committed
12
	inet "github.com/libp2p/go-libp2p-net"
13 14 15 16
	peer "github.com/libp2p/go-libp2p-peer"
	pset "github.com/libp2p/go-libp2p-peer/peerset"
	pstore "github.com/libp2p/go-libp2p-peerstore"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
George Antoniadis's avatar
George Antoniadis committed
17 18
	routing "github.com/libp2p/go-libp2p-routing"
	notif "github.com/libp2p/go-libp2p-routing/notifications"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

21
var (
	// maxQueryConcurrency is the concurrency limit newQuery gives every new
	// dhtQuery, i.e. how many peers one query may work on at the same time.
	// It starts out as AlphaValue.
	maxQueryConcurrency = AlphaValue
)
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
	dht         *IpfsDHT
25
	key         string    // the key we're querying for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
28 29 30
}

type dhtQueryResult struct {
Jeromy's avatar
Jeromy committed
31 32 33 34
	value         []byte             // GetValue
	peer          *pstore.PeerInfo   // FindPeer
	providerPeers []pstore.PeerInfo  // GetProviders
	closerPeers   []*pstore.PeerInfo // *
35
	success       bool
Jeromy's avatar
Jeromy committed
36

37 38
	finalSet   *pset.PeerSet
	queriedSet *pset.PeerSet
39 40 41
}

// constructs query
42
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
43 44
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
		dht:         dht,
46 47 48
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49 50 51 52 53 54 55
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
56
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
57 58

// Run runs the query at hand. pass in a list of peers to use first.
59
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60 61 62 63 64 65
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

66 67 68
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

69 70
	runner := newQueryRunner(q)
	return runner.Run(ctx, peers)
71 72 73
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
74 75
	query          *dhtQuery        // query to run
	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
76
	peersQueried   *pset.PeerSet    // peers successfully connected to and queried
77
	peersDialed    *dialQueue       // peers we have dialed to
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
78 79
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
80

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
81
	result *dhtQueryResult // query result
82
	errs   u.MultiErr      // result errors. maybe should be a map[peer.ID]error
83

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84
	rateLimit chan struct{} // processing semaphore
Jeromy's avatar
Jeromy committed
85
	log       logging.EventLogger
86

87 88
	runCtx context.Context

89
	proc process.Process
90 91 92
	sync.RWMutex
}

93 94
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
	proc := process.WithParent(process.Background())
95
	ctx := ctxproc.OnClosingContext(proc)
96 97
	peersToQuery := queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key)))
	r := &dhtQueryRunner{
98 99
		query:          q,
		peersRemaining: todoctr.NewSyncCounter(),
100
		peersSeen:      pset.New(),
101
		peersQueried:   pset.New(),
102
		rateLimit:      make(chan struct{}, q.concurrency),
103
		peersToQuery:   peersToQuery,
104
		proc:           proc,
105
	}
106
	r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer, DialQueueMaxIdle, DialQueueScalingMutePeriod)
107
	return r
108 109
}

110
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
111
	r.log = log
112
	r.runCtx = ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113

114 115 116 117
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
118

119 120 121
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122 123
	}

124 125
	// add all the peers we got first.
	for _, p := range peers {
126
		r.addPeerToQuery(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127 128
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
	// go do this thing.
130
	// do it as a child proc to make sure Run exits
131
	// ONLY AFTER spawn workers has exited.
132
	r.proc.Go(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133 134 135 136

	// so workers are working.

	// wait until they're done.
137
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
138

139 140 141
	// now, if the context finishes, close the proc.
	// we have to do it here because the logic before is setup, which
	// should run without closing the proc.
rht's avatar
rht committed
142
	ctxproc.CloseAfterContext(r.proc, ctx)
143

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
	select {
145
	case <-r.peersRemaining.Done():
146
		r.proc.Close()
147 148 149
		r.RLock()
		defer r.RUnlock()

150 151 152 153 154 155
		err = routing.ErrNotFound

		// if every query to every peer failed, something must be very wrong.
		if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
			log.Debugf("query errs: %s", r.errs)
			err = r.errs[0]
156 157
		}

158
	case <-r.proc.Closed():
159 160
		r.RLock()
		defer r.RUnlock()
161
		err = context.DeadlineExceeded
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
162
	}
163

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164 165
	if r.result != nil && r.result.success {
		return r.result, nil
166 167
	}

Jeromy's avatar
Jeromy committed
168
	return &dhtQueryResult{
169 170
		finalSet:   r.peersSeen,
		queriedSet: r.peersQueried,
Jeromy's avatar
Jeromy committed
171
	}, err
172 173
}

174
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
175
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
		r.log.Debug("addPeerToQuery skip self")
178 179 180
		return
	}

181
	if !r.peersSeen.TryAdd(next) {
182 183 184
		return
	}

185 186 187 188 189
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.AddingPeer,
		ID:   next,
	})

190 191 192
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
193
	case <-r.proc.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
194
	}
195 196
}

197
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
198 199 200 201 202
	for {
		select {
		case <-r.peersRemaining.Done():
			return

203
		case <-r.proc.Closing():
204 205
			return

Jeromy's avatar
Jeromy committed
206
		case <-r.rateLimit:
207
			ch := r.peersDialed.Consume()
Jeromy's avatar
Jeromy committed
208
			select {
209 210 211
			case p, ok := <-ch:
				if !ok {
					// this signals context cancellation.
212 213
					return
				}
Jeromy's avatar
Jeromy committed
214 215 216 217 218 219 220 221 222
				// do it as a child func to make sure Run exits
				// ONLY AFTER spawn workers has exited.
				proc.Go(func(proc process.Process) {
					r.queryPeer(proc, p)
				})
			case <-r.proc.Closing():
				return
			case <-r.peersRemaining.Done():
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223
			}
224 225 226
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227

228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
// dialPeer makes sure the host is connected to p, dialing it with ctx if not.
// A dial attempt publishes a DialingPeer query event. On failure it:
//   - publishes a QueryError event,
//   - appends the error to r.errs under the runner's lock,
//   - returns the error.
func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
	host := r.query.dht.host
	if host.Network().Connectedness(p) == inet.Connected {
		return nil // already connected; no dial needed
	}

	log.Debug("not connected. dialing.")
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.DialingPeer,
		ID:   p,
	})

	err := host.Connect(ctx, pstore.PeerInfo{ID: p})
	if err == nil {
		log.Debugf("connected. dial success.")
		return nil
	}

	log.Debugf("error connecting: %s", err)
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type:  notif.QueryError,
		Extra: err.Error(),
		ID:    p,
	})

	r.Lock()
	r.errs = append(r.errs, err)
	r.Unlock()
	return err
}

258
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
259 260
	// ok let's do this!

261
	// create a context from our proc.
262
	ctx := ctxproc.OnClosingContext(proc)
263

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264 265
	// make sure we do this when we exit
	defer func() {
266
		// signal we're done processing peer p
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
267 268 269 270
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

271
	// finally, run the query against this peer
272
	res, err := r.query.qfunc(ctx, p)
273

274 275
	r.peersQueried.Add(p)

276
	if err != nil {
277
		log.Debugf("ERROR worker for: %v %v", p, err)
278 279 280 281 282
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
283
		log.Debugf("SUCCESS worker for: %v %s", p, res)
284 285 286
		r.Lock()
		r.result = res
		r.Unlock()
287
		go r.proc.Close() // signal to everyone that we're done.
288
		// must be async, as we're one of the children, and Close blocks.
289

290 291
	} else if len(res.closerPeers) > 0 {
		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
292
		for _, next := range res.closerPeers {
293
			if next.ID == r.query.dht.self { // don't add self.
294 295 296 297
				log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
				continue
			}

298
			// add their addresses to the dialer's peerstore
Jeromy's avatar
Jeromy committed
299
			r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
300
			r.addPeerToQuery(next.ID)
301
			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
302
		}
303 304
	} else {
		log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
305 306
	}
}