veyron/lib/modules: replace StdoutPipe with a custom in-memory queue-based
ReadWriteCloser in order to decouple the consumers of stdout from using the
stdout file handle directly.

This was motivated by two issues observed in tests:

(1) a subtle data race that popped up between reading from the StdoutPipe's
parent end we were using and closing that pipe in Wait (called by Shutdown). As
explained in http://golang.org/pkg/os/exec/#Cmd.StdoutPipe, Reading from the
pipe after Wait has been called is incorrect. Note that even if we arrange for
all Expect* calls to complete before we call Shutdown or Cleanup on the handler
or shell respectively, it is not enough: the Expect* calls spin up a goroutine
that reads from stdout, and under some circumstances (e.g. timeout) that
goroutine outlives the Expect* call that started it.

(2) handle.Shutdown hanging for a very long time in spite of having a 10 second
timeout on the Wait call. The reason was the readTo loop that was trying to
drain stdout before Shutdown could call Wait: This loop would never complete if
the child process didn't exit (since it would not see an EOF on stdout).

While we're at it, also factor out the code to transcribe stderr into a separate
routine; replace readTo by io.Copy (didn't see a reason to have our own
implementation of readTo); and stop using the command name as the log file name
in newLogfile() since that may not contain valid file name charactes.

Change-Id: Ibdcb2df79eb44672d94a1f804ba0e75e5de31524
diff --git a/lib/expect/expect.go b/lib/expect/expect.go
index b9b8311..afed004 100644
--- a/lib/expect/expect.go
+++ b/lib/expect/expect.go
@@ -242,7 +242,7 @@
 		return [][]string{}
 	}
 	l, m, err := s.expectRE(pattern, n)
-	s.log(err, "ExpectVar: %s", l)
+	s.log(err, "ExpectRE: %s", l)
 	if err != nil {
 		s.error(err)
 		return [][]string{}
diff --git a/lib/modules/exec.go b/lib/modules/exec.go
index d656d0a..d34f534 100644
--- a/lib/modules/exec.go
+++ b/lib/modules/exec.go
@@ -24,6 +24,7 @@
 	stderr     *os.File
 	stdout     io.ReadCloser
 	stdin      io.WriteCloser
+	procErrCh  chan error
 }
 
 func testFlags() []string {
@@ -50,7 +51,7 @@
 	return fl
 }
 
-// IsTestHelperProces returns true if it is called in via
+// IsTestHelperProcess returns true if it is called in via
 // -run=TestHelperProcess which normally only ever happens for subprocesses
 // run from tests.
 func IsTestHelperProcess() bool {
@@ -62,7 +63,7 @@
 }
 
 func newExecHandle(name string) command {
-	return &execHandle{name: name, entryPoint: shellEntryPoint + "=" + name}
+	return &execHandle{name: name, entryPoint: shellEntryPoint + "=" + name, procErrCh: make(chan error, 1)}
 }
 
 func (eh *execHandle) Stdout() io.Reader {
@@ -112,15 +113,19 @@
 	newargs, newenv := eh.envelope(sh, env, args[1:]...)
 	cmd := exec.Command(os.Args[0], newargs[1:]...)
 	cmd.Env = newenv
-	stderr, err := newLogfile(strings.TrimLeft(eh.name, "-\n\t "))
+	stderr, err := newLogfile("stderr", eh.name)
 	if err != nil {
 		return nil, err
 	}
 	cmd.Stderr = stderr
-	stdout, err := cmd.StdoutPipe()
-	if err != nil {
-		return nil, err
-	}
+	// We use a custom queue-based Writer implementation for stdout to
+	// decouple the consumers of eh.stdout from the file where the child
+	// sends its output.  This avoids data races between closing the file
+	// and reading from it (since cmd.Wait will wait for the all readers to
+	// be done before closing it).  It also enables Shutdown to drain stdout
+	// while respecting the timeout.
+	stdout := newRW()
+	cmd.Stdout = stdout
 	stdin, err := cmd.StdinPipe()
 	if err != nil {
 		return nil, err
@@ -139,6 +144,15 @@
 	}
 	vlog.VI(1).Infof("Started: %q, pid %d", eh.name, cmd.Process.Pid)
 	err = handle.WaitForReady(sh.startTimeout)
+	go func() {
+		eh.procErrCh <- eh.handle.Wait(0)
+		// It's now safe to close eh.stdout, since Wait only returns
+		// once all writes from the pipe to the stdout Writer have
+		// completed.  Closing eh.stdout lets consumers of stdout wrap
+		// up (they'll receive EOF).
+		eh.stdout.Close()
+	}()
+
 	return eh, err
 }
 
