query.go 7.72 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

6
	key "github.com/ipfs/go-ipfs/blocks/key"
7 8
	notif "github.com/ipfs/go-ipfs/notifications"
	"github.com/ipfs/go-ipfs/routing"
9 10
	pset "github.com/ipfs/go-ipfs/thirdparty/peerset"
	todoctr "github.com/ipfs/go-ipfs/thirdparty/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11

Jeromy's avatar
Jeromy committed
12
	peer "gx/ipfs/QmQGwpJy9P4yXZySmqkZEXCmbBpJUb8xntCv8Ca4taZwDC/go-libp2p-peer"
13 14
	process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
	ctxproc "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
Jakub Sztandera's avatar
Jakub Sztandera committed
15
	logging "gx/ipfs/QmYtB7Qge8cJpXc4irsEp8zRqfnZMBeB7aTrMEkPk67DRv/go-log"
Jeromy's avatar
Jeromy committed
16 17 18
	pstore "gx/ipfs/QmZ62t46e9p7vMYqCmptwQC1RhRv5cpQ5cwoqYspedaXyq/go-libp2p-peerstore"
	queue "gx/ipfs/QmZ62t46e9p7vMYqCmptwQC1RhRv5cpQ5cwoqYspedaXyq/go-libp2p-peerstore/queue"
	u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
19
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20 21
)

22
// maxQueryConcurrency is the concurrency value given to every query built
// by newQuery. It is initialised from AlphaValue.
var maxQueryConcurrency = AlphaValue
23

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25
	dht         *IpfsDHT
26
	key         key.Key   // the key we're querying for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27 28
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
29 30 31
}

type dhtQueryResult struct {
Jeromy's avatar
Jeromy committed
32 33 34 35
	value         []byte            // GetValue
	peer          pstore.PeerInfo   // FindPeer
	providerPeers []pstore.PeerInfo // GetProviders
	closerPeers   []pstore.PeerInfo // *
36 37 38 39
	success       bool
}

// constructs query
40
func (dht *IpfsDHT) newQuery(k key.Key, f queryFunc) *dhtQuery {
41 42
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
43
		dht:         dht,
44 45 46
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47 48 49 50 51 52 53
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
54
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
55 56

// Run runs the query at hand. pass in a list of peers to use first.
57
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58 59 60 61 62 63
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

64 65 66
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

67 68
	runner := newQueryRunner(q)
	return runner.Run(ctx, peers)
69 70 71
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72 73 74 75
	query          *dhtQuery        // query to run
	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
76

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
	result *dhtQueryResult // query result
78
	errs   u.MultiErr      // result errors. maybe should be a map[peer.ID]error
79

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
	rateLimit chan struct{} // processing semaphore
Jeromy's avatar
Jeromy committed
81
	log       logging.EventLogger
82

83 84
	runCtx context.Context

85
	proc process.Process
86 87 88
	sync.RWMutex
}

89 90
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
	proc := process.WithParent(process.Background())
91
	ctx := ctxproc.OnClosingContext(proc)
92 93
	return &dhtQueryRunner{
		query:          q,
94
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key))),
95
		peersRemaining: todoctr.NewSyncCounter(),
96
		peersSeen:      pset.New(),
97
		rateLimit:      make(chan struct{}, q.concurrency),
98
		proc:           proc,
99 100 101
	}
}

102
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
103
	r.log = log
104
	r.runCtx = ctx
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105

106 107 108 109
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
110

111 112 113
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114 115
	}

