query.go 7.31 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2 3
package dht

import (
4 5
	"sync"

6
	key "github.com/ipfs/go-ipfs/blocks/key"
7 8 9 10 11 12 13 14
	notif "github.com/ipfs/go-ipfs/notifications"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
	queue "github.com/ipfs/go-ipfs/p2p/peer/queue"
	"github.com/ipfs/go-ipfs/routing"
	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
	u "github.com/ipfs/go-ipfs/util"
	pset "github.com/ipfs/go-ipfs/util/peerset"
	todoctr "github.com/ipfs/go-ipfs/util/todocounter"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15

16 17 18
	process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
	ctxproc "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
19 20
)

21
var (
	// maxQueryConcurrency is the concurrency limit newQuery gives every
	// dhtQuery it builds; it is initialized from AlphaValue.
	maxQueryConcurrency = AlphaValue
)
22

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
type dhtQuery struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24
	dht         *IpfsDHT
25
	key         key.Key   // the key we're querying for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
26 27
	qfunc       queryFunc // the function to execute per peer
	concurrency int       // the concurrency parameter
28 29 30
}

type dhtQueryResult struct {
31 32 33 34
	value         []byte          // GetValue
	peer          peer.PeerInfo   // FindPeer
	providerPeers []peer.PeerInfo // GetProviders
	closerPeers   []peer.PeerInfo // *
35 36 37 38
	success       bool
}

// constructs query
39
func (dht *IpfsDHT) newQuery(k key.Key, f queryFunc) *dhtQuery {
40 41
	return &dhtQuery{
		key:         k,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42
		dht:         dht,
43 44 45
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46 47 48 49 50 51 52
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
53
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
54 55

// Run runs the query at hand. pass in a list of peers to use first.
56
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57 58 59 60 61 62
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
	}

63 64 65
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

66 67
	runner := newQueryRunner(q)
	return runner.Run(ctx, peers)
68 69 70
}

type dhtQueryRunner struct {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71 72 73 74
	query          *dhtQuery        // query to run
	peersSeen      *pset.PeerSet    // all peers queried. prevent querying same peer 2x
	peersToQuery   *queue.ChanQueue // peers remaining to be queried
	peersRemaining todoctr.Counter  // peersToQuery + currently processing
75

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
	result *dhtQueryResult // query result
77
	errs   u.MultiErr      // result errors. maybe should be a map[peer.ID]error
78

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79 80
	rateLimit chan struct{} // processing semaphore
	log       eventlog.EventLogger
81

82
	proc process.Process
83 84 85
	sync.RWMutex
}

86 87 88
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
	proc := process.WithParent(process.Background())
	ctx := ctxproc.WithProcessClosing(context.Background(), proc)
89 90 91 92
	return &dhtQueryRunner{
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
93
		peersSeen:      pset.New(),
94
		rateLimit:      make(chan struct{}, q.concurrency),
95
		proc:           proc,
96 97 98
	}
}

99
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
100 101
	r.log = log

102 103 104 105
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}
106

107 108 109
	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110 111
	}

112 113
	// add all the peers we got first.
	for _, p := range peers {
114
		r.addPeerToQuery(p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115 116
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
	// go do this thing.
118
	// do it as a child proc to make sure Run exits
119
	// ONLY AFTER spawn workers has exited.
120
	r.proc.Go(r.spawnWorkers)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121 122 123 124

	// so workers are working.

	// wait until they're done.
125
	err := routing.ErrNotFound
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
126

127 128 129 130 131 132 133 134
	// now, if the context finishes, close the proc.
	// we have to do it here because the logic before is setup, which
	// should run without closing the proc.
	go func() {
		<-ctx.Done()
		r.proc.Close()
	}()

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
135
	select {
136
	case <-r.peersRemaining.Done():
137
		r.proc.Close()
138 139 140
		r.RLock()
		defer r.RUnlock()

141 142 143 144 145 146
		err = routing.ErrNotFound

		// if every query to every peer failed, something must be very wrong.
		if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
			log.Debugf("query errs: %s", r.errs)
			err = r.errs[0]
147 148
		}

149
	case <-r.proc.Closed():
150 151
		r.RLock()
		defer r.RUnlock()
152
		err = context.DeadlineExceeded
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
	}
154

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
155 156
	if r.result != nil && r.result.success {
		return r.result, nil
157 158
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	return nil, err
160 161
}

162
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
163
	// if new peer is ourselves...
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
164
	if next == r.query.dht.self {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
		r.log.Debug("addPeerToQuery skip self")
166 167 168
		return
	}

169
	if !r.peersSeen.TryAdd(next) {
170 171 172 173 174 175
		return
	}

	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
176
	case <-r.proc.Closing():
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
	}
178 179
}

180
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
181
	for {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
182

183 184 185 186
		select {
		case <-r.peersRemaining.Done():
			return

187
		case <-r.proc.Closing():
188 189
			return

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
190 191 192 193
		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
194 195 196

			// do it as a child func to make sure Run exits
			// ONLY AFTER spawn workers has exited.
197 198
			proc.Go(func(proc process.Process) {
				r.queryPeer(proc, p)
199
			})
200 201 202
		}
	}
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
203

204
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
205 206 207
	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
208
	case <-proc.Closing():
209 210 211 212
		r.peersRemaining.Decrement(1)
		return
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
213 214
	// ok let's do this!

215 216 217
	// create a context from our proc.
	ctx := ctxproc.WithProcessClosing(context.Background(), proc)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
218 219 220 221 222 223 224 225
	// make sure we do this when we exit
	defer func() {
		// signal we're done proccessing peer p
		r.peersRemaining.Decrement(1)
		r.rateLimit <- struct{}{}
	}()

	// make sure we're connected to the peer.
226
	// FIXME abstract away into the network layer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227
	if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228
		log.Infof("not connected. dialing.")
229 230 231
		// while we dial, we do not take up a rate limit. this is to allow
		// forward progress during potentially very high latency dials.
		r.rateLimit <- struct{}{}
232

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
233
		pi := peer.PeerInfo{ID: p}
234 235

		if err := r.query.dht.host.Connect(ctx, pi); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
236
			log.Debugf("Error connecting: %s", err)
237

238
			notif.PublishQueryEvent(ctx, &notif.QueryEvent{
239 240 241 242
				Type:  notif.QueryError,
				Extra: err.Error(),
			})

243 244 245
			r.Lock()
			r.errs = append(r.errs, err)
			r.Unlock()
246
			<-r.rateLimit // need to grab it again, as we deferred.
247 248
			return
		}
249
		<-r.rateLimit // need to grab it again, as we deferred.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
250
		log.Debugf("connected. dial success.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
251
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252

253
	// finally, run the query against this peer
254
	res, err := r.query.qfunc(ctx, p)
255 256

	if err != nil {
257
		log.Debugf("ERROR worker for: %v %v", p, err)
258 259 260 261 262
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
263
		log.Debugf("SUCCESS worker for: %v %s", p, res)
264 265 266
		r.Lock()
		r.result = res
		r.Unlock()
267
		go r.proc.Close() // signal to everyone that we're done.
268
		// must be async, as we're one of the children, and Close blocks.
269

270 271
	} else if len(res.closerPeers) > 0 {
		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
272
		for _, next := range res.closerPeers {
273 274 275 276 277
			if next.ID == r.query.dht.self { // dont add self.
				log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
				continue
			}

278
			// add their addresses to the dialer's peerstore
279
			r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, peer.TempAddrTTL)
280
			r.addPeerToQuery(next.ID)
281
			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
282
		}
283 284
	} else {
		log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
285 286
	}
}