| package query |
| |
| import ( |
| "fmt" |
| "math/big" |
| "reflect" |
| "runtime" |
| "sort" |
| "strings" |
| "sync" |
| "veyron2/vlog" |
| |
| vsync "veyron/runtimes/google/lib/sync" |
| "veyron/services/store/memstore/state" |
| "veyron/services/store/service" |
| |
| "veyron2/naming" |
| "veyron2/query" |
| "veyron2/query/parse" |
| "veyron2/security" |
| "veyron2/services/store" |
| "veyron2/storage" |
| "veyron2/vdl/vdlutil" |
| ) |
| |
| // maxChannelSize is the maximum size of the channels used for concurrent |
| // query evaluation. |
| const maxChannelSize = 100 |
| |
| // evalIterator implements service.QueryStream. |
| type evalIterator struct { |
| // mu guards 'result', 'err', and the closing of 'abort'. |
| mu sync.Mutex |
| // result is what Get will return. It will be nil if there are no more |
| // query results. Guarded by mu. |
| result *store.QueryResult |
| // err is the first error encountered during query evaluation. |
| // Guarded by mu. |
| err error |
| // abort is used as the signal to query evaluation to terminate early. |
| // evaluator implementations will test for abort closing. The close() |
| // call is guarded by mu. |
| abort chan struct{} |
| |
| // results represents a stack of result channels. evalIterator is |
| // essentially an iterator of iterators. Each subquery adds a nestedChannel |
| // to this stack. The top of the stack is the end of the slice. |
| results []nestedChannel |
| // cleanup is used for testing to ensure that no goroutines are leaked. |
| cleanup vsync.WaitGroup |
| // maxNesting is the largest value used for nestedChannel.nesting. It |
| // is the maximum nesting over the duration of the query while len(results) |
| // is just the instantaneous nesting. |
| maxNesting int |
| } |
| |
| // nestedChannel contains the results of a subquery. |
| type nestedChannel struct { |
| // nesting is the value to use for the NestedResult field of all |
| // objects that come out of the results channel. |
| nesting int |
| // results is the stream of results for this subquery. |
| results <-chan *store.QueryResult |
| } |
| |
| // hiddenResult wraps a value so evalIterator can elide it from |
| // storage.QueryResult.Fields that are sent to the client. |
| type hiddenResult struct { |
| value vdlutil.Any |
| } |
| |
| // Next implements the QueryStream method. |
| func (it *evalIterator) Next() bool { |
| it.mu.Lock() |
| if it.err != nil { |
| it.mu.Unlock() |
| return false |
| } |
| it.mu.Unlock() |
| |
| depth := len(it.results) - 1 |
| select { |
| case result, ok := <-it.results[depth].results: |
| if !ok { |
| it.results = it.results[:depth] |
| if len(it.results) > 0 { |
| return it.Next() |
| } |
| return false |
| } |
| |
| // Set the correct value for NestedResult. |
| result.NestedResult = store.NestedResult(it.results[depth].nesting) |
| |
| it.enqueueNestedChannels(result) |
| |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| it.result = result |
| return true |
| case <-it.abort: |
| return false |
| } |
| } |
| |
| // enqueueNestedChannels looks through result.Fields for nested result |
| // channels. If there are any, they are appended to it.results. We use |
| // the result.Fields key to append them in reverse alphabetical order. |
| // We use reverse alphabetical order because it.results is a stack-- |
| // we want to process them in alphabetical order. |
| // |
| // enqueueNestedChannels also removes any result.Fields that are of the |
| // type hiddenResult. |
| func (it *evalIterator) enqueueNestedChannels(result *store.QueryResult) { |
| if result.Fields == nil { |
| return |
| } |
| var nestingKeys []string |
| for key, val := range result.Fields { |
| switch val.(type) { |
| case chan *store.QueryResult: |
| nestingKeys = append(nestingKeys, key) |
| case hiddenResult: |
| // If a field is "hidden", the value will be wrapped in the type |
| // hiddenResult to make it possible for evalIterator to elide it |
| // from the results sent to the client. |
| delete(result.Fields, key) |
| } |
| } |
| // Figure out the store.NestedResult values based on alphabetical order of |
| // the keys. |
| sort.Sort(sort.StringSlice(nestingKeys)) |
| var nested []nestedChannel |
| for _, key := range nestingKeys { |
| it.maxNesting++ |
| nested = append(nested, |
| nestedChannel{it.maxNesting, result.Fields[key].(chan *store.QueryResult)}) |
| result.Fields[key] = store.NestedResult(it.maxNesting) |
| } |
| // Add the nested result channels in reverse alphabetical order. |
| for i := len(nested) - 1; i >= 0; i-- { |
| it.results = append(it.results, nested[i]) |
| } |
| } |
| |
| // Get implements the QueryStream method. |
| func (it *evalIterator) Get() *store.QueryResult { |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| return it.result |
| } |
| |
| // Abort implements the QueryStream method. |
| func (it *evalIterator) Abort() { |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| select { |
| case <-it.abort: |
| // Already closed. |
| default: |
| close(it.abort) |
| } |
| it.result = nil |
| } |
| |
| // Err implements the QueryStream method. |
| func (it *evalIterator) Err() error { |
| it.mu.Lock() |
| defer it.mu.Unlock() |
| return it.err |
| } |
| |
| func (it *evalIterator) setErrorf(format string, args ...interface{}) { |
| it.setError(fmt.Errorf(format, args...)) |
| } |
| |
| func (it *evalIterator) setError(err error) { |
| it.mu.Lock() |
| if it.err == nil { |
| it.err = err |
| } |
| it.mu.Unlock() |
| it.Abort() |
| } |
| |
| // wait blocks until all children goroutines are finished. This is useful in |
| // tests to ensure that an abort cleans up correctly. |
| func (it *evalIterator) wait() { |
| it.cleanup.Wait() |
| } |
| |
| func newErrorIterator(err error) service.QueryStream { |
| return &evalIterator{ |
| err: err, |
| abort: make(chan struct{}), |
| } |
| } |
| |
| // Eval evaluates a query and returns a QueryStream for the results. If there is |
| // an error parsing the query, it will show up as an error in the QueryStream. |
| // Query evaluation is concurrent, so it is important to call QueryStream.Abort |
| // if the client does not consume all of the results. |
| func Eval(sn state.Snapshot, clientID security.PublicID, name storage.PathName, q query.Query) service.QueryStream { |
| ast, err := parse.Parse(q) |
| if err != nil { |
| return newErrorIterator(err) |
| } |
| evaluator, err := convert(ast) |
| if err != nil { |
| return newErrorIterator(err) |
| } |
| |
| // Seed the input with the root entity. |
| in := make(chan *store.QueryResult, 1) |
| in <- &store.QueryResult{Name: ""} |
| close(in) |
| |
| out := make(chan *store.QueryResult, maxChannelSize) |
| it := &evalIterator{ |
| results: []nestedChannel{nestedChannel{0, out}}, |
| abort: make(chan struct{}), |
| } |
| if !it.cleanup.TryAdd() { |
| // The query has been aborted by a call to Cancel. |
| return it |
| } |
| go evaluator.eval(&context{ |
| sn: sn, |
| suffix: name.String(), |
| clientID: clientID, |
| in: in, |
| out: out, |
| evalIt: it, |
| abort: it.abort, |
| cleanup: &it.cleanup, |
| }) |
| |
| return it |
| } |
| |
| // context is a wrapper of all the variables that need to be passed around |
| // during evaluation. |
| type context struct { |
| // sn is the snapshot of the store's state to use to find query results. |
| sn state.Snapshot |
| // suffix is the suffix we're evaluating relative to. |
| suffix string |
| // clientID is the identity of the client that issued the query. |
| clientID security.PublicID |
| // in produces the intermediate results from the previous stage of the |
| // query. It will be closed when the evaluator should stop processing |
| // results. It is not necessary to select on 'in' and 'errc'. |
| in <-chan *store.QueryResult |
| // out is where the evaluator should write the intermediate results. |
| // evaluators should use context.emit instead of writing directly |
| // to out. |
| out chan<- *store.QueryResult |
| // evalIt is the iterator that interfaces with the client. It is here |
| // to allow the evaluation code to propagate errors via setError(). |
| evalIt *evalIterator |
| // abort will be closed if query evaluation should terminate early. |
| // evaluator implementations should regularly test if it is still open. |
| abort chan struct{} |
| // cleanup is used for testing to ensure that no goroutines are leaked. |
| // evaluator.eval implementations should call Done when finished processing. |
| cleanup *vsync.WaitGroup |
| } |
| |
| // emit sends result on c.out. It is careful to watch for aborts. result can be |
| // nil. Returns true if the caller should continue iterating, returns |
| // false if it is time to abort. |
| func (c *context) emit(result *store.QueryResult) bool { |
| vlog.VI(2).Info("emit", result) |
| if result == nil { |
| // Check for an abort before continuing iteration. |
| select { |
| case <-c.abort: |
| return false |
| default: |
| return true |
| } |
| } else { |
| // If c.out is full, we don't want to block on it forever and ignore |
| // aborts. |
| select { |
| case <-c.abort: |
| return false |
| case c.out <- result: |
| return true |
| } |
| } |
| } |
| |
| // evaluator is a node in the query evaluation flow. It takes intermediate |
| // results produced by the previous node and produces a new set of results. |
| type evaluator interface { |
| // eval does the work or processing intermediate results to produce a new |
| // set of results. It is expected that the client run eval in its own |
| // goroutine (i.e. "go eval(ctxt)"). |
| eval(c *context) |
| |
| // singleResult returns true if this evaluator returns a single result |
| // (e.g. an aggregate or a specific field). This is useful in selection |
| // because we want to unbox those results. For example, |
| // "teams/* | { players/* | count as numplayers }" should return |
| // { Name: "teams/cardinals", Fields: {"numplayers": 5}} |
| // and not |
| // { Name: "teams/cardinals", Fields: {"numplayers": [{Name: "numplayers", Value: 5}]}} |
| singleResult() bool |
| |
| // name returns a relative Object name that is appropriate for the query |
| // results produced by this evaluator. |
| name() string |
| } |
| |
| // convert transforms the AST produced by parse.Parse into an AST that supports |
| // evaluation specific to memstore. This transformation should not produce |
| // any errors since we know all of the types that parse.Parse can produce. |
| // Just in case one was overlooked, we use the panic/recover idiom to handle |
| // unexpected errors. The conversion functions in the remainder of this file |
| // do not return errors. Instead, they are allowed to panic, and this function |
| // will recover. |
| func convert(q parse.Pipeline) (ev evaluator, err error) { |
| defer func() { |
| if r := recover(); r != nil { |
| if _, ok := r.(runtime.Error); ok { |
| panic(r) |
| } |
| ev = nil |
| err = r.(error) |
| } |
| }() |
| return convertPipeline(q), nil |
| } |
| |
| // convertPipeline transforms a parse.Pipeline into an evaluator. |
| func convertPipeline(q parse.Pipeline) evaluator { |
| switch q := q.(type) { |
| case *parse.PipelineName: |
| return &nameEvaluator{q.WildcardName, q.Pos} |
| case *parse.PipelineType: |
| return &typeEvaluator{convertPipeline(q.Src), q.Type, q.Pos} |
| case *parse.PipelineFilter: |
| return &filterEvaluator{convertPipeline(q.Src), convertPredicate(q.Pred), q.Pos} |
| case *parse.PipelineSelection: |
| return convertSelection(q) |
| case *parse.PipelineFunc: |
| return convertPipelineFunc(q) |
| default: |
| panic(fmt.Errorf("unexpected type %T", q)) |
| } |
| } |
| |
| // nameEvaluator is the evaluator version of parse.PipelineName. |
| type nameEvaluator struct { |
| wildcardName *parse.WildcardName |
| |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // eval implements the evaluator method. |
| func (e *nameEvaluator) eval(c *context) { |
| defer c.cleanup.Done() |
| defer close(c.out) |
| |
| for result := range c.in { |
| basepath := naming.Join(result.Name, e.wildcardName.VName) |
| path := storage.ParsePath(naming.Join(c.suffix, basepath)) |
| vlog.VI(2).Infof("nameEvaluator suffix: %s, result.Name: %s, VName: %s", |
| c.suffix, result.Name, e.wildcardName.VName) |
| for it := c.sn.NewIterator(c.clientID, path, |
| state.ListObjects, state.ImmediateFilter); it.IsValid(); it.Next() { |
| |
| entry := it.Get() |
| result := &store.QueryResult{ |
| Name: naming.Join(basepath, it.Name()), |
| Value: entry.Value, |
| } |
| if !c.emit(result) { |
| return |
| } |
| if e.singleResult() { |
| return |
| } |
| } |
| } |
| } |
| |
| // singleResult implements the evaluator method. |
| func (e *nameEvaluator) singleResult() bool { |
| return e.wildcardName.Exp == parse.Self |
| } |
| |
| // name implements the evaluator method. |
| func (e *nameEvaluator) name() string { |
| return e.wildcardName.VName |
| } |
| |
| // startSource creates a goroutine for src.eval(). It returns the |
| // output channel for src. |
| func startSource(c *context, src evaluator) chan *store.QueryResult { |
| srcOut := make(chan *store.QueryResult, maxChannelSize) |
| srcContext := context{ |
| sn: c.sn, |
| suffix: c.suffix, |
| clientID: c.clientID, |
| in: c.in, |
| out: srcOut, |
| evalIt: c.evalIt, |
| abort: c.abort, |
| cleanup: c.cleanup, |
| } |
| if !c.cleanup.TryAdd() { |
| // The query has been aborted by a call to Cancel. |
| close(srcOut) |
| return srcOut |
| } |
| go src.eval(&srcContext) |
| return srcOut |
| } |
| |
| // typeEvaluator is the evaluator version of parse.PipelineType. |
| type typeEvaluator struct { |
| // src produces the results to be filtered by type. |
| src evaluator |
| // ty restricts the results to a specific type of object. |
| ty string |
| // Pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // eval implements the evaluator method. |
| func (e *typeEvaluator) eval(c *context) { |
| defer c.cleanup.Done() |
| defer close(c.out) |
| |
| for result := range startSource(c, e.src) { |
| vlog.VI(2).Info("typeEvaluator", result) |
| if val := reflect.ValueOf(result.Value); e.ty != val.Type().Name() { |
| continue |
| } |
| if !c.emit(result) { |
| return |
| } |
| } |
| } |
| |
| // singleResult implements the evaluator method. |
| func (e *typeEvaluator) singleResult() bool { |
| return false |
| } |
| |
| // name implements the evaluator method. |
| func (e *typeEvaluator) name() string { |
| return e.src.name() |
| } |
| |
| // filterEvaluator is the evaluator version of parse.PipelineFilter. |
| type filterEvaluator struct { |
| // src produces intermediate results that will be filtered by pred. |
| src evaluator |
| // pred determines whether an intermediate result produced by src should be |
| // filtered out. |
| pred predicate |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // eval implements the evaluator method. |
| func (e *filterEvaluator) eval(c *context) { |
| defer c.cleanup.Done() |
| defer close(c.out) |
| |
| for result := range startSource(c, e.src) { |
| vlog.VI(2).Info("filterEvaluator", result) |
| if e.pred.match(c, result) { |
| if !c.emit(result) { |
| return |
| } |
| } |
| } |
| } |
| |
| // singleResult implements the evaluator method. |
| func (e *filterEvaluator) singleResult() bool { |
| return false |
| } |
| |
| // name implements the evaluator method. |
| func (e *filterEvaluator) name() string { |
| return e.src.name() |
| } |
| |
| // convertSelection transforms a parse.PipelineSelection into a |
| // selectionEvaluator. |
| func convertSelection(p *parse.PipelineSelection) evaluator { |
| e := &selectionEvaluator{ |
| src: convertPipeline(p.Src), |
| subpipelines: make([]alias, len(p.SubPipelines), len(p.SubPipelines)), |
| pos: p.Pos, |
| } |
| for i, a := range p.SubPipelines { |
| e.subpipelines[i] = alias{convertPipeline(a.Pipeline), a.Alias, a.Hidden} |
| } |
| return e |
| } |
| |
| // alias is the evaluator version of parse.Alias. It represents a pipeline |
| // that has an alternate name inside of a selection using the 'as' keyword. |
| type alias struct { |
| // evaluator is the evaluator to be aliased. |
| evaluator evaluator |
| // alias is the new name for the output of evaluator. |
| alias string |
| // hidden is true if this field in the selection should not be included |
| // in the results sent to the client. |
| hidden bool |
| } |
| |
| // selectionEvaluator is the evaluator version of parse.PipelineSelection. |
| type selectionEvaluator struct { |
| // src produces intermediate results on which to select. |
| src evaluator |
| // subpipelines is the list of pipelines to run for each result produced |
| // by src. |
| subpipelines []alias |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // eval implements the evaluator method. |
| func (e *selectionEvaluator) eval(c *context) { |
| defer c.cleanup.Done() |
| defer close(c.out) |
| |
| for result := range startSource(c, e.src) { |
| if !e.processSubpipelines(c, result) { |
| return |
| } |
| } |
| } |
| |
| func (e *selectionEvaluator) processSubpipelines(c *context, result *store.QueryResult) bool { |
| vlog.VI(2).Info("selection: ", result.Name) |
| sel := &store.QueryResult{ |
| Name: result.Name, |
| Fields: make(map[string]vdlutil.Any), |
| } |
| for _, a := range e.subpipelines { |
| // We create a new channel for each intermediate result, so there's no need to |
| // create a large buffer. |
| in := make(chan *store.QueryResult, 1) |
| in <- result |
| close(in) |
| out := make(chan *store.QueryResult, maxChannelSize) |
| ctxt := &context{ |
| sn: c.sn, |
| suffix: c.suffix, |
| clientID: c.clientID, |
| in: in, |
| out: out, |
| evalIt: c.evalIt, |
| abort: c.abort, |
| cleanup: c.cleanup, |
| } |
| if !c.cleanup.TryAdd() { |
| // The query has been aborted by a call to Cancel. |
| return false |
| } |
| go a.evaluator.eval(ctxt) |
| |
| // If the subpipeline would produce a single result, use that single result |
| // as the field value. Otherwise, put the channel as the field value and let |
| // evalIterator do the right thing with the sub-results. |
| var value interface{} |
| if a.evaluator.singleResult() { |
| select { |
| case <-c.abort: |
| return false |
| case sub, ok := <-out: |
| if ok { |
| value = sub.Value |
| } |
| } |
| } else { |
| value = out |
| } |
| |
| if a.alias != "" { |
| if a.hidden { |
| // If a field is "hidden", the value will be wrapped in the type |
| // hiddenResult to make it possible for evalIterator to elide it |
| // from the results sent to the client. |
| value = hiddenResult{value} |
| } |
| sel.Fields[a.alias] = value |
| } else { |
| sel.Fields[a.evaluator.name()] = value |
| } |
| } |
| return c.emit(sel) |
| } |
| |
| // singleResult implements the evaluator method. |
| func (e *selectionEvaluator) singleResult() bool { |
| return false |
| } |
| |
| // name implements the evaluator method. |
| func (e *selectionEvaluator) name() string { |
| return e.src.name() |
| } |
| |
| // convertPipelineFunc transforms a parse.PipelineFunc into a funcEvaluator. |
| func convertPipelineFunc(p *parse.PipelineFunc) evaluator { |
| args := make([]expr, len(p.Args), len(p.Args)) |
| for i, a := range p.Args { |
| args[i] = convertExpr(a) |
| } |
| src := convertPipeline(p.Src) |
| switch p.FuncName { |
| case "sort": |
| if src.singleResult() { |
| panic(fmt.Errorf("found aggregate function at %v, sort expects multiple results")) |
| } |
| return &funcSortEvaluator{ |
| src: convertPipeline(p.Src), |
| args: args, |
| pos: p.Pos, |
| } |
| default: |
| panic(fmt.Errorf("unknown function %s at Pos %v", p.FuncName, p.Pos)) |
| } |
| } |
| |
| type funcSortEvaluator struct { |
| // src produces intermediate results that will be transformed by func. |
| src evaluator |
| // args is the list of arguments passed to the function. |
| args []expr |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| func (e *funcSortEvaluator) eval(c *context) { |
| defer c.cleanup.Done() |
| defer close(c.out) |
| srcOut := startSource(c, e.src) |
| |
| sorter := argsSorter{e, c, nil} |
| for result := range srcOut { |
| sorter.results = append(sorter.results, result) |
| } |
| sort.Sort(sorter) |
| for _, result := range sorter.results { |
| if !c.emit(result) { |
| return |
| } |
| } |
| } |
| |
| // singleResult implements the evaluator method. |
| func (e *funcSortEvaluator) singleResult() bool { |
| // During construction, we tested that e.src is not singleResult. |
| return false |
| } |
| |
| // name implements the evaluator method. |
| func (e *funcSortEvaluator) name() string { |
| // A sorted resultset is still the same as the original resultset, so it |
| // should have the same name. |
| return e.src.name() |
| } |
| |
| // argsSorter implements sort.Interface to sort results by e.args. |
| type argsSorter struct { |
| e *funcSortEvaluator |
| c *context |
| results []*store.QueryResult |
| } |
| |
| func (a argsSorter) Len() int { return len(a.results) } |
| func (a argsSorter) Swap(i, j int) { a.results[i], a.results[j] = a.results[j], a.results[i] } |
| func (a argsSorter) Less(i, j int) bool { |
| for n, arg := range a.e.args { |
| // Normally, exprUnary only supports numeric operands. As part of a sort |
| // expression however, it is possible to negate a string operand to |
| // cause a descending sort. |
| ascending := true |
| unaryArg, ok := arg.(*exprUnary) |
| if ok { |
| // Remove the +/- operator. |
| arg = unaryArg.operand |
| ascending = unaryArg.op == parse.OpPos |
| } |
| ival := arg.value(a.c, a.results[i]) |
| jval := arg.value(a.c, a.results[j]) |
| res, err := compare(a.c, ival, jval) |
| if err != nil { |
| a.c.evalIt.setErrorf("error while sorting (Pos %v Arg: %d) left: %s, right: %s; %v", |
| a.e.pos, n, a.results[i].Name, a.results[j].Name, err) |
| return false |
| } |
| if res == 0 { |
| continue |
| } |
| if ascending { |
| return res < 0 |
| } else { |
| return res > 0 |
| } |
| } |
| // Break ties based on name to get a deterministic order. |
| return a.results[i].Name < a.results[j].Name |
| } |
| |
| // predicate determines whether an intermediate query result should be |
| // filtered out. |
| type predicate interface { |
| match(c *context, e *store.QueryResult) bool |
| } |
| |
| // convertPredicate transforms a parse.Predicate into a predicate. |
| func convertPredicate(p parse.Predicate) predicate { |
| switch p := p.(type) { |
| case *parse.PredicateBool: |
| return &predicateBool{p.Bool, p.Pos} |
| case *parse.PredicateCompare: |
| switch p.Comp { |
| case parse.CompEQ, parse.CompNE, parse.CompLT, parse.CompGT, parse.CompLE, parse.CompGE: |
| default: |
| panic(fmt.Errorf("unknown comparator %d at %v", p.Comp, p.Pos)) |
| } |
| return &predicateCompare{convertExpr(p.LHS), convertExpr(p.RHS), p.Comp, p.Pos} |
| case *parse.PredicateAnd: |
| return &predicateAnd{convertPredicate(p.LHS), convertPredicate(p.RHS), p.Pos} |
| case *parse.PredicateOr: |
| return &predicateOr{convertPredicate(p.LHS), convertPredicate(p.RHS), p.Pos} |
| case *parse.PredicateNot: |
| return &predicateNot{convertPredicate(p.Pred), p.Pos} |
| // TODO(kash): Support parse.PredicateFunc. |
| default: |
| panic(fmt.Errorf("unexpected type %T", p)) |
| } |
| } |
| |
| // predicateBool represents a boolean literal. |
| type predicateBool struct { |
| b bool |
| pos parse.Pos |
| } |
| |
| // match implements the predicate method. |
| func (p *predicateBool) match(c *context, e *store.QueryResult) bool { |
| return p.b |
| } |
| |
| // predicateCompare handles the comparison on two expressions. |
| type predicateCompare struct { |
| // lhs is the left-hand-side of the comparison. |
| lhs expr |
| // rhs is the right-hand-side of the comparison. |
| rhs expr |
| // comp specifies the operator to use in the comparison. |
| comp parse.Comparator |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // match implements the predicate method. |
| func (p *predicateCompare) match(c *context, result *store.QueryResult) bool { |
| lval := p.lhs.value(c, result) |
| rval := p.rhs.value(c, result) |
| |
| res, err := compare(c, lval, rval) |
| if err != nil { |
| c.evalIt.setErrorf("error while evaluating predicate (Pos %v) for name '%s': %v", |
| p.pos, result.Name, err) |
| return false |
| } |
| switch p.comp { |
| case parse.CompEQ: |
| return res == 0 |
| case parse.CompNE: |
| return res != 0 |
| case parse.CompLT: |
| return res < 0 |
| case parse.CompGT: |
| return res > 0 |
| case parse.CompLE: |
| return res <= 0 |
| case parse.CompGE: |
| return res >= 0 |
| default: |
| c.evalIt.setErrorf("unknown comparator %d at Pos %v", p.comp, p.pos) |
| return false |
| } |
| } |
| |
| // compare returns a negative number if lval is less than rval, 0 if they are |
| // equal, and a positive number if lval is greater than rval. |
| func compare(c *context, lval, rval interface{}) (int, error) { |
| switch lval := lval.(type) { |
| case string: |
| rval, ok := rval.(string) |
| if !ok { |
| return 0, fmt.Errorf("type mismatch; left: %T, right: %T", lval, rval) |
| } |
| if lval < rval { |
| return -1, nil |
| } else if lval > rval { |
| return 1, nil |
| } else { |
| return 0, nil |
| } |
| case *big.Rat: |
| switch rval := rval.(type) { |
| case *big.Rat: |
| return lval.Cmp(rval), nil |
| case *big.Int: |
| return lval.Cmp(new(big.Rat).SetInt(rval)), nil |
| case int, int8, int16, int32, int64: |
| return lval.Cmp(new(big.Rat).SetInt64(toInt64(rval))), nil |
| case uint, uint8, uint16, uint32, uint64: |
| // It is not possible to convert to a big.Rat from an unsigned. Need to |
| // go through big.Int first. |
| return lval.Cmp(new(big.Rat).SetInt(new(big.Int).SetUint64(toUint64(rval)))), nil |
| } |
| case *big.Int: |
| switch rval := rval.(type) { |
| case *big.Rat: |
| return new(big.Rat).SetInt(lval).Cmp(rval), nil |
| case *big.Int: |
| return lval.Cmp(rval), nil |
| case int, int8, int16, int32, int64: |
| return lval.Cmp(big.NewInt(toInt64(rval))), nil |
| case uint, uint8, uint16, uint32, uint64: |
| return lval.Cmp(new(big.Int).SetUint64(toUint64(rval))), nil |
| } |
| case int, int8, int16, int32, int64: |
| switch rval := rval.(type) { |
| case *big.Rat: |
| return new(big.Rat).SetInt64(toInt64(lval)).Cmp(rval), nil |
| case *big.Int: |
| return new(big.Int).SetInt64(toInt64(lval)).Cmp(rval), nil |
| case int, int8, int16, int32, int64: |
| lint, rint := toInt64(lval), toInt64(rval) |
| if lint < rint { |
| return -1, nil |
| } else if lint > rint { |
| return 1, nil |
| } else { |
| return 0, nil |
| } |
| case uint, uint8, uint16, uint32, uint64: |
| lint, rint := toUint64(lval), toUint64(rval) |
| if lint < rint { |
| return -1, nil |
| } else if lint > rint { |
| return 1, nil |
| } else { |
| return 0, nil |
| } |
| } |
| } |
| return 0, fmt.Errorf("unexpected type %T", lval) |
| } |
| |
| func toInt64(i interface{}) int64 { |
| switch i := i.(type) { |
| case int: |
| return int64(i) |
| case int8: |
| return int64(i) |
| case int16: |
| return int64(i) |
| case int32: |
| return int64(i) |
| case int64: |
| return int64(i) |
| default: |
| panic(fmt.Errorf("unexpected type %T", i)) |
| } |
| } |
| |
| func toUint64(i interface{}) uint64 { |
| switch i := i.(type) { |
| case uint: |
| return uint64(i) |
| case uint8: |
| return uint64(i) |
| case uint16: |
| return uint64(i) |
| case uint32: |
| return uint64(i) |
| case uint64: |
| return uint64(i) |
| default: |
| panic(fmt.Errorf("unexpected type %T", i)) |
| } |
| } |
| |
| // predicateAnd is a predicate that is the logical conjunction of two |
| // predicates. |
| type predicateAnd struct { |
| // lhs is the left-hand-side of the conjunction. |
| lhs predicate |
| // rhs is the right-hand-side of the conjunction. |
| rhs predicate |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // match implements the predicate method. |
| func (p *predicateAnd) match(c *context, result *store.QueryResult) bool { |
| // Short circuit to avoid extra processing. |
| if !p.lhs.match(c, result) { |
| return false |
| } |
| return p.rhs.match(c, result) |
| } |
| |
| // predicateAnd is a predicate that is the logical disjunction of two |
| // predicates. |
| type predicateOr struct { |
| // lhs is the left-hand-side of the disjunction. |
| lhs predicate |
| // rhs is the right-hand-side of the disjunction. |
| rhs predicate |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // match implements the predicate method. |
| func (p *predicateOr) match(c *context, result *store.QueryResult) bool { |
| // Short circuit to avoid extra processing. |
| if p.lhs.match(c, result) { |
| return true |
| } |
| return p.rhs.match(c, result) |
| } |
| |
| // predicateAnd is a predicate that is the logical negation of another |
| // predicate. |
| type predicateNot struct { |
| // pred is the predicate to be negated. |
| pred predicate |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // match implements the predicate method. |
| func (p *predicateNot) match(c *context, result *store.QueryResult) bool { |
| return !p.pred.match(c, result) |
| } |
| |
| // expr produces a value in the context of a store.QueryResult. |
| type expr interface { |
| // value returns a value in the context of result. |
| value(c *context, result *store.QueryResult) interface{} |
| } |
| |
| // convertExpr transforms a parse.Expr into an expr. |
| func convertExpr(e parse.Expr) expr { |
| switch e := e.(type) { |
| case *parse.ExprString: |
| return &exprString{e.Str, e.Pos} |
| case *parse.ExprRat: |
| return &exprRat{e.Rat, e.Pos} |
| case *parse.ExprInt: |
| return &exprInt{e.Int, e.Pos} |
| case *parse.ExprName: |
| return &exprName{e.Name, e.Pos} |
| case *parse.ExprUnary: |
| return &exprUnary{convertExpr(e.Operand), e.Op, e.Pos} |
| // TODO(kash): Support the other types of expressions. |
| default: |
| panic(fmt.Errorf("unexpected type %T", e)) |
| } |
| } |
| |
| // exprString is an expr that represents a string constant. |
| type exprString struct { |
| // str is the string constant specified in the query. |
| str string |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // value implements the expr method. |
| func (e *exprString) value(c *context, result *store.QueryResult) interface{} { |
| return e.str |
| } |
| |
| // exprRat is an expr that represents a rational number constant. |
| type exprRat struct { |
| rat *big.Rat |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // value implements the expr method. |
| func (e *exprRat) value(c *context, result *store.QueryResult) interface{} { |
| return e.rat |
| } |
| |
| // exprInt is an expr that represents an integer constant. |
| type exprInt struct { |
| i *big.Int |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // value implements the expr method. |
| func (e *exprInt) value(c *context, result *store.QueryResult) interface{} { |
| return e.i |
| } |
| |
| // exprName is an expr for an Object name literal. |
| type exprName struct { |
| // name is the Object name used in the query. |
| name string |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // value implements the expr method. |
| func (e *exprName) value(c *context, result *store.QueryResult) interface{} { |
| if result.Fields != nil { |
| // TODO(kash): Handle multipart names. This currently only works if |
| // e.name has no slashes. |
| val, ok := result.Fields[e.name] |
| if !ok { |
| c.evalIt.setErrorf("name '%s' was not selected from '%s', found: [%s]", |
| e.name, result.Name, mapKeys(result.Fields)) |
| return nil |
| } |
| // If a field is "hidden", the value will be wrapped in the type |
| // hiddenResult to make it possible for evalIterator to elide it |
| // from the results sent to the client. |
| if v, ok := val.(hiddenResult); ok { |
| return v.value |
| } |
| return val |
| } |
| fullpath := naming.Join(result.Name, e.name) |
| entry, err := c.sn.Get(c.clientID, storage.ParsePath(fullpath)) |
| if err != nil { |
| c.evalIt.setErrorf("could not look up name '%s' relative to '%s': %v", |
| e.name, result.Name, err) |
| return nil |
| } |
| return entry.Value |
| } |
| |
| func mapKeys(m map[string]vdlutil.Any) string { |
| s := make([]string, 0, len(m)) |
| for key, _ := range m { |
| s = append(s, key) |
| } |
| sort.Strings(s) |
| return strings.Join(s, ", ") |
| } |
| |
| // exprUnary is an expr preceded by a '+' or '-'. |
| type exprUnary struct { |
| // operand is the expression to be modified by Op. |
| operand expr |
| // op is the operator that modifies operand. |
| op parse.Operator |
| // pos specifies where in the query string this component started. |
| pos parse.Pos |
| } |
| |
| // value implements the expr method. |
| func (e *exprUnary) value(c *context, result *store.QueryResult) interface{} { |
| v := e.operand.value(c, result) |
| switch e.op { |
| case parse.OpNeg: |
| switch v := v.(type) { |
| case *big.Int: |
| // Need to create a temporary big.Int since Neg mutates the Int. |
| return new(big.Int).Set(v).Neg(v) |
| case *big.Rat: |
| // Need to create a temporary big.Rat since Neg mutates the Rat. |
| return new(big.Rat).Set(v).Neg(v) |
| case int: |
| return -v |
| case int8: |
| return -v |
| case int16: |
| return -v |
| case int32: |
| return -v |
| case int64: |
| return -v |
| case uint: |
| return -v |
| case uint8: |
| return -v |
| case uint16: |
| return -v |
| case uint32: |
| return -v |
| case uint64: |
| return -v |
| default: |
| c.evalIt.setErrorf("cannot negate value of type %T for %s", v, result.Name) |
| return nil |
| } |
| case parse.OpPos: |
| return v |
| default: |
| c.evalIt.setErrorf("unknown operator %d at Pos %v", e.op, e.pos) |
| return nil |
| } |
| } |