veyron/services/store/memstore/query: Fix flaky error handling.
TestError was failing roughly 1 in 10 runs because the error
handler goroutine could be cleaned up before it had a chance
to pull the error off the channel. We don't need the error
channel now that evalIterator has a mutex; it is far simpler
to record the error directly under that mutex.
Also switched the abort channel from chan bool to chan struct{}
to better indicate that it is a signal-only channel.
Change-Id: I0b69eea85440be6c801f5e970001df1686e7d897
diff --git a/services/store/memstore/query/eval.go b/services/store/memstore/query/eval.go
index 8b318dd..3b65a30 100644
--- a/services/store/memstore/query/eval.go
+++ b/services/store/memstore/query/eval.go
@@ -39,15 +39,12 @@
// abort is used as the signal to query evaluation to terminate early.
// evaluator implementations will test for abort closing. The close()
// call is guarded by mu.
- abort chan bool
+ abort chan struct{}
// results represents a stack of result channels. evalIterator is
// essentially an iterator of iterators. Each subquery adds a nestedChannel
// to this stack. The top of the stack is the end of the slice.
results []nestedChannel
- // errc is the path that evaluator implementations use to pass errors
- // to evalIterator. Any error will abort query evaluation.
- errc chan error
// cleanup is used for testing to ensure that no goroutines are leaked.
cleanup sync.WaitGroup
// maxNesting is the largest value used for nestedChannel.nesting. It
@@ -82,7 +79,6 @@
if len(it.results) > 0 {
return it.Next()
}
- it.Abort() // Cause the handleErrors() goroutine to exit.
return false
}
@@ -161,17 +157,17 @@
return it.err
}
-// handleErrors watches for errors on it.errc, calling it.Abort when it finds
-// one. It should run in a goroutine.
-func (it *evalIterator) handleErrors() {
- select {
- case <-it.abort:
- case err := <-it.errc:
- it.mu.Lock()
+func (it *evalIterator) setErrorf(format string, args ...interface{}) {
+ it.setError(fmt.Errorf(format, args...))
+}
+
+func (it *evalIterator) setError(err error) {
+ it.mu.Lock()
+ if it.err == nil {
it.err = err
- it.mu.Unlock()
- it.Abort()
}
+ it.mu.Unlock()
+ it.Abort()
}
// wait blocks until all children goroutines are finished. This is useful in
@@ -180,15 +176,10 @@
it.cleanup.Wait()
}
-// sendError sends err on errc unless that would block. In that case, sendError
-// does nothing because there was already an error reported.
-func sendError(errc chan<- error, err error) {
- select {
- case errc <- err:
- // Sent error successfully.
- default:
- // Sending the error would block because there is already an error in the
- // channel. The first error wins.
+func newErrorIterator(err error) service.QueryStream {
+ return &evalIterator{
+ err: err,
+ abort: make(chan struct{}),
}
}
@@ -199,11 +190,11 @@
func Eval(sn state.Snapshot, clientID security.PublicID, name storage.PathName, q query.Query) service.QueryStream {
ast, err := parse.Parse(q)
if err != nil {
- return &evalIterator{err: err}
+ return newErrorIterator(err)
}
evaluator, err := convert(ast)
if err != nil {
- return &evalIterator{err: err}
+ return newErrorIterator(err)
}
// Seed the input with the root entity.
@@ -214,10 +205,8 @@
out := make(chan *store.QueryResult, maxChannelSize)
it := &evalIterator{
results: []nestedChannel{nestedChannel{0, out}},
- abort: make(chan bool),
- errc: make(chan error),
+ abort: make(chan struct{}),
}
- go it.handleErrors()
it.cleanup.Add(1)
go evaluator.eval(&context{
sn: sn,
@@ -225,8 +214,8 @@
clientID: clientID,
in: in,
out: out,
+ evalIt: it,
abort: it.abort,
- errc: it.errc,
cleanup: &it.cleanup,
})
return it
@@ -249,11 +238,12 @@
// evaluators should use context.emit instead of writing directly
// to out.
out chan<- *store.QueryResult
+ // evalIt is the iterator that interfaces with the client. It is here
+ // to allow the evaluation code to propagate errors via setError().
+ evalIt *evalIterator
// abort will be closed if query evaluation should terminate early.
// evaluator implementations should regularly test if it is still open.
- abort chan bool
- // errc is where evaluators can propagate errors to the client.
- errc chan<- error
+ abort chan struct{}
// cleanup is used for testing to ensure that no goroutines are leaked.
// evaluator.eval implementations should call Done when finished processing.
cleanup *sync.WaitGroup
@@ -398,8 +388,8 @@
clientID: c.clientID,
in: c.in,
out: srcOut,
+ evalIt: c.evalIt,
abort: c.abort,
- errc: c.errc,
cleanup: c.cleanup,
}
c.cleanup.Add(1)
@@ -549,8 +539,8 @@
clientID: c.clientID,
in: in,
out: out,
+ evalIt: c.evalIt,
abort: c.abort,
- errc: c.errc,
cleanup: c.cleanup,
}
c.cleanup.Add(1)
@@ -679,8 +669,8 @@
jval := arg.value(a.c, a.results[j])
res, err := compare(a.c, ival, jval)
if err != nil {
- sendError(a.c.errc, fmt.Errorf("error while sorting (Pos %v Arg: %d) left: %s, right: %s; %v",
- a.e.pos, n, a.results[i].Name, a.results[j].Name, err))
+ a.c.evalIt.setErrorf("error while sorting (Pos %v Arg: %d) left: %s, right: %s; %v",
+ a.e.pos, n, a.results[i].Name, a.results[j].Name, err)
return false
}
if res == 0 {
@@ -756,8 +746,8 @@
res, err := compare(c, lval, rval)
if err != nil {
- sendError(c.errc, fmt.Errorf("error while evaluating predicate (Pos %v) for name '%s': %v",
- p.pos, result.Name, err))
+ c.evalIt.setErrorf("error while evaluating predicate (Pos %v) for name '%s': %v",
+ p.pos, result.Name, err)
return false
}
switch p.comp {
@@ -774,7 +764,7 @@
case parse.CompGE:
return res >= 0
default:
- sendError(c.errc, fmt.Errorf("unknown comparator %d at Pos %v", p.comp, p.pos))
+ c.evalIt.setErrorf("unknown comparator %d at Pos %v", p.comp, p.pos)
return false
}
}
@@ -1013,8 +1003,8 @@
// e.name has no slashes.
val, ok := result.Fields[e.name]
if !ok {
- sendError(c.errc, fmt.Errorf("name '%s' was not selected from '%s', found: [%s]",
- e.name, result.Name, mapKeys(result.Fields)))
+ c.evalIt.setErrorf("name '%s' was not selected from '%s', found: [%s]",
+ e.name, result.Name, mapKeys(result.Fields))
return nil
}
return val
@@ -1022,7 +1012,8 @@
fullpath := naming.Join(result.Name, e.name)
entry, err := c.sn.Get(c.clientID, storage.ParsePath(fullpath))
if err != nil {
- sendError(c.errc, fmt.Errorf("could not look up name '%s' relative to '%s': %v", e.name, result.Name, err))
+ c.evalIt.setErrorf("could not look up name '%s' relative to '%s': %v",
+ e.name, result.Name, err)
return nil
}
return entry.Value
@@ -1080,13 +1071,13 @@
case uint64:
return -v
default:
- sendError(c.errc, fmt.Errorf("cannot negate value of type %T for %s", v, result.Name))
+ c.evalIt.setErrorf("cannot negate value of type %T for %s", v, result.Name)
return nil
}
case parse.OpPos:
return v
default:
- sendError(c.errc, fmt.Errorf("unknown operator %d at Pos %v", e.op, e.pos))
+ c.evalIt.setErrorf("unknown operator %d at Pos %v", e.op, e.pos)
return nil
}
}