@@ -151,38 +165,34 @@
 	defer eh.mu.Unlock()
 	vlog.VI(1).Infof("Shutdown: %q", eh.name)
 	eh.stdin.Close()
-	logFile := eh.stderr.Name()
 	defer eh.sh.Forget(eh)
 
-	defer func() {
-		os.Remove(logFile)
-	}()
-
-	// TODO(cnicolaou): make this configurable
-	timeout := 10 * time.Second
-	if stdout == nil && stderr == nil {
-		return eh.handle.Wait(timeout)
-	}
-
+	waitStdout := make(chan struct{})
 	if stdout != nil {
-		// Read from stdin before waiting for the child process to ensure
-		// that we get to read all of its output.
-		readTo(eh.stdout, stdout)
+		// Drain stdout.
+		go func() {
+			io.Copy(stdout, eh.stdout)
+			close(waitStdout)
+		}()
+	} else {
+		close(waitStdout)
 	}
 
-	procErr := eh.handle.Wait(timeout)
-
-	// Stderr is buffered to a file, so we can safely read it after we
-	// wait for the process.
-	eh.stderr.Close()
-	if stderr != nil {
-		stderrFile, err := os.Open(logFile)
-		if err != nil {
-			vlog.VI(1).Infof("failed to open %q: %s\n", logFile, err)
-			return procErr
-		}
-		readTo(stderrFile, stderr)
-		stderrFile.Close()
+	var procErr error
+	select {
+	case procErr = <-eh.procErrCh:
+		// The child has exited already.
+	case <-time.After(eh.sh.waitTimeout):
+		// Time out waiting for child to exit.
+		procErr = vexec.ErrTimeout
+		// Force close stdout to unblock any readers of stdout
+		// (including the drain loop started above).
+		eh.stdout.Close()
 	}
+	<-waitStdout
+
+	// Transcribe stderr.
+	outputFromFile(eh.stderr, stderr)
+
 	return procErr
 }
diff --git a/lib/modules/func.go b/lib/modules/func.go
index 54ce341..8fd0523 100644
--- a/lib/modules/func.go
+++ b/lib/modules/func.go
@@ -65,7 +65,7 @@
 			return nil, err
 		}
 	}
-	stderr, err := newLogfile(args[0])
+	stderr, err := newLogfile("stderr", args[0])
 	if err != nil {
 		return nil, err
 	}
@@ -111,7 +111,9 @@
 	fh.mu.Unlock()
 
 	// Read stdout until EOF to ensure that we read all of it.
-	readTo(stdout, stdout_w)
+	if stdout_w != nil {
+		io.Copy(stdout_w, stdout)
+	}
 	fh.wg.Wait()
 
 	fh.mu.Lock()
@@ -120,12 +122,13 @@
 
 	// Safe to close stderr now.
 	stderr.Close()
-	stderr, err := os.Open(stderr.Name())
-	if err == nil {
-		readTo(stderr, stderr_w)
-		stderr.Close()
-	} else {
-		fmt.Fprintf(os.Stderr, "failed to open %q: %s\n", stderr.Name(), err)
+	if stderr_w != nil {
+		if stderr, err := os.Open(stderr.Name()); err == nil {
+			io.Copy(stderr_w, stderr)
+			stderr.Close()
+		} else {
+			fmt.Fprintf(os.Stderr, "failed to open %q: %s\n", stderr.Name(), err)
+		}
 	}
 
 	fh.mu.Lock()
diff --git a/lib/modules/modules_internal_test.go b/lib/modules/modules_internal_test.go
index d0112e6..8ce9b17 100644
--- a/lib/modules/modules_internal_test.go
+++ b/lib/modules/modules_internal_test.go
@@ -13,7 +13,7 @@
 		return fmt.Errorf("no args")
 	}
 	for _, a := range args {
-		fmt.Println(a)
+		fmt.Fprintln(stdout, a)
 	}
 	return nil
 }
