// blob: bcf525fd93edda6bb4cc37c0e031d2d338d365c8 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package query
import (
"fmt"
"reflect"
"strconv"
"v.io/syncbase/v23/syncbase/nosql/internal/query/query_checker"
"v.io/syncbase/v23/syncbase/nosql/internal/query/query_functions"
"v.io/syncbase/v23/syncbase/nosql/internal/query/query_parser"
"v.io/syncbase/v23/syncbase/nosql/query_db"
"v.io/syncbase/v23/syncbase/nosql/syncql"
"v.io/v23/vdl"
)
// ResultStream iterates over the rows produced by a query. Exec returns one
// together with the column headings. The method notes below describe the
// behavior of resultStreamImpl, the implementation in this file.
type ResultStream interface {
// Advance moves to the next result row and reports whether one is available.
// It returns false once the limit has been served, the underlying key/value
// stream is exhausted, or that stream has failed (see Err).
Advance() bool
// Result returns the projected column values for the row most recently
// staged by Advance.
Result() []*vdl.Value
// Err returns the error recorded while advancing, or nil if there was none.
Err() error
// Cancel cancels the underlying key/value stream.
Cancel()
}
// Exec parses the query string q, checks it against db and executes it.
//
// On success it returns the column headings and a ResultStream over the
// selected rows. An error from the parser or the checker is returned as is.
// A statement that is not a select statement is rejected with an
// ErrExecOfUnknownStatementType error carrying the statement's offset and
// type name.
func Exec(db query_db.Database, q string) ([]string, ResultStream, error) {
	stmt, parseErr := query_parser.Parse(db, q)
	if parseErr != nil {
		return nil, nil, parseErr
	}

	checkErr := query_checker.Check(db, stmt)
	if checkErr != nil {
		return nil, nil, checkErr
	}

	// Select is the only statement kind that can be executed here.
	if sel, isSelect := (*stmt).(query_parser.SelectStatement); isSelect {
		return execSelect(db, &sel)
	}

	return nil, nil, syncql.NewErrExecOfUnknownStatementType(db.GetContext(), (*stmt).Offset(), reflect.TypeOf(*stmt).Name())
}
// ComposeProjection builds the result row for key k and value v: one entry per
// selector in s, in selector order.
// It is only meant to be called once Eval has returned true on the
// WhereClause expression for this row.
func ComposeProjection(db query_db.Database, k string, v *vdl.Value, s *query_parser.SelectClause) []*vdl.Value {
	var row []*vdl.Value
	for _, sel := range s.Selectors {
		if sel.Type == query_parser.TypSelField {
			// A field that is not found contributes nil (as per specification).
			row = append(row, ResolveField(db, k, v, sel.Field))
			continue
		}
		if sel.Type != query_parser.TypSelFunc {
			// Any other selector type adds no column.
			continue
		}

		fn := sel.Function
		if fn.Computed {
			// The function's value is already stored in RetValue.
			row = append(row, query_functions.ConvertFunctionRetValueToVdlValue(fn.RetValue))
			continue
		}

		// The function has to be executed for this row. If that fails, the
		// column is built from a nil return value (as per specification).
		result, err := resolveArgsAndExecFunction(db, k, v, fn)
		if err != nil {
			result = nil
		}
		row = append(row, query_functions.ConvertFunctionRetValueToVdlValue(result))
	}
	return row
}
// ExecSelectSingleRow evaluates the select statement s against the single row
// (k, v). It exists for testing purposes.
//
// If the where clause rejects the row, an empty, non-nil slice is returned
// (not nil). Otherwise the projection built by ComposeProjection is returned.
//
// A statement without a where clause selects the row unconditionally.
// NOTE(review): this assumes s.Where is nil when the query has no where
// clause -- confirm against query_parser.
//
// Note: limit and offset clauses are ignored by this function as they make no
// sense for a single row.
func ExecSelectSingleRow(db query_db.Database, k string, v *vdl.Value, s *query_parser.SelectStatement) []*vdl.Value {
	// Guard the dereference of s.Where: with no where clause there is nothing
	// to evaluate and nothing that could reject the row.
	if s.Where != nil && !Eval(db, k, v, s.Where.Expr) {
		return []*vdl.Value{}
	}
	return ComposeProjection(db, k, v, s.Select)
}
// resultStreamImpl is the ResultStream that execSelect returns. It filters the
// rows of a key/value stream through the select statement's where, offset and
// limit clauses and projects each surviving row on demand.
type resultStreamImpl struct {
// db is passed to the evaluation and projection helpers and supplies the
// context for errors.
db query_db.Database
// selectStatement is the statement being executed.
selectStatement *query_parser.SelectStatement
resultCount int64 // results served so far (needed for limit clause)
skippedCount int64 // skipped so far (needed for offset clause)
// keyValueStream is the scan that Advance reads rows from.
keyValueStream query_db.KeyValueStream
// k and v hold the row most recently accepted by Advance; Result reads them.
k string
v *vdl.Value
// err is set by Advance when keyValueStream reports an error; Err returns it.
err error
}
// Advance stages the next row that passes the where clause and lies beyond the
// offset, and reports whether such a row was found.
//
// It returns false without reading further once the limit clause has been
// satisfied. It also returns false when the underlying key/value stream ends;
// if that stream reports an error, the error is wrapped and kept for Err.
func (rs *resultStreamImpl) Advance() bool {
	stmt := rs.selectStatement

	if limit := stmt.Limit; limit != nil && rs.resultCount >= limit.Limit.Value {
		return false
	}

	for rs.keyValueStream.Advance() {
		key, val := rs.keyValueStream.KeyValue()

		// EvalWhereUsingOnlyKey answers one of:
		//   INCLUDE:     the row should be included in the results
		//   EXCLUDE:     the row should NOT be included
		//   FETCH_VALUE: the value and/or type of the value are required to
		//                make the determination.
		selected := false
		switch EvalWhereUsingOnlyKey(rs.db, stmt, key) {
		case INCLUDE:
			selected = true
		case EXCLUDE:
			selected = false
		case FETCH_VALUE:
			selected = Eval(rs.db, key, val, stmt.Where.Expr)
		}
		if !selected {
			continue
		}

		// Matching rows are skipped until the offset has been consumed.
		if offset := stmt.ResultsOffset; offset != nil && rs.skippedCount < offset.ResultsOffset.Value {
			rs.skippedCount++
			continue
		}

		rs.k, rs.v = key, val
		rs.resultCount++
		return true
	}

	if streamErr := rs.keyValueStream.Err(); streamErr != nil {
		rs.err = syncql.NewErrKeyValueStreamError(rs.db.GetContext(), stmt.Off, streamErr)
	}
	return false
}
// Result returns the projection of the row most recently staged by Advance,
// as computed by ComposeProjection from the statement's select clause.
func (rs *resultStreamImpl) Result() []*vdl.Value {
	selectClause := rs.selectStatement.Select
	return ComposeProjection(rs.db, rs.k, rs.v, selectClause)
}
// Err returns the error recorded by Advance, or nil if none was recorded.
func (rs *resultStreamImpl) Err() error {
	return rs.err
}
// Cancel forwards the cancellation to the underlying key/value stream.
func (rs *resultStreamImpl) Cancel() {
	stream := rs.keyValueStream
	stream.Cancel()
}
// getColumnHeadings returns one heading per selector of s, in selector order.
//
// A selector with an As clause uses its alternate name. Otherwise a field
// selector uses its dot-separated segments, each followed by its bracketed
// keys, and a function selector uses the function name. The returned slice is
// never nil.
func getColumnHeadings(s *query_parser.SelectStatement) []string {
	headings := []string{}
	for _, sel := range s.Select.Selectors {
		var heading string
		switch {
		case sel.As != nil:
			heading = sel.As.AltName.Value
		case sel.Type == query_parser.TypSelFunc:
			heading = sel.Function.Name
		case sel.Type == query_parser.TypSelField:
			for i, seg := range sel.Field.Segments {
				if i > 0 {
					heading += "."
				}
				heading += seg.Value
				for _, key := range seg.Keys {
					heading += getSegmentKeyAsHeading(key)
				}
			}
		}
		headings = append(headings, heading)
	}
	return headings
}
// getSegmentKeyAsHeading renders a segment key (the key/index applied to a
// field segment) as heading text wrapped in square brackets. Keys that are
// themselves fields are rendered recursively; an operand type not listed
// below is rendered as "[<?>]".
//
// TODO(jkline): Should we really include key/index of a map/set/array/list in the header?
// The column names can get quite long.  Perhaps just "[]" at the end of the segment
// would be better.  The author of the query can always use the As clause to specify a
// better heading.  Note: for functions, just the function name is included in the header.
// When a decision is made, it's best to be consistent for functions and key/indexes.
func getSegmentKeyAsHeading(segKey *query_parser.Operand) string {
	var inner string
	switch segKey.Type {
	case query_parser.TypBool:
		inner = strconv.FormatBool(segKey.Bool)
	case query_parser.TypInt:
		inner = strconv.FormatInt(segKey.Int, 10)
	case query_parser.TypFloat:
		inner = strconv.FormatFloat(segKey.Float, 'f', -1, 64)
	case query_parser.TypBigInt:
		inner = segKey.BigInt.String()
	case query_parser.TypBigRat:
		inner = segKey.BigRat.String()
	case query_parser.TypComplex:
		inner = fmt.Sprintf("%g", segKey.Complex)
	case query_parser.TypStr:
		inner = segKey.Str
	case query_parser.TypTime:
		inner = segKey.Time.Format("Mon Jan 2 15:04:05 -0700 MST 2006")
	case query_parser.TypFunction:
		inner = segKey.Function.Name
	case query_parser.TypField:
		// Same layout as a field heading: dot-separated segments, each
		// followed by its own bracketed keys.
		for i, seg := range segKey.Column.Segments {
			if i > 0 {
				inner += "."
			}
			inner += seg.Value
			for _, nested := range seg.Keys {
				inner += getSegmentKeyAsHeading(nested)
			}
		}
	case query_parser.TypNil:
		inner = "<nil>"
	case query_parser.TypObject:
		inner = "<object>"
	default:
		inner = "<?>"
	}
	return "[" + inner + "]"
}
// execSelect starts a scan of the statement's table over the key ranges
// compiled from the where clause and wraps it in a ResultStream.
//
// It returns the column headings and the stream. If the scan cannot be
// started, it returns an ErrScanError wrapping the scan error.
func execSelect(db query_db.Database, s *query_parser.SelectStatement) ([]string, ResultStream, error) {
	keyRanges := query_checker.CompileKeyRanges(s.Where)
	stream, scanErr := s.From.Table.DBTable.Scan(*keyRanges)
	if scanErr != nil {
		return nil, nil, syncql.NewErrScanError(db.GetContext(), s.Off, scanErr)
	}

	// Counters, the staged row and the error start at their zero values.
	rs := &resultStreamImpl{
		db:              db,
		selectStatement: s,
		keyValueStream:  stream,
	}
	return getColumnHeadings(s), rs, nil
}