116 117
	// add all the peers we got first.
	for _, p := range peers {
118
		r.addPeerToQuery(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
119 120
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121
	// go do this thing.
122
	// do it as a child proc to make sure Run exits
123
	// ONLY AFTER spawn workers has exited.
124
	r.proc.Go(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
125 126 127 128

	// so workers are working.

	// wait until they're done.
129
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
130

131 132 133
	// now, if the context finishes, close the proc.
	// we have to do it here because the logic before is setup, which
	// should run without closing the proc.
rht's avatar
rht committed
134
	ctxproc.CloseAfterContext(r.proc, ctx)
135

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
136
	select {
137
	case <-r.peersRemaining.Done():
138
		r.proc.Close()
139 140 141
		r.RLock()
		defer r.RUnlock()

142 143 144 145 146 147
		err = routing.ErrNotFound

		// if every query to every peer failed, something must be very wrong.
		if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
			log.Debugf("query errs: %s", r.errs)
			err = r.errs[0]
148 149
		}

150
	case <-r.proc.Closed():
151 152
		r.RLock()
		defer r.RUnlock()
153
		err = context.DeadlineExceeded
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
154
	}
155

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
156 157
	if r.result != nil && r.result.success {
		return r.result, nil
158 159
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
	return nil, err
161 162
}

163
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
164
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
166
		r.log.Debug("addPeerToQuery skip self")
167 168 169
		return
	}

170
	if !r.peersSeen.TryAdd(next) {
171 172 173
		return
	}

174 175 176 177 178
	notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
		Type: notif.AddingPeer,
		ID:   next,
	})

179 180 181
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
182
	case <-r.proc.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
183
	}
184 185
}

186
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
187
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
188

189 190 191 192
		select {
		case <-r.peersRemaining.Done():
			return

193
		case <-r.proc.Closing():
194 195
			return

Jeromy's avatar
Jeromy committed
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
		case <-r.rateLimit:
			select {
			case p, more := <-r.peersToQuery.DeqChan:
				if !more {
					return // channel closed.
				}

				// do it as a child func to make sure Run exits
				// ONLY AFTER spawn workers has exited.
				proc.Go(func(proc process.Process) {
					r.queryPeer(proc, p)
				})
			case <-r.proc.Closing():
				return
			case <-r.peersRemaining.Done():
				return
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
			}
213 214 215
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
216

217
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219
	// ok let's do this!

220
	// create a context from our proc.
221
	ctx := ctxproc.OnClosingContext(proc)
222

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223 224 225 226 227 228 229 230
	// make sure we do this when we exit
	defer func() {
		// signal we're done proccessing peer p
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

	// make sure we're connected to the peer.
231
	// FIXME abstract away into the network layer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232
	if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
Jeromy's avatar
Jeromy committed
233
		log.Debug("not connected. dialing.")
234 235 236 237 238

		notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
			Type: notif.DialingPeer,
			ID:   p,
		})
239 240 241
		// while we dial, we do not take up a rate limit. this is to allow
		// forward progress during potentially very high latency dials.
		r.rateLimit <- struct{}{}
242

Jeromy's avatar
Jeromy committed
243
		pi := pstore.PeerInfo{ID: p}
244 245

		if err := r.query.dht.host.Connect(ctx, pi); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246
			log.Debugf("Error connecting: %s", err)
247

248
			notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
249 250
				Type:  notif.QueryError,
				Extra: err.Error(),
251
				ID:    p,
252 253
			})

254 255 256
			r.Lock()
			r.errs = append(r.errs, err)
			r.Unlock()
257
			<-r.rateLimit // need to grab it again, as we deferred.
258 259
			return
		}
260
		<-r.rateLimit // need to grab it again, as we deferred.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
261
		log.Debugf("connected. dial success.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
262
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263

264
	// finally, run the query against this peer
265
	res, err := r.query.qfunc(ctx, p)
266 267

	if err != nil {
268
		log.Debugf("ERROR worker for: %v %v", p, err)
269 270 271 272 273
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
274
		log.Debugf("SUCCESS worker for: %v %s", p, res)
275 276 277
		r.Lock()
		r.result = res
		r.Unlock()
278
		go r.proc.Close() // signal to everyone that we're done.
279
		// must be async, as we're one of the children, and Close blocks.
280

281 282
	} else if len(res.closerPeers) > 0 {
		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
283
		for _, next := range res.closerPeers {
284 285 286 287 288
			if next.ID == r.query.dht.self { // dont add self.
				log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
				continue
			}

289
			// add their addresses to the dialer's peerstore
Jeromy's avatar
Jeromy committed
290
			r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
291
			r.addPeerToQuery(next.ID)
292
			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
293
		}
294 295
	} else {
		log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
296 297
	}
}