diff --git a/lib/modules/modules_test.go b/lib/modules/modules_test.go
index 0852a00..5eb1366 100644
--- a/lib/modules/modules_test.go
+++ b/lib/modules/modules_test.go
@@ -9,6 +9,7 @@
 	"reflect"
 	"sort"
 	"strings"
+	"syscall"
 	"testing"
 	"time"
 
@@ -23,11 +24,16 @@
 	modules.RegisterChild("printenv", "printenv", PrintEnv)
 	modules.RegisterChild("echos", "[args]*", Echo)
 	modules.RegisterChild("errortestChild", "", ErrorMain)
+	modules.RegisterChild("ignores_stdin", "", ignoresStdin)
 
 	modules.RegisterFunction("envtestf", "envtest: <variables to print>...", PrintFromEnv)
 	modules.RegisterFunction("echof", "[args]*", Echo)
 	modules.RegisterFunction("errortestFunc", "", ErrorMain)
+}
 
+func ignoresStdin(io.Reader, io.Writer, io.Writer, map[string]string, ...string) error {
+	<-time.After(time.Minute)
+	return nil
 }
 
 func Echo(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
@@ -186,6 +192,60 @@
 	testShutdown(t, modules.NewShell(), "echos", false)
 }
 
+// TestShutdownSubprocessIgnoresStdin verifies that Shutdown doesn't wait
+// forever if a child does not die upon closing stdin; but instead times out and
+// returns an appropriate error.
+func TestShutdownSubprocessIgnoresStdin(t *testing.T) {
+	sh := modules.NewShell()
+	sh.SetWaitTimeout(time.Second)
+	h, err := sh.Start("ignores_stdin", nil)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	var stdoutBuf, stderrBuf bytes.Buffer
+	if err := sh.Cleanup(&stdoutBuf, &stderrBuf); err == nil || err.Error() != exec.ErrTimeout.Error() {
+		t.Errorf("unexpected error in Cleanup: got %v, want %v", err, exec.ErrTimeout)
+	}
+	if err := syscall.Kill(h.Pid(), syscall.SIGINT); err != nil {
+		t.Errorf("Kill failed: %v", err)
+	}
+}
+
+// TestStdoutRace exemplifies a potential race between reading from child's
+// stdout and closing stdout in Wait (called by Shutdown).
+//
+// NOTE: triggering the actual --race failure is hard, even if the
+// implementation inappropriately sets stdout to the file that is to be closed
+// in Wait.
+func TestStdoutRace(t *testing.T) {
+	sh := modules.NewShell()
+	sh.SetWaitTimeout(time.Second)
+	h, err := sh.Start("ignores_stdin", nil)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	ch := make(chan error, 1)
+	go func() {
+		buf := make([]byte, 5)
+		// This will block since the child is not writing anything on
+		// stdout.
+		_, err := h.Stdout().Read(buf)
+		ch <- err
+	}()
+	// Give the goroutine above a chance to run, so that we're blocked on
+	// stdout.Read.
+	<-time.After(time.Second)
+	// Cleanup should close stdout, and unblock the goroutine.
+	sh.Cleanup(nil, nil)
+	if got, want := <-ch, io.EOF; got != want {
+		t.Errorf("Expected %v, got %v instead", want, got)
+	}
+
+	if err := syscall.Kill(h.Pid(), syscall.SIGINT); err != nil {
+		t.Errorf("Kill failed: %v", err)
+	}
+}
+
 func TestShutdownFunction(t *testing.T) {
 	testShutdown(t, modules.NewShell(), "echof", true)
 }
