veyron/lib/modules/core: expand the set of core modules.
Expand the set of core modules provided, cleanup various bugs
along the way.
Change-Id: I7f4966b91549a4edca049e908d6945bd03ef8172
diff --git a/lib/expect/expect.go b/lib/expect/expect.go
index 1573147..d5f738d 100644
--- a/lib/expect/expect.go
+++ b/lib/expect/expect.go
@@ -71,8 +71,8 @@
}
// NewSession creates a new Session. The parameter t may be safely be nil.
-func NewSession(t Testing, input *bufio.Reader, timeout time.Duration) *Session {
- return &Session{t: t, timeout: timeout, input: input}
+func NewSession(t Testing, input io.Reader, timeout time.Duration) *Session {
+ return &Session{t: t, timeout: timeout, input: bufio.NewReader(input)}
}
// Failed returns true if an error has been encountered by a prior call.
@@ -130,9 +130,8 @@
func readAll(r *bufio.Reader) (string, error) {
all := ""
for {
- buf := make([]byte, 4096*4)
- n, err := r.Read(buf)
- all += string(buf[:n])
+ l, err := r.ReadString('\n')
+ all += l
if err != nil {
if err == io.EOF {
return all, nil
@@ -241,6 +240,7 @@
// ReadLine reads the next line, if any, from the input stream. It will set
// the error state to io.EOF if it has read past the end of the stream.
+// ReadLine has no effect if an error has already occurred.
func (s *Session) ReadLine() string {
if s.Failed() {
return ""
@@ -255,9 +255,25 @@
// ReadAll reads all remaining input on the stream. Unlike all of the other
// methods it does not strip newlines from the input.
+// ReadAll has no effect if an error has already occurred.
func (s *Session) ReadAll() (string, error) {
if s.Failed() {
return "", s.err
}
return s.read(readAll)
}
+
+// Finish reads all remaining input on the stream regardless of any
+// prior errors and writes it to the supplied io.Writer parameter if non-nil.
+// It returns both the data read and the prior error, if any, otherwise it
+// returns any error that occurred reading the rest of the input.
+func (s *Session) Finish(w io.Writer) (string, error) {
+ a, err := s.read(readAll)
+ if w != nil {
+ fmt.Fprint(w, a)
+ }
+ if s.Failed() {
+ return a, s.err
+ }
+ return a, err
+}
diff --git a/lib/modules/core/core.go b/lib/modules/core/core.go
index f2851f7..980313e 100644
--- a/lib/modules/core/core.go
+++ b/lib/modules/core/core.go
@@ -3,39 +3,67 @@
//
// The available commands are:
//
-// root - runs a root mount table as a subprocess
-// prints the MT_NAME=<root name>, PID=<pid> variables to stdout
-// waits for stdin to be closed before it exits
-// prints "done" to stdout when stdin is closed.
-// mt <mp> - runs a mount table as a subprocess mounted on <mp>
-// NAMESPACE_ROOT should be set to the name of another mount table
-// (e.g. the value of MT_NAME from a root) in the shell's
-// environment. mt similarly prints MT_NAME, PID and waits for stdout
-// as per the root
-// command
+// root
+// runs a root mount table as a subprocess
+// prints the MT_NAME=<root name>, PID=<pid> variables to stdout
+// waits for stdin to be closed before it exits
+// prints "done" to stdout when stdin is closed.
+// mt <mp>
+// runs a mount table as a subprocess mounted on <mp>
+// NAMESPACE_ROOT should be set to the name of another mount table
+// (e.g. the value of MT_NAME from a root) in the shell's environment.
+// mt similarly prints MT_NAME, PID and waits for stdout as per the root
+// command
//
-// ls <glob>... - ls issues one or more globs using the local,
-// in-process namespace
-// lse <glob>... - lse issues one or more globs from a subprocess and hence
-// the subprocesses namespace. NAMESPACE_ROOT can be set in
-// the shell's environment prior to calling lse to have the
-// subprocesses namespace be relative to it.
+// ls <glob>...
+// ls issues one or more globs using the local in-process namespace. It
+// writes: RN=<number of items> and then R0=<name> to R(N-1)=<name>
+// lines of output. If -l is specified, then each line includes a trailing
+// set of detailed information, enclosed in [].
//
+// lse <glob>...
+// lse issues one or more globs from a subprocess and hence the
+// subprocesses namespace. NAMESPACE_ROOT can be set in the shell's
+// environment prior to calling lse to have the subprocesses namespace
+// be relative to it. The output format is the same ls.
+//
+// resolve <name>...
+// resolves name using the in-process namespace, the results are written
+// to stdout as variables R<n>=<addr>
+// resolveMT <name>...
+// resolves name to obtain the mount table hosting it using
+// the in-process namespace. The results are written as R<n>=<addr>
+// as per resolve.
+//
+// setRoots <name>...
+// sets the local namespace's roots to the supplied names.
+// echoClient
+// echoServer
package core
import "veyron.io/veyron/veyron/lib/modules"
const (
- RootMTCommand = "root"
- MTCommand = "mt"
- LSCommand = "ls"
- LSExternalCommand = "lse"
- // TODO(cnicolaou): provide a simple service that can be mounted.
- MountCommand = "mount"
+ RootMTCommand = "root"
+ MTCommand = "mt"
+ LSCommand = "ls"
+ LSExternalCommand = "lse"
+ SetNamespaceRootsCommand = "setRoots"
+ ResolveCommand = "resolve"
+ ResolveMTCommand = "resolveMT"
+ EchoServerCommand = "echoServer"
+ EchoClientCommand = "echoClient"
)
+// NewShell returns a new Shell instance with the core commands installed.
func NewShell() *modules.Shell {
shell := modules.NewShell()
+ Install(shell)
+ return shell
+}
+
+// Install installs the core commands into the supplied Shell.
+func Install(shell *modules.Shell) {
shell.AddSubprocess(RootMTCommand, "")
shell.AddSubprocess(MTCommand, `<mount point>
reads NAMESPACE_ROOT from its environment and mounts a new mount table at <mount point>`)
@@ -43,5 +71,14 @@
issues glob requests using the current processes namespace library`)
shell.AddSubprocess(LSExternalCommand, `<glob>...
runs a subprocess to issue glob requests using the subprocesses namespace library`)
- return shell
+ shell.AddFunction(ResolveCommand, resolveObject, `<name>
+ resolves name to obtain an object server address`)
+ shell.AddFunction(ResolveMTCommand, resolveMT, `<name>
+ resolves name to obtain a mount table address`)
+ shell.AddFunction(SetNamespaceRootsCommand, setNamespaceRoots, `<name>...
+ set the in-process namespace roots to <name>...`)
+ shell.AddSubprocess(EchoClientCommand, `<name> <message>...
+ invokes name.Echo(message)`)
+ shell.AddSubprocess(EchoServerCommand, `<name> <text>
+ runs an Echo server mounted at <name> and configured to return <text>: as a prefix in its response`)
}
diff --git a/lib/modules/core/core_test.go b/lib/modules/core/core_test.go
index b741bc7..6223cdc 100644
--- a/lib/modules/core/core_test.go
+++ b/lib/modules/core/core_test.go
@@ -1,12 +1,15 @@
package core_test
import (
+ "fmt"
"os"
"reflect"
"sort"
+ "strconv"
"testing"
"time"
+ "veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/rt"
"veyron.io/veyron/veyron/lib/expect"
@@ -41,7 +44,40 @@
s.ExpectVar("MT_NAME")
s.ExpectVar("PID")
root.CloseStdin()
- s.Expect("done")
+ s.Expect("PASS")
+}
+
+func startMountTables(t *testing.T, sh *modules.Shell, mountPoints ...string) (map[string]string, error) {
+ // Start root mount table
+ root, err := sh.Start(core.RootMTCommand)
+ if err != nil {
+ t.Fatalf("unexpected error for root mt: %s", err)
+ }
+ rootSession := expect.NewSession(t, root.Stdout(), time.Minute)
+ rootName := rootSession.ExpectVar("MT_NAME")
+ if t.Failed() {
+ return nil, rootSession.Error()
+ }
+ sh.SetVar("NAMESPACE_ROOT", rootName)
+ mountAddrs := make(map[string]string)
+ mountAddrs["root"] = rootName
+
+ // Start the mount tables
+ for _, mp := range mountPoints {
+ h, err := sh.Start(core.MTCommand, mp)
+ if err != nil {
+ return nil, fmt.Errorf("unexpected error for mt %q: %s", mp, err)
+ }
+ s := expect.NewSession(t, h.Stdout(), time.Minute)
+ // Wait until each mount table has at least called Serve to
+ // mount itself.
+ mountAddrs[mp] = s.ExpectVar("MT_NAME")
+ if s.Failed() {
+ return nil, s.Error()
+ }
+ }
+ return mountAddrs, nil
+
}
func getMatchingMountpoint(r [][]string) string {
@@ -70,46 +106,28 @@
defer shell.Cleanup(nil)
}
- // Start root mount table
- root, err := shell.Start(core.RootMTCommand)
+ mountPoints := []string{"a", "b", "c", "d", "e"}
+ mountAddrs, err := startMountTables(t, shell, mountPoints...)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
- rootSession := expect.NewSession(t, root.Stdout(), time.Minute)
- rootName := rootSession.ExpectVar("MT_NAME")
- shell.SetVar("NAMESPACE_ROOT", rootName)
-
- if t.Failed() {
- return
- }
- mountPoints := []string{"a", "b", "c", "d", "e"}
-
- // Start 3 mount tables
- for _, mp := range mountPoints {
- h, err := shell.Start(core.MTCommand, mp)
- if err != nil {
- t.Fatalf("unexpected error: %s", err)
- }
- s := expect.NewSession(t, h.Stdout(), time.Minute)
- // Wait until each mount table has at least called Serve to
- // mount itself.
- s.ExpectVar("MT_NAME")
-
- }
-
+ rootName := mountAddrs["root"]
ls, err := shell.Start(core.LSCommand, rootName+"/...")
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
lsSession := expect.NewSession(t, ls.Stdout(), time.Minute)
-
lsSession.SetVerbosity(testing.Verbose())
- lsSession.Expect(rootName)
+
+ if got, want := lsSession.ExpectVar("RN"), strconv.Itoa(len(mountPoints)+1); got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ lsSession.Expect("R0=" + rootName)
// Look for names that correspond to the mountpoints above (i.e, a, b or c)
pattern := ""
for _, n := range mountPoints {
- pattern = pattern + "(" + rootName + "/(" + n + ")$)|"
+ pattern = pattern + "^R[\\d]+=(" + rootName + "/(" + n + ")$)|"
}
pattern = pattern[:len(pattern)-1]
@@ -131,12 +149,16 @@
lseSession := expect.NewSession(t, lse.Stdout(), time.Minute)
lseSession.SetVerbosity(testing.Verbose())
+ if got, want := lseSession.ExpectVar("RN"), strconv.Itoa(len(mountPoints)); got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+
pattern = ""
for _, n := range mountPoints {
// Since the LSExternalCommand runs in a subprocess with NAMESPACE_ROOT
// set to the name of the root mount table it sees to the relative name
// format of the mounted mount tables.
- pattern = pattern + "(^" + n + "$)|"
+ pattern = pattern + "^R[\\d]+=(" + n + "$)|"
}
pattern = pattern[:len(pattern)-1]
found = []string{}
@@ -150,6 +172,113 @@
}
}
+func TestEcho(t *testing.T) {
+ shell := core.NewShell()
+ if testing.Verbose() {
+ defer shell.Cleanup(os.Stderr)
+ } else {
+ defer shell.Cleanup(nil)
+ }
+ srv, err := shell.Start(core.EchoServerCommand, "test", "")
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ srvSession := expect.NewSession(t, srv.Stdout(), time.Minute)
+ name := srvSession.ExpectVar("NAME")
+ if len(name) == 0 {
+ t.Fatalf("failed to get name")
+ }
+
+ clt, err := shell.Start(core.EchoClientCommand, name, "a message")
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ cltSession := expect.NewSession(t, clt.Stdout(), time.Minute)
+ cltSession.Expect("test: a message")
+}
+
+func TestResolve(t *testing.T) {
+ shell := core.NewShell()
+ if testing.Verbose() {
+ defer shell.Cleanup(os.Stderr)
+ } else {
+ defer shell.Cleanup(nil)
+ }
+ mountPoints := []string{"a", "b"}
+ mountAddrs, err := startMountTables(t, shell, mountPoints...)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ rootName := mountAddrs["root"]
+ mtName := "b"
+ echoName := naming.Join(mtName, "echo")
+ srv, err := shell.Start(core.EchoServerCommand, "test", echoName)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ srvSession := expect.NewSession(t, srv.Stdout(), time.Minute)
+ srvSession.ExpectVar("NAME")
+ addr := srvSession.ExpectVar("ADDR")
+ addr = naming.JoinAddressName(addr, "//")
+
+ // Resolve an object
+ resolver, err := shell.Start(core.ResolveCommand, rootName+"/"+echoName)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ resolverSession := expect.NewSession(t, resolver.Stdout(), time.Minute)
+ if got, want := resolverSession.ExpectVar("RN"), "1"; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if got, want := resolverSession.ExpectVar("R0"), addr; got != want {
+ t.Errorf("got %v, want %v", got, want)
+ }
+ if err = resolver.Shutdown(nil); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ // Resolve to a mount table using a rooted name.
+ addr = naming.JoinAddressName(mountAddrs[mtName], "//echo")
+ resolver, err = shell.Start(core.ResolveMTCommand, rootName+"/"+echoName)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ resolverSession = expect.NewSession(t, resolver.Stdout(), time.Minute)
+ if got, want := resolverSession.ExpectVar("RN"), "1"; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if got, want := resolverSession.ExpectVar("R0"), addr; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if err := resolver.Shutdown(nil); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ // Resolve to a mount table, but using a relative name.
+ nsroots, err := shell.Start(core.SetNamespaceRootsCommand, rootName)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ if err := nsroots.Shutdown(nil); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ resolver, err = shell.Start(core.ResolveMTCommand, echoName)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ resolverSession = expect.NewSession(t, resolver.Stdout(), time.Minute)
+ if got, want := resolverSession.ExpectVar("RN"), "1"; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if got, want := resolverSession.ExpectVar("R0"), addr; got != want {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ if err := resolver.Shutdown(nil); err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+}
+
func TestHelperProcess(t *testing.T) {
if !modules.IsTestHelperProcess() {
return
diff --git a/lib/modules/core/echo.go b/lib/modules/core/echo.go
new file mode 100644
index 0000000..ae37372
--- /dev/null
+++ b/lib/modules/core/echo.go
@@ -0,0 +1,78 @@
+package core
+
+import (
+ "fmt"
+ "io"
+ "os"
+
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/naming"
+ "veyron.io/veyron/veyron2/rt"
+
+ "veyron.io/veyron/veyron/lib/modules"
+)
+
+func init() {
+ modules.RegisterChild(EchoServerCommand, echoServer)
+ modules.RegisterChild(EchoClientCommand, echoClient)
+}
+
+type echoServerObject struct {
+ id string
+}
+
+func (es *echoServerObject) Echo(call ipc.ServerCall, m string) (string, error) {
+ return es.id + ": " + m + "\n", nil
+}
+
+func echoServer(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ if len(args) != 2 {
+ return fmt.Errorf("wrong # args")
+ }
+ id, mountPoint := args[0], args[1]
+ server, err := rt.R().NewServer()
+ if err != nil {
+ return err
+ }
+ defer server.Stop()
+ ep, err := server.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return err
+ }
+ if err := server.Serve(mountPoint, ipc.LeafDispatcher(&echoServerObject{id: id}, nil)); err != nil {
+ return err
+ }
+ fmt.Fprintf(stdout, "NAME=%s\n", naming.MakeTerminal(naming.JoinAddressName(ep.String(), "")))
+ fmt.Fprintf(stdout, "ADDR=%s\n", ep.String())
+ fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
+ modules.WaitForEOF(stdin)
+ return nil
+}
+
+func echoClient(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+
+ if len(args) < 2 {
+ return fmt.Errorf("wrong # args")
+ }
+ name := args[0]
+ args = args[1:]
+ client := rt.R().Client()
+ for _, a := range args {
+ ctxt := rt.R().NewContext()
+ h, err := client.StartCall(ctxt, name, "Echo", []interface{}{a})
+ if err != nil {
+ return err
+ }
+ var r string
+ var apperr error
+ if err := h.Finish(&r, &apperr); err != nil {
+ return err
+ } else {
+ if apperr != nil {
+ return apperr
+ }
+ }
+ fmt.Fprintf(stdout, r)
+ }
+ return nil
+}
diff --git a/lib/modules/core/mounttable.go b/lib/modules/core/mounttable.go
index f95cfad..1c523e0 100644
--- a/lib/modules/core/mounttable.go
+++ b/lib/modules/core/mounttable.go
@@ -7,6 +7,7 @@
"strings"
"veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/context"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron2/rt"
@@ -59,7 +60,6 @@
fmt.Fprintf(stdout, "MT_NAME=%s\n", name)
fmt.Fprintf(stdout, "PID=%d\n", os.Getpid())
modules.WaitForEOF(stdin)
- fmt.Fprintf(stdout, "done\n")
return nil
}
@@ -69,8 +69,9 @@
details = true
args = args[1:]
}
-
ns := rt.R().Namespace()
+ entry := 0
+ output := ""
for _, pattern := range args {
ch, err := ns.Glob(rt.R().NewContext(), pattern)
if err != nil {
@@ -78,19 +79,55 @@
}
for n := range ch {
if details {
- fmt.Fprintf(stdout, "%s [", n.Name)
+ output += fmt.Sprintf("R%d=%s[", entry, n.Name)
t := ""
for _, s := range n.Servers {
- t += fmt.Sprintf("%s:%ss, ", s.Server, s.TTL)
+ output += fmt.Sprintf("%s:%ss, ", s.Server, s.TTL)
}
t = strings.TrimSuffix(t, ", ")
- fmt.Fprintf(stdout, "%s]\n", t)
+ output += fmt.Sprintf("%s]\n", t)
+ entry += 1
} else {
if len(n.Name) > 0 {
- fmt.Fprintf(stdout, "%s\n", n.Name)
+ output += fmt.Sprintf("R%d=%s\n", entry, n.Name)
+ entry += 1
}
}
+
}
}
+ fmt.Fprintf(stdout, "RN=%d\n", entry)
+ fmt.Fprint(stdout, output)
return nil
}
+
+type resolver func(ctx context.T, name string) (names []string, err error)
+
+func resolve(fn resolver, stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ if len(args) != 1 {
+ return fmt.Errorf("wrong # args")
+ }
+ name := args[0]
+ servers, err := fn(rt.R().NewContext(), name)
+ if err != nil {
+ return err
+ }
+ fmt.Fprintf(stdout, "RN=%d\n", len(servers))
+ for i, s := range servers {
+ fmt.Fprintf(stdout, "R%d=%s\n", i, s)
+ }
+ return nil
+}
+
+func resolveObject(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ return resolve(rt.R().Namespace().Resolve, stdin, stdout, stderr, env, args...)
+}
+
+func resolveMT(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ return resolve(rt.R().Namespace().ResolveToMountTable, stdin, stdout, stderr, env, args...)
+}
+
+func setNamespaceRoots(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ ns := rt.R().Namespace()
+ return ns.SetRoots(args...)
+}
diff --git a/lib/modules/exec.go b/lib/modules/exec.go
index 3f27c3e..5d63b4a 100644
--- a/lib/modules/exec.go
+++ b/lib/modules/exec.go
@@ -26,7 +26,7 @@
handle *vexec.ParentHandle
sh *Shell
stderr *os.File
- stdout *bufio.Reader
+ stdout io.ReadCloser
stdin io.WriteCloser
}
@@ -68,7 +68,7 @@
return &execHandle{entryPoint: entryPoint}
}
-func (eh *execHandle) Stdout() *bufio.Reader {
+func (eh *execHandle) Stdout() io.Reader {
eh.mu.Lock()
defer eh.mu.Unlock()
return eh.stdout
@@ -152,7 +152,7 @@
}
handle := vexec.NewParentHandle(cmd)
- eh.stdout = bufio.NewReader(stdout)
+ eh.stdout = stdout
eh.stderr = stderr
eh.stdin = stdin
eh.handle = handle
diff --git a/lib/modules/func.go b/lib/modules/func.go
index 5a7c149..f44127b 100644
--- a/lib/modules/func.go
+++ b/lib/modules/func.go
@@ -6,7 +6,6 @@
"io"
"os"
"sync"
- "syscall"
)
type pipe struct {
@@ -16,7 +15,6 @@
mu sync.Mutex
main Main
stdin, stderr, stdout pipe
- bufferedStdout *bufio.Reader
err error
sh *Shell
wg sync.WaitGroup
@@ -26,10 +24,10 @@
return &functionHandle{main: main}
}
-func (fh *functionHandle) Stdout() *bufio.Reader {
+func (fh *functionHandle) Stdout() io.Reader {
fh.mu.Lock()
defer fh.mu.Unlock()
- return fh.bufferedStdout
+ return fh.stdout.r
}
func (fh *functionHandle) Stderr() io.Reader {
@@ -46,9 +44,8 @@
func (fh *functionHandle) CloseStdin() {
fh.mu.Lock()
- fd := fh.stdin.w.Fd()
+ fh.stdin.w.Close()
fh.mu.Unlock()
- syscall.Close(int(fd))
}
func (fh *functionHandle) start(sh *Shell, args ...string) (Handle, error) {
@@ -61,21 +58,25 @@
return nil, err
}
}
- fh.bufferedStdout = bufio.NewReader(fh.stdout.r)
fh.wg.Add(1)
go func() {
- err := fh.main(fh.stdin.r, fh.stdout.w, fh.stderr.w, sh.mergeOSEnv(), args...)
- if err != nil {
- fmt.Fprintf(fh.stderr.w, "%s\n", err)
- }
fh.mu.Lock()
- // We close these files using the Close system call since there
- // may be an oustanding read on them that would otherwise trigger
- // a test failure with go test -race
- syscall.Close(int(fh.stdin.w.Fd()))
- syscall.Close(int(fh.stdout.r.Fd()))
- syscall.Close(int(fh.stderr.r.Fd()))
+ stdin := fh.stdin.r
+ stdout := fh.stdout.w
+ stderr := fh.stderr.w
+ main := fh.main
+ fh.mu.Unlock()
+
+ err := main(stdin, stdout, stderr, sh.mergeOSEnv(), args...)
+ if err != nil {
+ fmt.Fprintf(stderr, "%s\n", err)
+ }
+
+ fh.mu.Lock()
+ fh.stdin.r.Close()
+ fh.stdout.w.Close()
+ fh.stderr.w.Close()
fh.err = err
fh.mu.Unlock()
fh.wg.Done()
@@ -85,18 +86,22 @@
func (fh *functionHandle) Shutdown(output io.Writer) error {
fh.mu.Lock()
- syscall.Close(int(fh.stdin.w.Fd()))
- if output != nil {
- scanner := bufio.NewScanner(fh.stderr.r)
- for scanner.Scan() {
- fmt.Fprintf(output, "%s\n", scanner.Text())
- }
- }
+ fh.stdin.w.Close()
+ stderr := fh.stderr.r
fh.mu.Unlock()
- fh.wg.Wait()
+ if output != nil {
+ scanner := bufio.NewScanner(stderr)
+ for scanner.Scan() {
+ l := scanner.Text()
+ fmt.Fprintf(output, "%s\n", l)
+ }
+ }
+ fh.wg.Wait()
fh.mu.Lock()
+ fh.stdout.r.Close()
+ fh.stderr.r.Close()
err := fh.err
fh.sh.forget(fh)
fh.mu.Unlock()
diff --git a/lib/modules/modules_internal_test.go b/lib/modules/modules_internal_test.go
index 349869b..920538d 100644
--- a/lib/modules/modules_internal_test.go
+++ b/lib/modules/modules_internal_test.go
@@ -34,22 +34,24 @@
sh.AddSubprocess("echonotregistered", "[args]*")
sh.AddSubprocess("echos", "[args]*")
sh.AddFunction("echof", Echo, "[args]*")
-
assertNumHandles(t, sh, 0)
+
_, _ = sh.Start("echonotregistered") // won't start.
hs, _ := sh.Start("echos", "a")
hf, _ := sh.Start("echof", "b")
-
assertNumHandles(t, sh, 2)
+
for i, h := range []Handle{hs, hf} {
if got := h.Shutdown(nil); got != nil {
t.Errorf("%d: got %q, want %q", i, got, nil)
}
}
assertNumHandles(t, sh, 0)
+
hs, _ = sh.Start("echos", "a", "b")
hf, _ = sh.Start("echof", "c")
assertNumHandles(t, sh, 2)
+
sh.Cleanup(nil)
assertNumHandles(t, sh, 0)
}
diff --git a/lib/modules/shell.go b/lib/modules/shell.go
index f3f9f97..7ceaf24 100644
--- a/lib/modules/shell.go
+++ b/lib/modules/shell.go
@@ -37,7 +37,6 @@
package modules
import (
- "bufio"
"fmt"
"io"
"strings"
@@ -68,6 +67,8 @@
// NewShell creates a new instance of Shell.
func NewShell() *Shell {
+ // TODO(cnicolaou): should create a new identity if one doesn't
+ // already exist
return &Shell{
env: make(map[string]string),
cmds: make(map[string]*commandDesc),
@@ -212,10 +213,10 @@
// Handle represents a running command.
type Handle interface {
- // Stdout returns a buffered reader to the running command's stdout stream.
- Stdout() *bufio.Reader
+ // Stdout returns a reader to the running command's stdout stream.
+ Stdout() io.Reader
- // Stderr returns an unbuffered reader to the running command's stderr
+ // Stderr returns a reader to the running command's stderr
// stream.
Stderr() io.Reader