veyron/services/store/memstore/query: Fix flaky error handling.

The TestError test was failing 1/10 times because the error
handler goroutine would get cleaned up before it had a chance
to pull the error off of the channel.  We don't need the error
channel now that evalIterator has a mutex.  It is far easier
to just use the mutex.

Also switched the abort channel to be of type struct{} to
better indicate that it is a signal-only channel.

Change-Id: I0b69eea85440be6c801f5e970001df1686e7d897
diff --git a/services/store/memstore/query/eval.go b/services/store/memstore/query/eval.go
index 8b318dd..3b65a30 100644
--- a/services/store/memstore/query/eval.go
+++ b/services/store/memstore/query/eval.go
@@ -39,15 +39,12 @@
 	// abort is used as the signal to query evaluation to terminate early.
 	// evaluator implementations will test for abort closing.  The close()
 	// call is guarded by mu.
-	abort chan bool
+	abort chan struct{}
 
 	// results represents a stack of result channels.  evalIterator is
 	// essentially an iterator of iterators.  Each subquery adds a nestedChannel
 	// to this stack.  The top of the stack is the end of the slice.
 	results []nestedChannel
-	// errc is the path that evaluator implementations use to pass errors
-	// to evalIterator.  Any error will abort query evaluation.
-	errc chan error
 	// cleanup is used for testing to ensure that no goroutines are leaked.
 	cleanup sync.WaitGroup
 	// maxNesting is the largest value used for nestedChannel.nesting.  It
@@ -82,7 +79,6 @@
 			if len(it.results) > 0 {
 				return it.Next()
 			}
-			it.Abort() // Cause the handleErrors() goroutine to exit.
 			return false
 		}
 
@@ -161,17 +157,17 @@
 	return it.err
 }
 
