query.go 10.3 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1 2
package query

3
import (
4 5
	"time"

Jakub Sztandera's avatar
Jakub Sztandera committed
6
	goprocess "github.com/jbenet/goprocess"
7 8
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
/*
Query represents storage for any key-value pair.

tl;dr:

  queries are supported across datastores.
  Cheap on top of relational dbs, and expensive otherwise.
  Pick the right tool for the job!

In addition to the key-value store get and set semantics, datastore
provides an interface to retrieve multiple records at a time through
the use of queries. The datastore Query model gleans a common set of
operations performed when querying. To avoid pasting here years of
database research, let’s summarize the operations datastore supports.

Query Operations:

  * namespace - scope the query, usually by object type
  * filters - select a subset of values by applying constraints
  * orders - sort the results by applying sort conditions
  * limit - impose a numeric limit on the number of results
  * offset - skip a number of results (for efficient pagination)

datastore combines these operations into a simple Query class that allows
applications to define their constraints in a simple, generic, way without
introducing datastore specific calls, languages, etc.

Of course, different datastores provide relational query support across a
wide spectrum, from full support in traditional databases to none at all in
most key-value stores. Datastore aims to provide a common, simple interface
for the sake of application evolution over time and keeping large code bases
free of tool-specific code. It would be ridiculous to claim to support high-
performance queries on architectures that obviously do not. Instead, datastore
provides the interface, ideally translating queries to their native form
(e.g. into SQL for MySQL).

However, on the wrong datastore, queries can potentially incur the high cost
of performing the aforemantioned query operations on the data set directly in
Go. It is the client’s responsibility to select the right tool for the job:
pick a data storage solution that fits the application’s needs now, and wrap
it with a datastore implementation. As the needs change, swap out datastore
implementations to support your new use cases. Some applications, particularly
in early development stages, can afford to incurr the cost of queries on non-
relational databases (e.g. using a FSDatastore and not worry about a database
at all). When it comes time to switch the tool for performance, updating the
application code can be as simple as swapping the datastore in one place, not
all over the application code base. This gain in engineering time, both at
initial development and during later iterations, can significantly offset the
cost of the layer of abstraction.

*/
type Query struct {
61 62
	Prefix            string   // namespaces the query to results whose keys have Prefix
	Filters           []Filter // filter results. apply sequentially
63
	Orders            []Order  // order results. apply hierarchically
64 65 66 67
	Limit             int      // maximum number of results
	Offset            int      // skip given number of results
	KeysOnly          bool     // return only keys.
	ReturnExpirations bool     // return expirations (see TTLDatastore)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68 69 70 71
}

// Entry is a query result entry.
type Entry struct {
72 73 74
	Key        string    // cant be ds.Key because circular imports ...!!!
	Value      []byte    // Will be nil if KeysOnly has been passed.
	Expiration time.Time // Entry expiration timestamp if requested and supported (see TTLDatastore).
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75 76
}

77
// Result is a special entry that includes an error, so that the client
78 79
// may be warned about internal errors. If Error is non-nil, Entry must be
// empty.
80 81
type Result struct {
	Entry
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
82

83
	Error error
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84 85
}

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
// Results is a set of Query results. This is the interface for clients.
// Example:
//
//   qr, _ := myds.Query(q)
//   for r := range qr.Next() {
//     if r.Error != nil {
//       // handle.
//       break
//     }
//
//     fmt.Println(r.Entry.Key, r.Entry.Value)
//   }
//
// or, wait on all results at once:
//
//   qr, _ := myds.Query(q)
//   es, _ := qr.Rest()
//   for _, e := range es {
//     	fmt.Println(e.Key, e.Value)
//   }
//
type Results interface {
108 109 110 111 112
	Query() Query             // the query these Results correspond to
	Next() <-chan Result      // returns a channel to wait for the next result
	NextSync() (Result, bool) // blocks and waits to return the next result, second paramter returns false when results are exhausted
	Rest() ([]Entry, error)   // waits till processing finishes, returns all entries at once.
	Close() error             // client may call Close to signal early exit
113 114 115 116 117 118 119

	// Process returns a goprocess.Process associated with these results.
	// most users will not need this function (Close is all they want),
	// but it's here in case you want to connect the results to other
	// goprocess-friendly things.
	Process() goprocess.Process
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120

121 122 123 124 125 126
// results implements Results
type results struct {
	query Query
	proc  goprocess.Process
	res   <-chan Result
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
127

128
func (r *results) Next() <-chan Result {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129 130 131
	return r.res
}

132 133 134 135 136
func (r *results) NextSync() (Result, bool) {
	val, ok := <-r.res
	return val, ok
}

137 138
func (r *results) Rest() ([]Entry, error) {
	var es []Entry
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
139
	for e := range r.res {
140 141 142 143
		if e.Error != nil {
			return es, e.Error
		}
		es = append(es, e.Entry)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
	}
145 146 147 148 149 150 151 152 153 154 155 156 157 158
	<-r.proc.Closed() // wait till the processing finishes.
	return es, nil
}

func (r *results) Process() goprocess.Process {
	return r.proc
}

func (r *results) Close() error {
	return r.proc.Close()
}

func (r *results) Query() Query {
	return r.query
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
}
160

161 162 163
// ResultBuilder is what implementors use to construct results
// Implementors of datastores and their clients must respect the
// Process of the Request:
164
//
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
//   * clients must call r.Process().Close() on an early exit, so
//     implementations can reclaim resources.
//   * if the Entries are read to completion (channel closed), Process
//     should be closed automatically.
//   * datastores must respect <-Process.Closing(), which intermediates
//     an early close signal from the client.
//
type ResultBuilder struct {
	Query   Query
	Process goprocess.Process
	Output  chan Result
}

// Results returns a Results to to this builder.
func (rb *ResultBuilder) Results() Results {
	return &results{
		query: rb.Query,
		proc:  rb.Process,
		res:   rb.Output,
	}
}

187
const NormalBufSize = 1
188 189
const KeysOnlyBufSize = 128

190
func NewResultBuilder(q Query) *ResultBuilder {
191
	bufSize := NormalBufSize
192 193 194
	if q.KeysOnly {
		bufSize = KeysOnlyBufSize
	}
195 196
	b := &ResultBuilder{
		Query:  q,
197
		Output: make(chan Result, bufSize),
198 199 200 201 202 203 204 205 206
	}
	b.Process = goprocess.WithTeardown(func() error {
		close(b.Output)
		return nil
	})
	return b
}

// ResultsWithChan returns a Results object from a channel
Steven Allen's avatar
Steven Allen committed
207 208 209 210
// of Result entries.
//
// DEPRECATED: This iterator is impossible to cancel correctly. Canceling it
// will leave anything trying to write to the result channel hanging.
211
func ResultsWithChan(q Query, res <-chan Result) Results {
Steven Allen's avatar
Steven Allen committed
212
	return ResultsWithProcess(q, func(worker goprocess.Process, out chan<- Result) {
213 214 215 216 217 218 219 220 221 222
		for {
			select {
			case <-worker.Closing(): // client told us to close early
				return
			case e, more := <-res:
				if !more {
					return
				}

				select {
Steven Allen's avatar
Steven Allen committed
223
				case out <- e:
224 225 226 227 228 229
				case <-worker.Closing(): // client told us to close early
					return
				}
			}
		}
	})
Steven Allen's avatar
Steven Allen committed
230 231 232 233 234 235 236 237 238 239 240
}

// ResultsWithProcess returns a Results object with the results generated by the
// passed subprocess.
func ResultsWithProcess(q Query, proc func(goprocess.Process, chan<- Result)) Results {
	b := NewResultBuilder(q)

	// go consume all the entries and add them to the results.
	b.Process.Go(func(worker goprocess.Process) {
		proc(worker, b.Output)
	})
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266

	go b.Process.CloseAfterChildren()
	return b.Results()
}

// ResultsWithEntries returns a Results object from a list of entries
func ResultsWithEntries(q Query, res []Entry) Results {
	b := NewResultBuilder(q)

	// go consume all the entries and add them to the results.
	b.Process.Go(func(worker goprocess.Process) {
		for _, e := range res {
			select {
			case b.Output <- Result{Entry: e}:
			case <-worker.Closing(): // client told us to close early
				return
			}
		}
		return
	})

	go b.Process.CloseAfterChildren()
	return b.Results()
}

func ResultsReplaceQuery(r Results, q Query) Results {
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
	switch r := r.(type) {
	case *results:
		// note: not using field names to make sure all fields are copied
		return &results{q, r.proc, r.res}
	case *resultsIter:
		// note: not using field names to make sure all fields are copied
		lr := r.legacyResults
		if lr != nil {
			lr = &results{q, lr.proc, lr.res}
		}
		return &resultsIter{q, r.next, r.close, lr}
	default:
		panic("unknown results type")
	}
}

//
// ResultFromIterator provides an alternative way to to construct
// results without the use of channels.
//

func ResultsFromIterator(q Query, iter Iterator) Results {
289 290 291
	if iter.Close == nil {
		iter.Close = noopClose
	}
292
	return &resultsIter{
293
		query: q,
294 295 296 297 298
		next:  iter.Next,
		close: iter.Close,
	}
}

299 300 301 302
func noopClose() error {
	return nil
}

303 304
type Iterator struct {
	Next  func() (Result, bool)
305
	Close func() error // note: might be called more than once
306 307 308 309 310 311 312 313 314 315
}

type resultsIter struct {
	query         Query
	next          func() (Result, bool)
	close         func() error
	legacyResults *results
}

func (r *resultsIter) Next() <-chan Result {
316
	r.useLegacyResults()
317 318 319 320
	return r.legacyResults.Next()
}

func (r *resultsIter) NextSync() (Result, bool) {
321 322 323 324 325 326 327 328
	if r.legacyResults != nil {
		return r.legacyResults.NextSync()
	} else {
		res, ok := r.next()
		if !ok {
			r.close()
		}
		return res, ok
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
	}
}

func (r *resultsIter) Rest() ([]Entry, error) {
	var es []Entry
	for {
		e, ok := r.NextSync()
		if !ok {
			break
		}
		if e.Error != nil {
			return es, e.Error
		}
		es = append(es, e.Entry)
	}
	return es, nil
}

func (r *resultsIter) Process() goprocess.Process {
348
	r.useLegacyResults()
349 350 351 352 353
	return r.legacyResults.Process()
}

func (r *resultsIter) Close() error {
	if r.legacyResults != nil {
354 355 356
		return r.legacyResults.Close()
	} else {
		return r.close()
357 358 359 360 361 362 363
	}
}

func (r *resultsIter) Query() Query {
	return r.query
}

364 365 366 367 368 369
func (r *resultsIter) useLegacyResults() {
	if r.legacyResults != nil {
		return
	}

	b := NewResultBuilder(r.query)
370 371 372

	// go consume all the entries and add them to the results.
	b.Process.Go(func(worker goprocess.Process) {
373
		defer r.close()
374
		for {
375
			e, ok := r.next()
376 377 378 379 380 381 382 383 384 385 386 387 388
			if !ok {
				break
			}
			select {
			case b.Output <- e:
			case <-worker.Closing(): // client told us to close early
				return
			}
		}
		return
	})

	go b.Process.CloseAfterChildren()
389 390

	r.legacyResults = b.Results().(*results)
391
}