veyron/lib/modules: new API for running common services and components.
This CL provides a new API that is intended to replace testutil/blackbox
and testutil/modules. It will make it very easy to run subprocesses for
common services and a library for doing so will be provided in a subsequent
CL. It hides the differences between running services either in-process or
as subprocesses. It provides a 'Shell' abstraction for managing the
services, represented as 'commands', and creating a context for them that
includes the notion of environment variables. It is designed so that the
'Expect' family of routines in blackbox can be layered on top of this
and thus become more generally useful.
This CL is preliminary and contains only simple examples of subprocess
and in-process usage.
Change-Id: I91d829ba214c28982e58ade2caba514134d5db1b
diff --git a/lib/modules/exec.go b/lib/modules/exec.go
new file mode 100644
index 0000000..b364171
--- /dev/null
+++ b/lib/modules/exec.go
@@ -0,0 +1,229 @@
+package modules
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "strings"
+ "sync"
+ "time"
+
+ "veyron2/vlog"
+
+ // TODO(cnicolaou): move this to veyron/lib.
+ vexec "veyron/services/mgmt/lib/exec"
+)
+
+// execHandle implements both the command and Handle interfaces.
+type execHandle struct {
+ mu sync.Mutex
+ cmd *exec.Cmd
+ entryPoint string
+ handle *vexec.ParentHandle
+ stderr *os.File
+ stdout *bufio.Reader
+ stdin io.WriteCloser
+}
+
+func testFlags() []string {
+ var fl []string
+ // pass logging flags to any subprocesses
+ for fname, fval := range vlog.Log.ExplicitlySetFlags() {
+ fl = append(fl, "--"+fname+"="+fval)
+ }
+ timeout := flag.Lookup("test.timeout")
+ if timeout == nil {
+ // not a go test binary
+ return fl
+ }
+ // must be a go test binary
+ fl = append(fl, "-test.run=TestHelperProcess")
+ val := timeout.Value.(flag.Getter).Get().(time.Duration)
+ if val.String() != timeout.DefValue {
+ // use supplied command value for subprocesses
+ fl = append(fl, "--test.timeout="+timeout.Value.String())
+ } else {
+ // translate default value into 1m for subproccesses
+ fl = append(fl, "--test.timeout=1m")
+ }
+ return fl
+}
+
// IsTestHelperProcess reports whether the current process was started with
// -test.run=TestHelperProcess, which normally only happens for subprocesses
// started from tests.
func IsTestHelperProcess() bool {
	if f := flag.Lookup("test.run"); f != nil {
		return f.Value.String() == "TestHelperProcess"
	}
	return false
}
+
+func newExecHandle(entryPoint string) command {
+ return &execHandle{entryPoint: entryPoint}
+}
+
+func (eh *execHandle) Stdout() *bufio.Reader {
+ eh.mu.Lock()
+ defer eh.mu.Unlock()
+ return eh.stdout
+}
+
+func (eh *execHandle) Stderr() io.Reader {
+ eh.mu.Lock()
+ defer eh.mu.Unlock()
+ return eh.stderr
+}
+
+func (eh *execHandle) Stdin() io.WriteCloser {
+ eh.mu.Lock()
+ defer eh.mu.Unlock()
+ return eh.stdin
+}
+
+// mergeOSEnv returns a slice contained the merged set of environment
+// variables from the OS environment and those in this Shell, preferring
+// values in the Shell environment over those found in the OS environment.
+func (sh *Shell) mergeOSEnvSlice() []string {
+ merged := sh.mergeOSEnv()
+ env := []string{}
+ for k, v := range merged {
+ env = append(env, k+"="+v)
+ }
+ return env
+}
+
// osEnvironMap returns the process environment as a map from variable name
// to value. Empty entries are skipped, the value is everything after the
// first '=', and an entry without any '=' maps to the empty string.
func osEnvironMap() map[string]string {
	env := make(map[string]string)
	for _, entry := range os.Environ() {
		if entry == "" {
			continue
		}
		key, value := entry, ""
		if i := strings.Index(entry, "="); i >= 0 {
			key, value = entry[:i], entry[i+1:]
		}
		env[key] = value
	}
	return env
}
+func (sh *Shell) mergeOSEnv() map[string]string {
+ merged := osEnvironMap()
+ sh.mu.Lock()
+ for k, v := range sh.env {
+ merged[k] = v
+ }
+ sh.mu.Unlock()
+ return merged
+}
+
+func (eh *execHandle) start(sh *Shell, args ...string) (Handle, error) {
+ eh.mu.Lock()
+ defer eh.mu.Unlock()
+ newargs := append(testFlags(), args...)
+ cmd := exec.Command(os.Args[0], newargs...)
+ cmd.Env = append(sh.mergeOSEnvSlice(), eh.entryPoint)
+ stderr, err := ioutil.TempFile("", "__modules__"+strings.TrimLeft(eh.entryPoint, "-\n\t "))
+ if err != nil {
+ return nil, err
+ }
+ cmd.Stderr = stderr
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil, err
+ }
+ stdin, err := cmd.StdinPipe()
+ if err != nil {
+ return nil, err
+ }
+
+ handle := vexec.NewParentHandle(cmd)
+ eh.stdout = bufio.NewReader(stdout)
+ eh.stderr = stderr
+ eh.stdin = stdin
+ eh.handle = handle
+ eh.cmd = cmd
+ if err := handle.Start(); err != nil {
+ return nil, err
+ }
+ err = handle.WaitForReady(time.Second)
+ return eh, err
+}
+
+func (eh *execHandle) Shutdown(output io.Writer) {
+ eh.mu.Lock()
+ defer eh.mu.Unlock()
+ eh.stdin.Close()
+ if eh.stderr != nil {
+ defer func() {
+ eh.stderr.Close()
+ os.Remove(eh.stderr.Name())
+ }()
+ if output == nil {
+ return
+ }
+ if _, err := eh.stderr.Seek(0, 0); err != nil {
+ return
+ }
+ scanner := bufio.NewScanner(eh.stderr)
+ for scanner.Scan() {
+ fmt.Fprintf(output, "%s\n", scanner.Text())
+ }
+ }
+}
+
const (
	// shellEntryPoint is the environment variable through which a parent
	// tells a child process which registered Main to run.
	shellEntryPoint = "VEYRON_SHELL_HELPER_PROCESS_ENTRY_POINT"
)
+
+func RegisterChild(name string, main Main) {
+ child.Lock()
+ defer child.Unlock()
+ child.mains[name] = main
+}
+
+func Dispatch() error {
+ return child.dispatch()
+}
+
+func (child *childRegistrar) hasCommand(name string) bool {
+ child.Lock()
+ _, present := child.mains[name]
+ child.Unlock()
+ return present
+}
+
+func (child *childRegistrar) dispatch() error {
+ command := os.Getenv(shellEntryPoint)
+ if len(command) == 0 {
+ return fmt.Errorf("Failed to find entrypoint %q", shellEntryPoint)
+ }
+ child.Lock()
+ m := child.mains[command]
+ child.Unlock()
+ if m == nil {
+ return fmt.Errorf("Shell command %q not registered", command)
+ }
+ go func(pid int) {
+ for {
+ _, err := os.FindProcess(pid)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Looks like our parent exited: %v", err)
+ os.Exit(1)
+ }
+ time.Sleep(time.Second)
+ }
+ }(os.Getppid())
+ ch, err := vexec.GetChildHandle()
+ if err == nil {
+ // Only signal that the child is ready if we successfully get
+ // a child handle. We most likely failed to get a child handle
+ // because the subprocess was run directly from the command line.
+ ch.SetReady()
+ }
+ return m(os.Stdin, os.Stdout, os.Stderr, osEnvironMap(), flag.Args()...)
+}
diff --git a/lib/modules/func.go b/lib/modules/func.go
new file mode 100644
index 0000000..5d6160c
--- /dev/null
+++ b/lib/modules/func.go
@@ -0,0 +1,75 @@
+package modules
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "sync"
+)
+
// pipe holds both ends of an os.Pipe.
type pipe struct {
	r *os.File // read end
	w *os.File // write end
}
+type functionHandle struct {
+ mu sync.Mutex
+ main Main
+ stdin, stderr, stdout pipe
+ bufferedStdout *bufio.Reader
+}
+
+func newFunctionHandle(main Main) command {
+ return &functionHandle{main: main}
+}
+
+func (fh *functionHandle) Stdout() *bufio.Reader {
+ fh.mu.Lock()
+ defer fh.mu.Unlock()
+ return fh.bufferedStdout
+}
+
+func (fh *functionHandle) Stderr() io.Reader {
+ fh.mu.Lock()
+ defer fh.mu.Unlock()
+ return fh.stderr.r
+}
+
+func (fh *functionHandle) Stdin() io.WriteCloser {
+ fh.mu.Lock()
+ defer fh.mu.Unlock()
+ return fh.stdin.w
+}
+
+func (fh *functionHandle) start(sh *Shell, args ...string) (Handle, error) {
+ fh.mu.Lock()
+ defer fh.mu.Unlock()
+ for _, p := range []*pipe{&fh.stdin, &fh.stdout, &fh.stderr} {
+ var err error
+ if p.r, p.w, err = os.Pipe(); err != nil {
+ return nil, err
+ }
+ }
+ fh.bufferedStdout = bufio.NewReader(fh.stdout.r)
+ go func() {
+ err := fh.main(fh.stdin.r, fh.stdout.w, fh.stderr.w, sh.mergeOSEnv(), args...)
+ if err != nil {
+ fmt.Fprintf(fh.stderr.w, "%s\n", err)
+ }
+ fh.stdin.r.Close()
+ fh.stdout.w.Close()
+ fh.stderr.w.Close()
+ }()
+ return fh, nil
+}
+
+func (fh *functionHandle) Shutdown(output io.Writer) {
+ fh.mu.Lock()
+ defer fh.mu.Unlock()
+ scanner := bufio.NewScanner(fh.stderr.r)
+ for scanner.Scan() {
+ fmt.Fprintf(output, "%s\n", scanner.Text())
+ }
+ fh.stdin.w.Close()
+ fh.stdout.r.Close()
+ fh.stderr.r.Close()
+}
diff --git a/lib/modules/modules_test.go b/lib/modules/modules_test.go
new file mode 100644
index 0000000..81739b7
--- /dev/null
+++ b/lib/modules/modules_test.go
@@ -0,0 +1,98 @@
+package modules_test
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "testing"
+ "time"
+
+ "veyron/lib/modules"
+)
+
+func init() {
+ modules.RegisterChild("envtest", PrintEnv)
+}
+
// PrintEnv writes "name=value" to stdout for each name in args that has a
// non-empty value in env, and "missing name" to stderr for the others. It
// then blocks in a single one-byte Read on stdin, which returns once stdin is
// closed or data arrives, and finally writes "done" to stdout.
func PrintEnv(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
	for _, a := range args {
		if v := env[a]; len(v) > 0 {
			// Use an explicit format: the value may itself contain '%'.
			fmt.Fprintf(stdout, "%s=%s\n", a, v)
		} else {
			fmt.Fprintf(stderr, "missing %s\n", a)
		}
	}
	buf := [1]byte{0x0}
	stdin.Read(buf[:])
	fmt.Fprintf(stdout, "done\n")
	return nil
}
+
// waitForInput advances scanner by one token, giving up after ten seconds.
// It returns false on timeout. Otherwise it returns true once Scan has
// returned, whatever Scan's result.
func waitForInput(scanner *bufio.Scanner) bool {
	// Buffered so the scanning goroutine can always deliver its signal and
	// exit, even after we have stopped waiting for it on timeout.
	ch := make(chan struct{}, 1)
	go func() {
		scanner.Scan()
		ch <- struct{}{}
	}()
	select {
	case <-ch:
		return true
	case <-time.After(10 * time.Second):
		return false
	}
}
+
+func testCommand(t *testing.T, sh *modules.Shell, name, key, val string) {
+ h, err := sh.Start(name, key)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ defer func() {
+ sh.Cleanup(os.Stderr)
+ }()
+ scanner := bufio.NewScanner(h.Stdout())
+ if !waitForInput(scanner) {
+ t.Errorf("timeout")
+ return
+ }
+ if got, want := scanner.Text(), key+"="+val; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ h.Stdin().Close()
+ if !waitForInput(scanner) {
+ t.Errorf("timeout")
+ return
+ }
+ if got, want := scanner.Text(), "done"; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+}
+
+func TestChild(t *testing.T) {
+ sh := modules.NewShell()
+ key, val := "simpleVar", "foo & bar"
+ sh.SetVar(key, val)
+ sh.AddSubprocess("envtest", "envtest: <variables to print>...")
+ testCommand(t, sh, "envtest", key, val)
+}
+
+func TestFunction(t *testing.T) {
+ sh := modules.NewShell()
+ key, val := "simpleVar", "foo & bar & baz"
+ sh.SetVar(key, val)
+ sh.AddFunction("envtest", PrintEnv, "envtest: <variables to print>...")
+ testCommand(t, sh, "envtest", key, val)
+}
+
+func TestHelperProcess(t *testing.T) {
+ if !modules.IsTestHelperProcess() {
+ return
+ }
+ if err := modules.Dispatch(); err != nil {
+ t.Fatalf("failed: %v", err)
+ }
+}
+
+// TODO(cnicolaou): more complete tests for environment variables,
+// for example OS values being overridden by the Shell.
diff --git a/lib/modules/shell.go b/lib/modules/shell.go
new file mode 100644
index 0000000..09941b8
--- /dev/null
+++ b/lib/modules/shell.go
@@ -0,0 +1,238 @@
+// Package modules provides a mechanism for running commonly used services
+// as subprocesses and client functionality for accessing those services.
+// Such services and functions are collectively called 'commands' and are
+// registered with and executed within a context, defined by the Shell type.
+// The Shell is analogous to the original UNIX shell and maintains a
+// key, value store of variables that is accessible to all of the commands that
+// it hosts. These variables may be referenced by the arguments passed to
+// commands.
+//
+// Commands are added to a shell in two ways: one for a subprocess and another
+// for an in-process function.
+//
+// - subprocesses are added using the AddSubprocess method in the parent
+// and by the modules.RegisterChild function in the child process (typically
+// RegisterChild is called from an init function). modules.Dispatch must
+// be called in the child process to execute the subprocess 'Main' function
+// provided to RegisterChild.
+// - in-process functions are added using the AddFunction method.
+//
+// In all cases commands are started by invoking the Start method on the
+// Shell with the name of the command to run. An instance of the Handle
+// interface is returned which can be used to interact with the function
+// or subprocess, and in particular to read/write data from/to it using io
+// channels that follow the stdin, stdout, stderr convention.
+//
+// A simple protocol must be followed by all commands, namely, they
+// should wait for their stdin stream to be closed before exiting. The
+// caller can then coordinate with any command by writing to that stdin
+// stream and reading responses from the stdout stream, and it can close
+// stdin when it's ready for the command to exit.
+//
+// The signature of the function that implements the command is the
+// same for both types of command and is defined by the Main function type.
+// In particular stdin, stdout and stderr are provided as parameters, as is
+// a map representation of the shell's environment.
+package modules
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "sync"
+
+ "veyron2/vlog"
+)
+
+// Shell represents the context within which commands are run.
+type Shell struct {
+ mu sync.Mutex
+ env map[string]string
+ cmds map[string]*commandDesc
+ handles map[Handle]struct{}
+}
+
// commandDesc describes a command registered with a Shell.
// NOTE: some composite literals of this type are unkeyed, so the field
// order matters.
type commandDesc struct {
	// factory creates a new, not-yet-started command each time it is called.
	factory func() command
	// help is the command's help message, as returned by Shell.Help.
	help string
}
+
+type childRegistrar struct {
+ sync.Mutex
+ mains map[string]Main
+}
+
+var child = &childRegistrar{mains: make(map[string]Main)}
+
+// NewShell creates a new instance of Shell.
+func NewShell() *Shell {
+ return &Shell{
+ env: make(map[string]string),
+ cmds: make(map[string]*commandDesc),
+ handles: make(map[Handle]struct{}),
+ }
+}
+
// Main is the signature of the function that implements a command, whether
// the command runs in-process or as a subprocess. It receives the command's
// standard streams, a map of the environment visible to the command, and
// the command's arguments.
type Main func(
	stdin io.Reader,
	stdout, stderr io.Writer,
	env map[string]string,
	args ...string,
) error
+
+// AddSubprocess adds a new command to the Shell that will be run
+// as a subprocess. In addition, the child process must call RegisterChild
+// using the same name used here and provide the function to be executed
+// in the child.
+func (sh *Shell) AddSubprocess(name string, help string) {
+ if !child.hasCommand(name) {
+ vlog.Infof("Warning: %q is not registered with modules.Dispatcher", name)
+ }
+ entryPoint := shellEntryPoint + "=" + name
+ sh.mu.Lock()
+ sh.cmds[name] = &commandDesc{func() command { return newExecHandle(entryPoint) }, help}
+ sh.mu.Unlock()
+}
+
+// AddFunction adds a new command to the Shell that will be run
+// within the current process.
+func (sh *Shell) AddFunction(name string, main Main, help string) {
+ sh.mu.Lock()
+ sh.cmds[name] = &commandDesc{func() command { return newFunctionHandle(main) }, help}
+ sh.mu.Unlock()
+}
+
+// String returns a string representation of the Shell, which is a
+// concatenation of the help messages of each Command currently available
+// to it.
+func (sh *Shell) String() string {
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+ h := ""
+ for _, c := range sh.cmds {
+ h += c.help
+ }
+ return h
+}
+
+// Help returns the help message for the specified command.
+func (sh *Shell) Help(command string) string {
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+ if c := sh.cmds[command]; c != nil {
+ return c.help
+ }
+ return ""
+}
+
+// Start starts the specified command, it returns a Handle which can be used
+// for interacting with that command. The Shell tracks all of the Handles
+// that it creates so that it can shut them down when asked to. If any
+// application calls Shutdown on a handle directly, it must call the Forget
+// method on the Shell instance hosting that Handle to avoid storage leaks.
+func (sh *Shell) Start(command string, args ...string) (Handle, error) {
+ sh.mu.Lock()
+ cmd := sh.cmds[command]
+ if cmd == nil {
+ sh.mu.Unlock()
+ return nil, fmt.Errorf("command %q is not available", command)
+ }
+ expanded := sh.expand(args...)
+ sh.mu.Unlock()
+ h, err := cmd.factory().start(sh, expanded...)
+ if err != nil {
+ return nil, err
+ }
+ sh.mu.Lock()
+ sh.handles[h] = struct{}{}
+ sh.mu.Unlock()
+ return h, nil
+}
+
+// Forget tells the Shell to stop tracking the supplied Handle.
+func (sh *Shell) Forget(h Handle) {
+ sh.mu.Lock()
+ delete(sh.handles, h)
+ sh.mu.Unlock()
+}
+
+func (sh *Shell) expand(args ...string) []string {
+ exp := []string{}
+ for _, a := range args {
+ if len(a) > 0 && a[0] == '$' {
+ if v, present := sh.env[a[1:]]; present {
+ exp = append(exp, v)
+ continue
+ }
+ }
+ exp = append(exp, a)
+ }
+ return exp
+}
+
+// GetVar returns the variable associated with the specified key
+// and an indication of whether it is defined or not.
+func (sh *Shell) GetVar(key string) (string, bool) {
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+ v, present := sh.env[key]
+ return v, present
+}
+
+// SetVar sets the value to be associated with key.
+func (sh *Shell) SetVar(key, value string) {
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+ // TODO(cnicolaou): expand value
+ sh.env[key] = value
+}
+
+// Env returns the entire set of environment variables associated with this
+// Shell as a string slice.
+func (sh *Shell) Env() []string {
+ vars := []string{}
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+ for k, v := range sh.env {
+ vars = append(vars, k+"="+v)
+ }
+ return vars
+}
+
+// Cleanup calls Shutdown on all of the Handles currently being tracked
+// by the Shell. Any buffered output from the command's stderr stream
+// will be written to the supplied io.Writer. If the io.Writer is nil
+// then any such output is lost.
+func (sh *Shell) Cleanup(output io.Writer) {
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+ for k, _ := range sh.handles {
+ k.Shutdown(output)
+ }
+ sh.handles = make(map[Handle]struct{})
+}
+
// Handle represents a running command.
type Handle interface {
	// Stdin returns a writer to the command's stdin. By convention,
	// commands wait for stdin to be closed before exiting, so callers close
	// it when they want the command to exit cleanly.
	Stdin() io.WriteCloser

	// Stdout returns a buffered reader over the command's stdout stream.
	Stdout() *bufio.Reader

	// Stderr returns an unbuffered reader over the command's stderr stream.
	Stderr() io.Reader

	// Shutdown closes the command's Stdin and writes any buffered stderr
	// output from the command to the supplied io.Writer; with a nil writer
	// that output is lost. It is primarily meant to be called by the Shell.
	// Other code that calls it should also call the Shell's Forget method
	// so that the Shell stops tracking the handle.
	Shutdown(io.Writer)
}
+
// command is used to abstract the implementations of inprocess and subprocess
// commands. start launches the command in the context of sh with args
// (already expanded by Shell.Start) and returns a Handle for interacting
// with it.
type command interface {
	start(sh *Shell, args ...string) (Handle, error)
}