-// handleErrors watches for errors on it.errc, calling it.Abort when it finds
-// one.  It should run in a goroutine.
-func (it *evalIterator) handleErrors() {
-	select {
-	case <-it.abort:
-	case err := <-it.errc:
-		it.mu.Lock()
+func (it *evalIterator) setErrorf(format string, args ...interface{}) {
+	it.setError(fmt.Errorf(format, args...))
+}
+
+func (it *evalIterator) setError(err error) {
+	it.mu.Lock()
+	if it.err == nil {
 		it.err = err
-		it.mu.Unlock()
-		it.Abort()
 	}
+	it.mu.Unlock()
+	it.Abort()
 }
 
 // wait blocks until all children goroutines are finished.  This is useful in
@@ -180,15 +176,10 @@
 	it.cleanup.Wait()
 }
 
-// sendError sends err on errc unless that would block.  In that case, sendError
-// does nothing because there was already an error reported.
-func sendError(errc chan<- error, err error) {
-	select {
-	case errc <- err:
-		// Sent error successfully.
-	default:
-		// Sending the error would block because there is already an error in the
-		// channel.  The first error wins.
+func newErrorIterator(err error) service.QueryStream {
+	return &evalIterator{
+		err:   err,
+		abort: make(chan struct{}),
 	}
 }
 
@@ -199,11 +190,11 @@
 func Eval(sn state.Snapshot, clientID security.PublicID, name storage.PathName, q query.Query) service.QueryStream {
 	ast, err := parse.Parse(q)
 	if err != nil {
-		return &evalIterator{err: err}
+		return newErrorIterator(err)
 	}
 	evaluator, err := convert(ast)
 	if err != nil {
-		return &evalIterator{err: err}
+		return newErrorIterator(err)
 	}
 
 	// Seed the input with the root entity.
@@ -214,10 +205,8 @@
 	out := make(chan *store.QueryResult, maxChannelSize)
 	it := &evalIterator{
 		results: []nestedChannel{nestedChannel{0, out}},
-		abort:   make(chan bool),
-		errc:    make(chan error),
+		abort:   make(chan struct{}),
 	}
-	go it.handleErrors()
 	it.cleanup.Add(1)
 	go evaluator.eval(&context{
 		sn:       sn,
@@ -225,8 +214,8 @@
 		clientID: clientID,
 		in:       in,
 		out:      out,
+		evalIt:   it,
 		abort:    it.abort,
-		errc:     it.errc,
 		cleanup:  &it.cleanup,
 	})
 	return it
@@ -249,11 +238,12 @@
 	// evaluators should use context.emit instead of writing directly
 	// to out.
 	out chan<- *store.QueryResult
+	// evalIt is the iterator that interfaces with the client.  It is here
+	// to allow the evaluation code to propagate errors via setError().
+	evalIt *evalIterator
 	// abort will be closed if query evaluation should terminate early.
 	// evaluator implementations should regularly test if it is still open.
-	abort chan bool
-	// errc is where evaluators can propagate errors to the client.
-	errc chan<- error
+	abort chan struct{}
 	// cleanup is used for testing to ensure that no goroutines are leaked.
 	// evaluator.eval implementations should call Done when finished processing.
 	cleanup *sync.WaitGroup
@@ -398,8 +388,8 @@
 		clientID: c.clientID,
 		in:       c.in,
 		out:      srcOut,
+		evalIt:   c.evalIt,
 		abort:    c.abort,
-		errc:     c.errc,
 		cleanup:  c.cleanup,
 	}
 	c.cleanup.Add(1)
@@ -549,8 +539,8 @@
 			clientID: c.clientID,
 			in:       in,
 			out:      out,
+			evalIt:   c.evalIt,
 			abort:    c.abort,
-			errc:     c.errc,
 			cleanup:  c.cleanup,
 		}
 		c.cleanup.Add(1)
@@ -679,8 +669,8 @@
 		jval := arg.value(a.c, a.results[j])
 		res, err := compare(a.c, ival, jval)
 		if err != nil {
-			sendError(a.c.errc, fmt.Errorf("error while sorting (Pos %v Arg: %d) left: %s, right: %s; %v",
-				a.e.pos, n, a.results[i].Name, a.results[j].Name, err))
+			a.c.evalIt.setErrorf("error while sorting (Pos %v Arg: %d) left: %s, right: %s; %v",
+				a.e.pos, n, a.results[i].Name, a.results[j].Name, err)
 			return false
 		}
 		if res == 0 {
@@ -756,8 +746,8 @@
 
 	res, err := compare(c, lval, rval)
 	if err != nil {
-		sendError(c.errc, fmt.Errorf("error while evaluating predicate (Pos %v) for name '%s': %v",
-			p.pos, result.Name, err))
+		c.evalIt.setErrorf("error while evaluating predicate (Pos %v) for name '%s': %v",
+			p.pos, result.Name, err)
 		return false
 	}
 	switch p.comp {
@@ -774,7 +764,7 @@
 	case parse.CompGE:
 		return res >= 0
 	default:
-		sendError(c.errc, fmt.Errorf("unknown comparator %d at Pos %v", p.comp, p.pos))
+		c.evalIt.setErrorf("unknown comparator %d at Pos %v", p.comp, p.pos)
 		return false
 	}
 }
@@ -1013,8 +1003,8 @@
 		// e.name has no slashes.
 		val, ok := result.Fields[e.name]
 		if !ok {
-			sendError(c.errc, fmt.Errorf("name '%s' was not selected from '%s', found: [%s]",
-				e.name, result.Name, mapKeys(result.Fields)))
+			c.evalIt.setErrorf("name '%s' was not selected from '%s', found: [%s]",
+				e.name, result.Name, mapKeys(result.Fields))
 			return nil
 		}
 		return val
@@ -1022,7 +1012,8 @@
 	fullpath := naming.Join(result.Name, e.name)
 	entry, err := c.sn.Get(c.clientID, storage.ParsePath(fullpath))
 	if err != nil {
-		sendError(c.errc, fmt.Errorf("could not look up name '%s' relative to '%s': %v", e.name, result.Name, err))
+		c.evalIt.setErrorf("could not look up name '%s' relative to '%s': %v",
+			e.name, result.Name, err)
 		return nil
 	}
 	return entry.Value
@@ -1080,13 +1071,13 @@
 		case uint64:
 			return -v
 		default:
-			sendError(c.errc, fmt.Errorf("cannot negate value of type %T for %s", v, result.Name))
+			c.evalIt.setErrorf("cannot negate value of type %T for %s", v, result.Name)
 			return nil
 		}
 	case parse.OpPos:
 		return v
 	default:
-		sendError(c.errc, fmt.Errorf("unknown operator %d at Pos %v", e.op, e.pos))
+		c.evalIt.setErrorf("unknown operator %d at Pos %v", e.op, e.pos)
 		return nil
 	}
 }