diff --git a/lib/modules/queue_rw.go b/lib/modules/queue_rw.go
new file mode 100644
index 0000000..8dbba9f
--- /dev/null
+++ b/lib/modules/queue_rw.go
@@ -0,0 +1,59 @@
+package modules
+
+import (
+	"io"
+	"sync"
+
+	// TODO(caprita): Move upcqueue into veyron/lib.
+	"veyron.io/veyron/veyron/runtimes/google/lib/upcqueue"
+)
+
+// TODO(caprita): Write unit tests.
+
+// queueRW implements a ReadWriteCloser backed by an unbounded in-memory
+// producer-consumer queue.
+type queueRW struct {
+	sync.Mutex
+	q        *upcqueue.T
+	buf      []byte
+	buffered int
+	cancel   chan struct{}
+}
+
+func newRW() io.ReadWriteCloser {
+	return &queueRW{q: upcqueue.New(), cancel: make(chan struct{})}
+}
+
+func (q *queueRW) Close() error {
+	// We use an empty message to signal EOF to the reader.
+	_, err := q.Write([]byte{})
+	return err
+}
+
+func (q *queueRW) Read(p []byte) (n int, err error) {
+	q.Lock()
+	defer q.Unlock()
+	for q.buffered == 0 {
+		elem, err := q.q.Get(q.cancel)
+		if err != nil {
+			return 0, io.EOF
+		}
+		s := elem.(string)
+		b := []byte(s)
+		if len(b) == 0 {
+			close(q.cancel)
+			return 0, io.EOF
+		}
+		q.buf, q.buffered = b, len(b)
+	}
+	copied := copy(p, q.buf[:q.buffered])
+	q.buffered -= copied
+	return copied, nil
+}
+
+func (q *queueRW) Write(p []byte) (n int, err error) {
+	if err := q.q.Put(string(p)); err != nil {
+		return 0, err
+	}
+	return len(p), nil
+}
diff --git a/lib/modules/shell.go b/lib/modules/shell.go
index 9e4c3b9..7b91f1a 100644
--- a/lib/modules/shell.go
+++ b/lib/modules/shell.go
@@ -49,12 +49,12 @@
 
 // Shell represents the context within which commands are run.
 type Shell struct {
-	mu           sync.Mutex
-	env          map[string]string
-	handles      map[Handle]struct{}
-	credDir      string
-	startTimeout time.Duration
-	config       exec.Config
+	mu                        sync.Mutex
+	env                       map[string]string
+	handles                   map[Handle]struct{}
+	credDir                   string
+	startTimeout, waitTimeout time.Duration
+	config                    exec.Config
 }
 
 // NewShell creates a new instance of Shell. If this new instance is is a test
@@ -66,6 +66,7 @@
 		env:          make(map[string]string),
 		handles:      make(map[Handle]struct{}),
 		startTimeout: time.Minute,
+		waitTimeout:  10 * time.Second,
 		config:       exec.NewConfig(),
 	}
 	if flag.Lookup("test.run") != nil && os.Getenv(consts.VeyronCredentials) == "" {
@@ -146,6 +147,11 @@
 	sh.startTimeout = d
 }
 
+// SetWaitTimeout sets the timeout for waiting on subcommands to complete.
+func (sh *Shell) SetWaitTimeout(d time.Duration) {
+	sh.waitTimeout = d
+}
+
 // CommandEnvelope returns the command line and environment that would be
 // used for running the subprocess or function if it were started with the
 // specifed arguments.
diff --git a/lib/modules/util.go b/lib/modules/util.go
index c35c74e..7ad2fcb 100644
--- a/lib/modules/util.go
+++ b/lib/modules/util.go
@@ -1,30 +1,39 @@
 package modules
 
 import (
-	"bufio"
 	"fmt"
+	"hash/adler32"
 	"io"
 	"io/ioutil"
 	"os"
 	"strings"
+
+	"veyron.io/veyron/veyron2/vlog"
 )
 
-func newLogfile(prefix string) (*os.File, error) {
-	f, err := ioutil.TempFile("", "__modules__"+prefix)
+func newLogfile(prefix, name string) (*os.File, error) {
+	nameHash := adler32.Checksum([]byte(name))
+	f, err := ioutil.TempFile("", fmt.Sprintf("__modules__%s-%x", prefix, nameHash))
 	if err != nil {
 		return nil, err
 	}
 	return f, nil
 }
 
-func readTo(r io.Reader, w io.Writer) {
-	if w == nil {
+func outputFromFile(f *os.File, out io.Writer) {
+	f.Close()
+	fName := f.Name()
+	defer os.Remove(fName)
+	if out == nil {
 		return
 	}
-	scanner := bufio.NewScanner(r)
-	for scanner.Scan() {
-		fmt.Fprintf(w, "%s\n", scanner.Text())
+	var err error
+	if f, err = os.Open(fName); err != nil {
+		vlog.VI(1).Infof("failed to open %q: %s\n", fName, err)
+		return
 	}
+	io.Copy(out, f)
+	f.Close()
 }
 
 // envSliceToMap returns a map representation of a string slive