veyron/services/store/memstore/query: Finish subquery implementation.
While we want clients to see the results of a subquery inline
with the parent result, we have to stream them separately.
selectionEvaluator embeds a channel of results in the parent
result object, and evalIterator sends the results to the client
out of line. In the parent, evalIterator replaces the channel
with a NestedResult value. After sending the parent to the client,
evalIterator sends the nested results. evalIterator maintains
a stack of channels, making it essentially an iterator of iterators.
This change removes the monotonicInt type. I found it easier to
have evalIterator maintain the NestedResult value because it
needs this value for both the parent and the nested results.
The previous code stored the value in the nested results, but
then it was hard to set the value in the parent.
Change-Id: I55e87710fa629a37d2c729166a030d2b56332ed4
diff --git a/services/store/memstore/query/eval.go b/services/store/memstore/query/eval.go
index 06b8744..8b318dd 100644
--- a/services/store/memstore/query/eval.go
+++ b/services/store/memstore/query/eval.go
@@ -8,6 +8,7 @@
"sort"
"strings"
"sync"
+ "veyron2/vlog"
"veyron/services/store/memstore/state"
"veyron/services/store/service"
@@ -40,13 +41,28 @@
// call is guarded by mu.
abort chan bool
- // results is the output of the top-level evaluator for the query.
- results <-chan *store.QueryResult
+ // results represents a stack of result channels. evalIterator is
+ // essentially an iterator of iterators. Each subquery adds a nestedChannel
+ // to this stack. The top of the stack is the end of the slice.
+ results []nestedChannel
// errc is the path that evaluator implementations use to pass errors
// to evalIterator. Any error will abort query evaluation.
errc chan error
// cleanup is used for testing to ensure that no goroutines are leaked.
cleanup sync.WaitGroup
+ // maxNesting is the largest value used for nestedChannel.nesting. It
+ // is the maximum nesting over the duration of the query while len(results)
+ // is just the instantaneous nesting.
+ maxNesting int
+}
+
+// nestedChannel contains the results of a subquery.
+type nestedChannel struct {
+ // nesting is the value to use for the NestedResult field of all
+ // objects that come out of the results channel.
+ nesting int
+ // results is the stream of results for this subquery.
+ results <-chan *store.QueryResult
}
// Next implements the QueryStream method.
@@ -58,15 +74,25 @@
}
it.mu.Unlock()
+ depth := len(it.results) - 1
select {
- case result, ok := <-it.results:
+ case result, ok := <-it.results[depth].results:
if !ok {
+ it.results = it.results[:depth]
+ if len(it.results) > 0 {
+ return it.Next()
+ }
+ it.Abort() // Cause the handleErrors() goroutine to exit.
return false
}
+
+ // Set the correct value for NestedResult.
+ result.NestedResult = store.NestedResult(it.results[depth].nesting)
+
+ it.enqueueNestedChannels(result)
+
it.mu.Lock()
defer it.mu.Unlock()
- // TODO(kash): Need to watch out for fields of type channel and pull them
- // out of line.
it.result = result
return true
case <-it.abort:
@@ -74,6 +100,40 @@
}
}
+// enqueueNestedChannels looks through result.Fields for nested result
+// channels, i.e. fields whose value is a chan *store.QueryResult. If there
+// are any, they are appended to it.results, and each such field in
+// result.Fields is overwritten with the store.NestedResult id assigned to
+// its channel. Ids come from it.maxNesting, which is incremented once per
+// channel in alphabetical order of the field keys.
+// The channels are appended in reverse alphabetical order because
+// it.results is a stack (the last element is read first)--
+// we want to process them in alphabetical order.
+func (it *evalIterator) enqueueNestedChannels(result *store.QueryResult) {
+ if result.Fields == nil {
+ return
+ }
+ var nestingKeys []string
+ for key, val := range result.Fields {
+ // TODO(kash): If a stored value happens to be a chan *store.QueryResult,
+ // it is indistinguishable from a subquery channel and we'll do the wrong
+ // thing here. Once we store vom.Value instead of raw structs,
+ // this should not be a problem.
+ if _, ok := val.(chan *store.QueryResult); ok {
+ nestingKeys = append(nestingKeys, key)
+ }
+ }
+ // Figure out the store.NestedResult values based on alphabetical order of
+ // the keys.
+ sort.Sort(sort.StringSlice(nestingKeys))
+ var nested []nestedChannel
+ for _, key := range nestingKeys {
+ it.maxNesting++
+ nested = append(nested,
+ nestedChannel{it.maxNesting, result.Fields[key].(chan *store.QueryResult)})
+ // Replace the channel with its id so the parent result refers to the
+ // nested results by number instead of carrying the channel itself.
+ result.Fields[key] = store.NestedResult(it.maxNesting)
+ }
+ // Add the nested result channels in reverse alphabetical order.
+ for i := len(nested) - 1; i >= 0; i-- {
+ it.results = append(it.results, nested[i])
+ }
+}
+
// Get implements the QueryStream method.
func (it *evalIterator) Get() *store.QueryResult {
it.mu.Lock()
@@ -121,7 +181,7 @@
}
// sendError sends err on errc unless that would block. In that case, sendError
-// does nothing because there was aleady an error reported.
+// does nothing because there was already an error reported.
func sendError(errc chan<- error, err error) {
select {
case errc <- err:
@@ -153,22 +213,21 @@
out := make(chan *store.QueryResult, maxChannelSize)
it := &evalIterator{
- results: out,
+ results: []nestedChannel{nestedChannel{0, out}},
abort: make(chan bool),
errc: make(chan error),
}
go it.handleErrors()
it.cleanup.Add(1)
go evaluator.eval(&context{
- sn: sn,
- suffix: name.String(),
- clientID: clientID,
- nestedResult: &monotonicInt{},
- in: in,
- out: out,
- abort: it.abort,
- errc: it.errc,
- cleanup: &it.cleanup,
+ sn: sn,
+ suffix: name.String(),
+ clientID: clientID,
+ in: in,
+ out: out,
+ abort: it.abort,
+ errc: it.errc,
+ cleanup: &it.cleanup,
})
return it
}
@@ -182,9 +241,6 @@
suffix string
// clientID is the identity of the client that issued the query.
clientID security.PublicID
- // nestedResult produces a unique nesting identifier to be used as
- // QueryResult.NestedResult.
- nestedResult *monotonicInt
// in produces the intermediate results from the previous stage of the
// query. It will be closed when the evaluator should stop processing
// results. It is not necessary to select on 'in' and 'errc'.
@@ -207,6 +263,7 @@
// nil. Returns true if the caller should continue iterating, returns
// false if it is time to abort.
func (c *context) emit(result *store.QueryResult) bool {
+ vlog.VI(2).Info("emit", result)
if result == nil {
// Check for an abort before continuing iteration.
select {
@@ -301,15 +358,15 @@
defer close(c.out)
for result := range c.in {
- nestedResult := c.nestedResult.Next()
- basePath := naming.Join(c.suffix, result.Name)
- path := storage.ParsePath(naming.Join(basePath, e.wildcardName.VName))
+ basepath := naming.Join(result.Name, e.wildcardName.VName)
+ path := storage.ParsePath(naming.Join(c.suffix, basepath))
+ vlog.VI(2).Infof("nameEvaluator suffix: %s, result.Name: %s, VName: %s",
+ c.suffix, result.Name, e.wildcardName.VName)
for it := c.sn.NewIterator(c.clientID, path, state.ImmediateFilter); it.IsValid(); it.Next() {
entry := it.Get()
result := &store.QueryResult{
- NestedResult: store.NestedResult(nestedResult),
- Name: naming.Join(e.wildcardName.VName, it.Name()),
- Value: entry.Value,
+ Name: naming.Join(basepath, it.Name()),
+ Value: entry.Value,
}
if !c.emit(result) {
return
@@ -336,15 +393,14 @@
func startSource(c *context, src evaluator) chan *store.QueryResult {
srcOut := make(chan *store.QueryResult, maxChannelSize)
srcContext := context{
- sn: c.sn,
- suffix: c.suffix,
- clientID: c.clientID,
- nestedResult: c.nestedResult,
- in: c.in,
- out: srcOut,
- abort: c.abort,
- errc: c.errc,
- cleanup: c.cleanup,
+ sn: c.sn,
+ suffix: c.suffix,
+ clientID: c.clientID,
+ in: c.in,
+ out: srcOut,
+ abort: c.abort,
+ errc: c.errc,
+ cleanup: c.cleanup,
}
c.cleanup.Add(1)
go src.eval(&srcContext)
@@ -367,6 +423,7 @@
defer close(c.out)
for result := range startSource(c, e.src) {
+ vlog.VI(2).Info("typeEvaluator", result)
if val := reflect.ValueOf(result.Value); e.ty != val.Type().Name() {
continue
}
@@ -403,6 +460,7 @@
defer close(c.out)
for result := range startSource(c, e.src) {
+ vlog.VI(2).Info("filterEvaluator", result)
if e.pred.match(c, result) {
if !c.emit(result) {
return
@@ -473,6 +531,7 @@
}
func (e *selectionEvaluator) processSubpipelines(c *context, result *store.QueryResult) bool {
+ vlog.VI(2).Info("selection: ", result.Name)
sel := &store.QueryResult{
Name: result.Name,
Fields: make(map[string]vdl.Any),
@@ -485,15 +544,14 @@
close(in)
out := make(chan *store.QueryResult, maxChannelSize)
ctxt := &context{
- sn: c.sn,
- suffix: c.suffix,
- clientID: c.clientID,
- nestedResult: c.nestedResult,
- in: in,
- out: out,
- abort: c.abort,
- errc: c.errc,
- cleanup: c.cleanup,
+ sn: c.sn,
+ suffix: c.suffix,
+ clientID: c.clientID,
+ in: in,
+ out: out,
+ abort: c.abort,
+ errc: c.errc,
+ cleanup: c.cleanup,
}
c.cleanup.Add(1)
go a.evaluator.eval(ctxt)
@@ -829,7 +887,7 @@
type predicateAnd struct {
// lhs is the left-hand-side of the conjunction.
lhs predicate
- // rhs is the right-hand-side of the conjuction.
+ // rhs is the right-hand-side of the conjunction.
rhs predicate
// pos specifies where in the query string this component started.
pos parse.Pos
diff --git a/services/store/memstore/query/eval_test.go b/services/store/memstore/query/eval_test.go
index 19af62b..8dd7e81 100644
--- a/services/store/memstore/query/eval_test.go
+++ b/services/store/memstore/query/eval_test.go
@@ -5,14 +5,16 @@
"reflect"
"testing"
"time"
- "veyron2/services/store"
- "veyron2/vdl"
+ _ "veyron/lib/testutil"
"veyron/services/store/memstore/state"
"veyron2/query"
"veyron2/security"
+ "veyron2/services/store"
"veyron2/storage"
+ "veyron2/vdl"
+ "veyron2/vlog"
)
type team struct {
@@ -43,10 +45,18 @@
put(t, sn, "/teams/sharks", team{"sharks", "NY"})
put(t, sn, "/teams/bears", team{"bears", "CO"})
- put(t, sn, "/teams/cardinals/alfred", alfredID)
- put(t, sn, "/teams/sharks/alice", aliceID)
- put(t, sn, "/teams/sharks/betty", bettyID)
- put(t, sn, "/teams/sharks/bob", bobID)
+ put(t, sn, "/teams/cardinals/players", "")
+ put(t, sn, "/teams/sharks/players", "")
+ put(t, sn, "/teams/bears/players", "")
+
+ put(t, sn, "/teams/cardinals/players/alfred", alfredID)
+ put(t, sn, "/teams/sharks/players/alice", aliceID)
+ put(t, sn, "/teams/sharks/players/betty", bettyID)
+ // Call him something different to make sure we are handling
+ // paths correctly in subqueries. We don't want the subquery
+ // "teams/sharks | type team | { players/*}" to work with
+ // "/players/bob".
+ put(t, sn, "/teams/sharks/players/robert", bobID)
commit(t, st, sn)
return st
@@ -329,7 +339,8 @@
"myname": "cardinals",
"myloc": "CA",
},
- nil},
+ nil,
+ },
},
},
{
@@ -342,15 +353,86 @@
"myname": "cardinals",
"myloc": "CA",
},
- nil},
+ nil,
+ },
+ },
+ },
+ {
+ "",
+ "'teams/*' | type team | {" +
+ " Name as myname," +
+ " players/* | type player | ?Age >=21 | sort() as drinkers," +
+ " players/* | type player | ?Age < 21 | sort() as nondrinkers" +
+ "} | sort(myname)",
+ []*store.QueryResult{
+ &store.QueryResult{
+ 0,
+ "teams/bears",
+ map[string]vdl.Any{
+ "myname": "bears",
+ "drinkers": store.NestedResult(1),
+ "nondrinkers": store.NestedResult(2),
+ },
+ nil,
+ },
+ &store.QueryResult{
+ 0,
+ "teams/cardinals",
+ map[string]vdl.Any{
+ "myname": "cardinals",
+ "drinkers": store.NestedResult(3),
+ "nondrinkers": store.NestedResult(4),
+ },
+ nil,
+ },
+ &store.QueryResult{
+ 4,
+ "teams/cardinals/players/alfred",
+ nil,
+ player{"alfred", 17},
+ },
+ &store.QueryResult{
+ 0,
+ "teams/sharks",
+ map[string]vdl.Any{
+ "myname": "sharks",
+ "drinkers": store.NestedResult(5),
+ "nondrinkers": store.NestedResult(6),
+ },
+ nil,
+ },
+ &store.QueryResult{
+ 5,
+ "teams/sharks/players/betty",
+ nil,
+ player{"betty", 23},
+ },
+ &store.QueryResult{
+ 5,
+ "teams/sharks/players/robert",
+ nil,
+ player{"bob", 21},
+ },
+ &store.QueryResult{
+ 6,
+ "teams/sharks/players/alice",
+ nil,
+ player{"alice", 16},
+ },
},
},
}
for _, test := range tests {
+ vlog.VI(1).Infof("Testing %s\n", test.query)
it := Eval(st.Snapshot(), rootPublicID, storage.ParsePath(test.suffix), query.Query{test.query})
i := 0
for it.Next() {
result := it.Get()
+ // Guard against the query producing more results than the test expects;
+ // without this, test.expectedResults[i] below would panic.
+ if i >= len(test.expectedResults) {
+ // The format has two verbs, so both test.query and the count must be
+ // passed. Index i is the (i+1)-th result, hence i+1 expected results.
+ t.Errorf("query: %s, not enough expected results, need at least %d", test.query, i+1)
+ it.Abort()
+ break
+ }
if got, want := result, test.expectedResults[i]; !reflect.DeepEqual(got, want) {
t.Errorf("query: %s;\nGOT %s\nWANT %s", test.query, got, want)
}
@@ -480,7 +562,7 @@
if i == 0 {
// Give the evaluators time to fill up the channels. Ensure that they
// don't block forever on a full channel.
- for len(it.(*evalIterator).results) < maxChannelSize {
+ for len(it.(*evalIterator).results[0].results) < maxChannelSize {
time.Sleep(time.Millisecond)
}
}
diff --git a/services/store/memstore/query/monotonic_int.go b/services/store/memstore/query/monotonic_int.go
deleted file mode 100644
index 05caf4c..0000000
--- a/services/store/memstore/query/monotonic_int.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package query
-
-import (
- "sync"
-)
-
-// monotonicInt produces monotonically increasing integers. This is useful
-// for the store.QueryResult.NestedResult field.
-// monotonicInt is threadsafe.
-type monotonicInt struct {
- mu sync.Mutex
- val int
-}
-
-// Next returns the next integer. It is guaranteed to be greater than all
-// previously returned integers.
-func (m *monotonicInt) Next() int {
- m.mu.Lock()
- ret := m.val
- m.val++
- m.mu.Unlock()
- return ret
-}
diff --git a/services/store/memstore/query/monotonic_int_test.go b/services/store/memstore/query/monotonic_int_test.go
deleted file mode 100644
index 4689fad..0000000
--- a/services/store/memstore/query/monotonic_int_test.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package query
-
-import (
- "testing"
-)
-
-func TestMonotonic(t *testing.T) {
- mi := monotonicInt{}
- a := mi.Next()
- for i := 0; i < 5; i++ {
- b := mi.Next()
- if b <= a {
- t.Fatalf("%d is not greater than %d", b, a)
- }
- a = b
- }
-}