veyron/lib/modules/core: expand the set of core modules.

Expand the set of core modules provided, cleanup various bugs
along the way.

Change-Id: I7f4966b91549a4edca049e908d6945bd03ef8172
diff --git a/lib/expect/expect.go b/lib/expect/expect.go
index 1573147..d5f738d 100644
--- a/lib/expect/expect.go
+++ b/lib/expect/expect.go
@@ -71,8 +71,8 @@
 }
 
 // NewSession creates a new Session. The parameter t may be safely be nil.
-func NewSession(t Testing, input *bufio.Reader, timeout time.Duration) *Session {
-	return &Session{t: t, timeout: timeout, input: input}
+func NewSession(t Testing, input io.Reader, timeout time.Duration) *Session {
+	return &Session{t: t, timeout: timeout, input: bufio.NewReader(input)}
 }
 
 // Failed returns true if an error has been encountered by a prior call.
@@ -130,9 +130,8 @@
 func readAll(r *bufio.Reader) (string, error) {
 	all := ""
 	for {
-		buf := make([]byte, 4096*4)
-		n, err := r.Read(buf)
-		all += string(buf[:n])
+		l, err := r.ReadString('\n')
+		all += l
 		if err != nil {
 			if err == io.EOF {
 				return all, nil
@@ -241,6 +240,7 @@
 
 // ReadLine reads the next line, if any, from the input stream. It will set
 // the error state to io.EOF if it has read past the end of the stream.
+// ReadLine has no effect if an error has already occurred.
 func (s *Session) ReadLine() string {
 	if s.Failed() {
 		return ""
@@ -255,9 +255,25 @@
 
 // ReadAll reads all remaining input on the stream. Unlike all of the other
 // methods it does not strip newlines from the input.
+// ReadAll has no effect if an error has already occurred.
 func (s *Session) ReadAll() (string, error) {
 	if s.Failed() {
 		return "", s.err
 	}
 	return s.read(readAll)
 }
+
+// Finish reads all remaining input on the stream regardless of any
+// prior errors and writes it to the supplied io.Writer parameter if non-nil.
+// It returns both the data read and the prior error, if any, otherwise it
+// returns any error that occurred reading the rest of the input.
+func (s *Session) Finish(w io.Writer) (string, error) {
+	a, err := s.read(readAll)
+	if w != nil {
+		fmt.Fprint(w, a)
+	}
+	if s.Failed() {
+		return a, s.err
+	}
+	return a, err
+}
diff --git a/lib/modules/core/core.go b/lib/modules/core/core.go
index f2851f7..980313e 100644
--- a/lib/modules/core/core.go
+++ b/lib/modules/core/core.go
@@ -3,39 +3,67 @@
 //
 // The available commands are:
 //
-// root     - runs a root mount table as a subprocess
-//            prints the MT_NAME=<root name>, PID=<pid> variables to stdout
-//            waits for stdin to be closed before it exits
-//            prints "done" to stdout when stdin is closed.
-// mt <mp>  - runs a mount table as a subprocess mounted on <mp>
-//            NAMESPACE_ROOT should be set to the name of another mount table
-//            (e.g. the value of MT_NAME from a root) in the shell's
-//            environment. mt similarly prints MT_NAME, PID and waits for stdout
-//            as per the root
-//            command
+// root
+//   runs a root mount table as a subprocess
+//   prints the MT_NAME=<root name>, PID=<pid> variables to stdout
+//   waits for stdin to be closed before it exits
+//   prints "done" to stdout when stdin is closed.
+// mt <mp>
+//   runs a mount table as a subprocess mounted on <mp>
+//   NAMESPACE_ROOT should be set to the name of another mount table
+//   (e.g. the value of MT_NAME from a root) in the shell's environment.
+//   mt similarly prints MT_NAME, PID and waits for stdout as per the root
+//   command
 //
-// ls <glob>...  - ls issues one or more globs using the local,
-//                 in-process namespace
-// lse <glob>... - lse issues one or more globs from a subprocess and hence
-//                 the subprocesses namespace. NAMESPACE_ROOT can be set in
-//                 the shell's environment prior to calling lse to have the
-//                 subprocesses namespace be relative to it.
+// ls <glob>...
+//   ls issues one or more globs using the local in-process namespace. It
+//   writes: RN=<number of items> and then R0=<name> to R(N-1)=<name>
+//   lines of output. If -l is specified, then each line includes a trailing
+//   set of detailed information, enclosed in [].
 //
+// lse <glob>...
+//   lse issues one or more globs from a subprocess and hence the
+//   subprocesses namespace. NAMESPACE_ROOT can be set in the shell's
+//   environment prior to calling lse to have the subprocesses namespace
+//   be relative to it. The output format is the same ls.
+//
+// resolve <name>...
+//    resolves name using the in-process namespace, the results are written
+//    to stdout as variables R<n>=<addr>
+// resolveMT <name>...
+//    resolves name to obtain the mount table hosting it using
+//    the in-process namespace. The results are written as R<n>=<addr>
+//    as per resolve.
+//
+// setRoots <name>...
+//    sets the local namespace's roots to the supplied names.
+// echoClient
+// echoServer
 package core
 
 import "veyron.io/veyron/veyron/lib/modules"
 
 const (
-	RootMTCommand     = "root"
-	MTCommand         = "mt"
-	LSCommand         = "ls"
-	LSExternalCommand = "lse"
-	// TODO(cnicolaou): provide a simple service that can be mounted.
-	MountCommand = "mount"
+	RootMTCommand            = "root"
+	MTCommand                = "mt"
+	LSCommand                = "ls"
+	LSExternalCommand        = "lse"
+	SetNamespaceRootsCommand = "setRoots"
+	ResolveCommand           = "resolve"
+	ResolveMTCommand         = "resolveMT"
+	EchoServerCommand        = "echoServer"
+	EchoClientCommand        = "echoClient"
 )
 
+// NewShell returns a new Shell instance with the core commands installed.
 func NewShell() *modules.Shell {
 	shell := modules.NewShell()
+	Install(shell)
+	return shell
+}
+
+// Install installs the core commands into the supplied Shell.
+func Install(shell *modules.Shell) {
 	shell.AddSubprocess(RootMTCommand, "")
 	shell.AddSubprocess(MTCommand, `<mount point>
 	reads NAMESPACE_ROOT from its environment and mounts a new mount table at <mount point>`)
@@ -43,5 +71,14 @@
 	issues glob requests using the current processes namespace library`)
 	shell.AddSubprocess(LSExternalCommand, `<glob>...
 	runs a subprocess to issue glob requests using the subprocesses namespace library`)
-	return shell
+	shell.AddFunction(ResolveCommand, resolveObject, `<name>
+	resolves name to obtain an object server address`)
+	shell.AddFunction(ResolveMTCommand, resolveMT, `<name>
+	resolves name to obtain a mount table address`)
+	shell.AddFunction(SetNamespaceRootsCommand, setNamespaceRoots, `<name>...
+	set the in-process namespace roots to <name>...`)
+	shell.AddSubprocess(EchoClientCommand, `<name> <message>...
+	invokes name.Echo(message)`)
+	shell.AddSubprocess(EchoServerCommand, `<name> <text>
+	runs an Echo server mounted at <name> and configured to return <text>: as a prefix in its response`)
 }
diff --git a/lib/modules/core/core_test.go b/lib/modules/core/core_test.go
index b741bc7..6223cdc 100644
--- a/lib/modules/core/core_test.go
+++ b/lib/modules/core/core_test.go
@@ -1,12 +1,15 @@
 package core_test
 
 import (
+	"fmt"
 	"os"
 	"reflect"
 	"sort"
+	"strconv"
 	"testing"
 	"time"
 
+	"veyron.io/veyron/veyron2/naming"
 	"veyron.io/veyron/veyron2/rt"
 
 	"veyron.io/veyron/veyron/lib/expect"
@@ -41,7 +44,40 @@
 	s.ExpectVar("MT_NAME")
 	s.ExpectVar("PID")
 	root.CloseStdin()
-	s.Expect("done")
+	s.Expect("PASS")
+}
+
+func startMountTables(t *testing.T, sh *modules.Shell, mountPoints ...string) (map[string]string, error) {
+	// Start root mount table
+	root, err := sh.Start(core.RootMTCommand)
+	if err != nil {
+		t.Fatalf("unexpected error for root mt: %s", err)
+	}
+	rootSession := expect.NewSession(t, root.Stdout(), time.Minute)
+	rootName := rootSession.ExpectVar("MT_NAME")
+	if t.Failed() {
+		return nil, rootSession.Error()
+	}
+	sh.SetVar("NAMESPACE_ROOT", rootName)
+	mountAddrs := make(map[string]string)
+	mountAddrs["root"] = rootName
+
+	// Start the mount tables
+	for _, mp := range mountPoints {
+		h, err := sh.Start(core.MTCommand, mp)
+		if err != nil {
+			return nil, fmt.Errorf("unexpected error for mt %q: %s", mp, err)
+		}
+		s := expect.NewSession(t, h.Stdout(), time.Minute)
+		// Wait until each mount table has at least called Serve to
+		// mount itself.
+		mountAddrs[mp] = s.ExpectVar("MT_NAME")
+		if s.Failed() {
+			return nil, s.Error()
+		}
+	}
+	return mountAddrs, nil
+
 }
 
 func getMatchingMountpoint(r [][]string) string {
@@ -70,46 +106,28 @@
 		defer shell.Cleanup(nil)
 	}
 
-	// Start root mount table
-	root, err := shell.Start(core.RootMTCommand)
+	mountPoints := []string{"a", "b", "c", "d", "e"}
+	mountAddrs, err := startMountTables(t, shell, mountPoints...)
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
-	rootSession := expect.NewSession(t, root.Stdout(), time.Minute)
-	rootName := rootSession.ExpectVar("MT_NAME")
-	shell.SetVar("NAMESPACE_ROOT", rootName)
-
-	if t.Failed() {
-		return
-	}
-	mountPoints := []string{"a", "b", "c", "d", "e"}
-
-	// Start 3 mount tables
-	for _, mp := range mountPoints {
-		h, err := shell.Start(core.MTCommand, mp)
-		if err != nil {
-			t.Fatalf("unexpected error: %s", err)
-		}
-		s := expect.NewSession(t, h.Stdout(), time.Minute)
-		// Wait until each mount table has at least called Serve to
-		// mount itself.
-		s.ExpectVar("MT_NAME")
-
-	}
-
+	rootName := mountAddrs["root"]
 	ls, err := shell.Start(core.LSCommand, rootName+"/...")
 	if err != nil {
 		t.Fatalf("unexpected error: %s", err)
 	}
 	lsSession := expect.NewSession(t, ls.Stdout(), time.Minute)
-
 	lsSession.SetVerbosity(testing.Verbose())
-	lsSession.Expect(rootName)
+
+	if got, want := lsSession.ExpectVar("RN"), strconv.Itoa(len(mountPoints)+1); got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	lsSession.Expect("R0=" + rootName)
 
 	// Look for names that correspond to the mountpoints above (i.e, a, b or c)
 	pattern := ""
 	for _, n := range mountPoints {
-		pattern = pattern + "(" + rootName + "/(" + n + ")$)|"
+		pattern = pattern + "^R[\\d]+=(" + rootName + "/(" + n + ")$)|"
 	}
 	pattern = pattern[:len(pattern)-1]
 
@@ -131,12 +149,16 @@
 	lseSession := expect.NewSession(t, lse.Stdout(), time.Minute)
 	lseSession.SetVerbosity(testing.Verbose())
 
+	if got, want := lseSession.ExpectVar("RN"), strconv.Itoa(len(mountPoints)); got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+
 	pattern = ""
 	for _, n := range mountPoints {
 		// Since the LSExternalCommand runs in a subprocess with NAMESPACE_ROOT
 		// set to the name of the root mount table it sees to the relative name
 		// format of the mounted mount tables.
-		pattern = pattern + "(^" + n + "$)|"
+		pattern = pattern + "^R[\\d]+=(" + n + "$)|"
 	}
 	pattern = pattern[:len(pattern)-1]
 	found = []string{}
@@ -150,6 +172,113 @@
 	}
 }
 
+func TestEcho(t *testing.T) {
+	shell := core.NewShell()
+	if testing.Verbose() {
+		defer shell.Cleanup(os.Stderr)
+	} else {
+		defer shell.Cleanup(nil)
+	}
+	srv, err := shell.Start(core.EchoServerCommand, "test", "")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	srvSession := expect.NewSession(t, srv.Stdout(), time.Minute)
+	name := srvSession.ExpectVar("NAME")
+	if len(name) == 0 {
+		t.Fatalf("failed to get name")
+	}
+
+	clt, err := shell.Start(core.EchoClientCommand, name, "a message")
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	cltSession := expect.NewSession(t, clt.Stdout(), time.Minute)
+	cltSession.Expect("test: a message")
+}
+
+func TestResolve(t *testing.T) {
+	shell := core.NewShell()
+	if testing.Verbose() {
+		defer shell.Cleanup(os.Stderr)
+	} else {
+		defer shell.Cleanup(nil)
+	}
+	mountPoints := []string{"a", "b"}
+	mountAddrs, err := startMountTables(t, shell, mountPoints...)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	rootName := mountAddrs["root"]
+	mtName := "b"
+	echoName := naming.Join(mtName, "echo")
+	srv, err := shell.Start(core.EchoServerCommand, "test", echoName)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	srvSession := expect.NewSession(t, srv.Stdout(), time.Minute)
+	srvSession.ExpectVar("NAME")
+	addr := srvSession.ExpectVar("ADDR")
+	addr = naming.JoinAddressName(addr, "//")
+
+	// Resolve an object
+	resolver, err := shell.Start(core.ResolveCommand, rootName+"/"+echoName)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	resolverSession := expect.NewSession(t, resolver.Stdout(), time.Minute)
+	if got, want := resolverSession.ExpectVar("RN"), "1"; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	if got, want := resolverSession.ExpectVar("R0"), addr; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	if err = resolver.Shutdown(nil); err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	// Resolve to a mount table using a rooted name.
+	addr = naming.JoinAddressName(mountAddrs[mtName], "//echo")
+	resolver, err = shell.Start(core.ResolveMTCommand, rootName+"/"+echoName)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	resolverSession = expect.NewSession(t, resolver.Stdout(), time.Minute)
+	if got, want := resolverSession.ExpectVar("RN"), "1"; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	if got, want := resolverSession.ExpectVar("R0"), addr; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	if err := resolver.Shutdown(nil); err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	// Resolve to a mount table, but using a relative name.
+	nsroots, err := shell.Start(core.SetNamespaceRootsCommand, rootName)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	if err := nsroots.Shutdown(nil); err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	resolver, err = shell.Start(core.ResolveMTCommand, echoName)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	resolverSession = expect.NewSession(t, resolver.Stdout(), time.Minute)
+	if got, want := resolverSession.ExpectVar("RN"), "1"; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	if got, want := resolverSession.ExpectVar("R0"), addr; got != want {
+		t.Fatalf("got %v, want %v", got, want)
+	}
+	if err := resolver.Shutdown(nil); err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+}
+
 func TestHelperProcess(t *testing.T) {
 	if !modules.IsTestHelperProcess() {
 		return
diff --git a/lib/modules/core/echo.go b/lib/modules/core/echo.go
new file mode 100644
index 0000000..ae37372
--- /dev/null
+++ b/lib/modules/core/echo.go
@@ -0,0 +1,78 @@
+package core
+
+import (
+	"fmt"
+	"io"
+	"os"
+
+	"veyron.io/veyron/veyron2/ipc"
+	"veyron.io/veyron/veyron2/naming"
+	"veyron.io/veyron/veyron2/rt"
+
+	"veyron.io/veyron/veyron/lib/modules"
+)
+
+func init() {
+	modules.RegisterChild(EchoServerCommand, echoServer)
+	modules.RegisterChild(EchoClientCommand, echoClient)
+}
+
+type echoServerObject struct {
+	id string
+}
+
+func (es *echoServerObject) Echo(call ipc.ServerCall, m string) (string, error) {
+	return es.id + ": " + m + "\n", nil
+}
+
+func echoServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	if len(args) != 2 {
+		return fmt.Errorf("wrong # args")
+	}
+	id, mountPoint := args[0], args[1]
+	server, err := rt.R().NewServer()
+	if err != nil {
+		return err
+	}
+	defer server.Stop()
+	ep, err := server.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return err
+	}
+	if err := server.Serve(mountPoint, ipc.LeafDispatcher(&echoServerObject{id: id}, nil)); err != nil {
+		return err
+	}
+	fmt.Fprintf(stdout, "NAME=%s\n", naming.MakeTerminal(naming.JoinAddressName(ep.String(), "")))
+	fmt.Fprintf(stdout, "ADDR=%s\n", ep.String())
+	fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
+	modules.WaitForEOF(stdin)
+	return nil
+}
+
+func echoClient(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+
+	if len(args) < 2 {
+		return fmt.Errorf("wrong # args")
+	}
+	name := args[0]
+	args = args[1:]
+	client := rt.R().Client()
+	for _, a := range args {
+		ctxt := rt.R().NewContext()
+		h, err := client.StartCall(ctxt, name, "Echo", []interface{}{a})
+		if err != nil {
+			return err
+		}
+		var r string
+		var apperr error
+		if err := h.Finish(&r, &apperr); err != nil {
+			return err
+		} else {
+			if apperr != nil {
+				return apperr
+			}
+		}
+		fmt.Fprintf(stdout, r)
+	}
+	return nil
+}
diff --git a/lib/modules/core/mounttable.go b/lib/modules/core/mounttable.go
index f95cfad..1c523e0 100644
--- a/lib/modules/core/mounttable.go
+++ b/lib/modules/core/mounttable.go
@@ -7,6 +7,7 @@
 	"strings"
 
 	"veyron.io/veyron/veyron2"
+	"veyron.io/veyron/veyron2/context"
 	"veyron.io/veyron/veyron2/naming"
 	"veyron.io/veyron/veyron2/rt"
 
@@ -59,7 +60,6 @@
 	fmt.Fprintf(stdout, "MT_NAME=%s\n", name)
 	fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
 	modules.WaitForEOF(stdin)
-	fmt.Fprintf(stdout, "done\n")
 	return nil
 }
 
@@ -69,8 +69,9 @@
 		details = true
 		args = args[1:]
 	}
-
 	ns := rt.R().Namespace()
+	entry := 0
+	output := ""
 	for _, pattern := range args {
 		ch, err := ns.Glob(rt.R().NewContext(), pattern)
 		if err != nil {
@@ -78,19 +79,55 @@
 		}
 		for n := range ch {
 			if details {
-				fmt.Fprintf(stdout, "%s [", n.Name)
+				output += fmt.Sprintf("R%d=%s[", entry, n.Name)
 				t := ""
 				for _, s := range n.Servers {
-					t += fmt.Sprintf("%s:%ss, ", s.Server, s.TTL)
+					output += fmt.Sprintf("%s:%ss, ", s.Server, s.TTL)
 				}
 				t = strings.TrimSuffix(t, ", ")
-				fmt.Fprintf(stdout, "%s]\n", t)
+				output += fmt.Sprintf("%s]\n", t)
+				entry += 1
 			} else {
 				if len(n.Name) > 0 {
-					fmt.Fprintf(stdout, "%s\n", n.Name)
+					output += fmt.Sprintf("R%d=%s\n", entry, n.Name)
+					entry += 1
 				}
 			}
+
 		}
 	}
+	fmt.Fprintf(stdout, "RN=%d\n", entry)
+	fmt.Fprint(stdout, output)
 	return nil
 }
+
+type resolver func(ctx context.T, name string) (names []string, err error)
+
+func resolve(fn resolver, stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	if len(args) != 1 {
+		return fmt.Errorf("wrong # args")
+	}
+	name := args[0]
+	servers, err := fn(rt.R().NewContext(), name)
+	if err != nil {
+		return err
+	}
+	fmt.Fprintf(stdout, "RN=%d\n", len(servers))
+	for i, s := range servers {
+		fmt.Fprintf(stdout, "R%d=%s\n", i, s)
+	}
+	return nil
+}
+
+func resolveObject(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	return resolve(rt.R().Namespace().Resolve, stdin, stdout, stderr, env, args...)
+}
+
+func resolveMT(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	return resolve(rt.R().Namespace().ResolveToMountTable, stdin, stdout, stderr, env, args...)
+}
+
+func setNamespaceRoots(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	ns := rt.R().Namespace()
+	return ns.SetRoots(args...)
+}
diff --git a/lib/modules/exec.go b/lib/modules/exec.go
index 3f27c3e..5d63b4a 100644
--- a/lib/modules/exec.go
+++ b/lib/modules/exec.go
@@ -26,7 +26,7 @@
 	handle     *vexec.ParentHandle
 	sh         *Shell
 	stderr     *os.File
-	stdout     *bufio.Reader
+	stdout     io.ReadCloser
 	stdin      io.WriteCloser
 }
 
@@ -68,7 +68,7 @@
 	return &execHandle{entryPoint: entryPoint}
 }
 
-func (eh *execHandle) Stdout() *bufio.Reader {
+func (eh *execHandle) Stdout() io.Reader {
 	eh.mu.Lock()
 	defer eh.mu.Unlock()
 	return eh.stdout
@@ -152,7 +152,7 @@
 	}
 
 	handle := vexec.NewParentHandle(cmd)
-	eh.stdout = bufio.NewReader(stdout)
+	eh.stdout = stdout
 	eh.stderr = stderr
 	eh.stdin = stdin
 	eh.handle = handle
diff --git a/lib/modules/func.go b/lib/modules/func.go
index 5a7c149..f44127b 100644
--- a/lib/modules/func.go
+++ b/lib/modules/func.go
@@ -6,7 +6,6 @@
 	"io"
 	"os"
 	"sync"
-	"syscall"
 )
 
 type pipe struct {
@@ -16,7 +15,6 @@
 	mu                    sync.Mutex
 	main                  Main
 	stdin, stderr, stdout pipe
-	bufferedStdout        *bufio.Reader
 	err                   error
 	sh                    *Shell
 	wg                    sync.WaitGroup
@@ -26,10 +24,10 @@
 	return &functionHandle{main: main}
 }
 
-func (fh *functionHandle) Stdout() *bufio.Reader {
+func (fh *functionHandle) Stdout() io.Reader {
 	fh.mu.Lock()
 	defer fh.mu.Unlock()
-	return fh.bufferedStdout
+	return fh.stdout.r
 }
 
 func (fh *functionHandle) Stderr() io.Reader {
@@ -46,9 +44,8 @@
 
 func (fh *functionHandle) CloseStdin() {
 	fh.mu.Lock()
-	fd := fh.stdin.w.Fd()
+	fh.stdin.w.Close()
 	fh.mu.Unlock()
-	syscall.Close(int(fd))
 }
 
 func (fh *functionHandle) start(sh *Shell, args ...string) (Handle, error) {
@@ -61,21 +58,25 @@
 			return nil, err
 		}
 	}
-	fh.bufferedStdout = bufio.NewReader(fh.stdout.r)
 	fh.wg.Add(1)
 
 	go func() {
-		err := fh.main(fh.stdin.r, fh.stdout.w, fh.stderr.w, sh.mergeOSEnv(), args...)
-		if err != nil {
-			fmt.Fprintf(fh.stderr.w, "%s\n", err)
-		}
 		fh.mu.Lock()
-		// We close these files using the Close system call since there
-		// may be an oustanding read on them that would otherwise trigger
-		// a test failure with go test -race
-		syscall.Close(int(fh.stdin.w.Fd()))
-		syscall.Close(int(fh.stdout.r.Fd()))
-		syscall.Close(int(fh.stderr.r.Fd()))
+		stdin := fh.stdin.r
+		stdout := fh.stdout.w
+		stderr := fh.stderr.w
+		main := fh.main
+		fh.mu.Unlock()
+
+		err := main(stdin, stdout, stderr, sh.mergeOSEnv(), args...)
+		if err != nil {
+			fmt.Fprintf(stderr, "%s\n", err)
+		}
+
+		fh.mu.Lock()
+		fh.stdin.r.Close()
+		fh.stdout.w.Close()
+		fh.stderr.w.Close()
 		fh.err = err
 		fh.mu.Unlock()
 		fh.wg.Done()
@@ -85,18 +86,22 @@
 
 func (fh *functionHandle) Shutdown(output io.Writer) error {
 	fh.mu.Lock()
-	syscall.Close(int(fh.stdin.w.Fd()))
-	if output != nil {
-		scanner := bufio.NewScanner(fh.stderr.r)
-		for scanner.Scan() {
-			fmt.Fprintf(output, "%s\n", scanner.Text())
-		}
-	}
+	fh.stdin.w.Close()
+	stderr := fh.stderr.r
 	fh.mu.Unlock()
 
-	fh.wg.Wait()
+	if output != nil {
+		scanner := bufio.NewScanner(stderr)
+		for scanner.Scan() {
+			l := scanner.Text()
+			fmt.Fprintf(output, "%s\n", l)
+		}
+	}
 
+	fh.wg.Wait()
 	fh.mu.Lock()
+	fh.stdout.r.Close()
+	fh.stderr.r.Close()
 	err := fh.err
 	fh.sh.forget(fh)
 	fh.mu.Unlock()
diff --git a/lib/modules/modules_internal_test.go b/lib/modules/modules_internal_test.go
index 349869b..920538d 100644
--- a/lib/modules/modules_internal_test.go
+++ b/lib/modules/modules_internal_test.go
@@ -34,22 +34,24 @@
 	sh.AddSubprocess("echonotregistered", "[args]*")
 	sh.AddSubprocess("echos", "[args]*")
 	sh.AddFunction("echof", Echo, "[args]*")
-
 	assertNumHandles(t, sh, 0)
+
 	_, _ = sh.Start("echonotregistered") // won't start.
 	hs, _ := sh.Start("echos", "a")
 	hf, _ := sh.Start("echof", "b")
-
 	assertNumHandles(t, sh, 2)
+
 	for i, h := range []Handle{hs, hf} {
 		if got := h.Shutdown(nil); got != nil {
 			t.Errorf("%d: got %q, want %q", i, got, nil)
 		}
 	}
 	assertNumHandles(t, sh, 0)
+
 	hs, _ = sh.Start("echos", "a", "b")
 	hf, _ = sh.Start("echof", "c")
 	assertNumHandles(t, sh, 2)
+
 	sh.Cleanup(nil)
 	assertNumHandles(t, sh, 0)
 }
diff --git a/lib/modules/shell.go b/lib/modules/shell.go
index f3f9f97..7ceaf24 100644
--- a/lib/modules/shell.go
+++ b/lib/modules/shell.go
@@ -37,7 +37,6 @@
 package modules
 
 import (
-	"bufio"
 	"fmt"
 	"io"
 	"strings"
@@ -68,6 +67,8 @@
 
 // NewShell creates a new instance of Shell.
 func NewShell() *Shell {
+	// TODO(cnicolaou): should create a new identity if one doesn't
+	// already exist
 	return &Shell{
 		env:     make(map[string]string),
 		cmds:    make(map[string]*commandDesc),
@@ -212,10 +213,10 @@
 
 // Handle represents a running command.
 type Handle interface {
-	// Stdout returns a buffered reader to the running command's stdout stream.
-	Stdout() *bufio.Reader
+	// Stdout returns a reader to the running command's stdout stream.
+	Stdout() io.Reader
 
-	// Stderr returns an unbuffered reader to the running command's stderr
+	// Stderr returns a reader to the running command's stderr
 	// stream.
 	Stderr() io.Reader