Merge "veyron/lib/modules: new API for running common services and components."
diff --git a/lib/modules/exec.go b/lib/modules/exec.go
new file mode 100644
index 0000000..b364171
--- /dev/null
+++ b/lib/modules/exec.go
@@ -0,0 +1,229 @@
+package modules
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "strings"
+ "sync"
+ "time"
+ "veyron2/vlog"
+ // TODO(cnicolaou): move this to veyron/lib.
+ vexec "veyron/services/mgmt/lib/exec"
+// execHandle implements both the command and Handle interfaces.
+type execHandle struct {
+ mu sync.Mutex
+ cmd *exec.Cmd
+ entryPoint string
+ handle *vexec.ParentHandle
+ stderr *os.File
+ stdout *bufio.Reader
+ stdin io.WriteCloser
+func testFlags() []string {
+ var fl []string
+ // pass logging flags to any subprocesses
+ for fname, fval := range vlog.Log.ExplicitlySetFlags() {
+ fl = append(fl, "--"+fname+"="+fval)
+ }
+ timeout := flag.Lookup("test.timeout")
+ if timeout == nil {
+ // not a go test binary
+ return fl
+ }
+ // must be a go test binary
+ fl = append(fl, "")
+ val := timeout.Value.(flag.Getter).Get().(time.Duration)
+ if val.String() != timeout.DefValue {
+ // use supplied command value for subprocesses
+ fl = append(fl, "--test.timeout="+timeout.Value.String())
+ } else {
+ // translate default value into 1m for subproccesses
+ fl = append(fl, "--test.timeout=1m")
+ }
+ return fl
+// IsTestSubprocess returns true if it is called in via -run=TestHelperProcess
+// which normally only ever happens for subprocess run from tests.
+func IsTestHelperProcess() bool {
+ runFlag := flag.Lookup("")
+ if runFlag == nil {
+ return false
+ }
+ return runFlag.Value.String() == "TestHelperProcess"
+func newExecHandle(entryPoint string) command {
+ return &execHandle{entryPoint: entryPoint}
+func (eh *execHandle) Stdout() *bufio.Reader {
+ defer
+ return eh.stdout
+func (eh *execHandle) Stderr() io.Reader {
+ defer
+ return eh.stderr
+func (eh *execHandle) Stdin() io.WriteCloser {
+ defer
+ return eh.stdin
+// mergeOSEnv returns a slice contained the merged set of environment
+// variables from the OS environment and those in this Shell, preferring
+// values in the Shell environment over those found in the OS environment.
+func (sh *Shell) mergeOSEnvSlice() []string {
+ merged := sh.mergeOSEnv()
+ env := []string{}
+ for k, v := range merged {
+ env = append(env, k+"="+v)
+ }
+ return env
+func osEnvironMap() map[string]string {
+ m := make(map[string]string)
+ for _, osv := range os.Environ() {
+ if len(osv) == 0 {
+ continue
+ }
+ parts := strings.SplitN(osv, "=", 2)
+ key := parts[0]
+ if len(parts) == 2 {
+ m[key] = parts[1]
+ } else {
+ m[key] = ""
+ }
+ }
+ return m
+func (sh *Shell) mergeOSEnv() map[string]string {
+ merged := osEnvironMap()
+ for k, v := range sh.env {
+ merged[k] = v
+ }
+ return merged
+func (eh *execHandle) start(sh *Shell, args ...string) (Handle, error) {
+ defer
+ newargs := append(testFlags(), args...)
+ cmd := exec.Command(os.Args[0], newargs...)
+ cmd.Env = append(sh.mergeOSEnvSlice(), eh.entryPoint)
+ stderr, err := ioutil.TempFile("", "__modules__"+strings.TrimLeft(eh.entryPoint, "-\n\t "))
+ if err != nil {
+ return nil, err
+ }
+ cmd.Stderr = stderr
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil, err
+ }
+ stdin, err := cmd.StdinPipe()
+ if err != nil {
+ return nil, err
+ }
+ handle := vexec.NewParentHandle(cmd)
+ eh.stdout = bufio.NewReader(stdout)
+ eh.stderr = stderr
+ eh.stdin = stdin
+ eh.handle = handle
+ eh.cmd = cmd
+ if err := handle.Start(); err != nil {
+ return nil, err
+ }
+ err = handle.WaitForReady(time.Second)
+ return eh, err
+func (eh *execHandle) Shutdown(output io.Writer) {
+ defer
+ eh.stdin.Close()
+ if eh.stderr != nil {
+ defer func() {
+ eh.stderr.Close()
+ os.Remove(eh.stderr.Name())
+ }()
+ if output == nil {
+ return
+ }
+ if _, err := eh.stderr.Seek(0, 0); err != nil {
+ return
+ }
+ scanner := bufio.NewScanner(eh.stderr)
+ for scanner.Scan() {
+ fmt.Fprintf(output, "%s\n", scanner.Text())
+ }
+ }
+func RegisterChild(name string, main Main) {
+ child.Lock()
+ defer child.Unlock()
+ child.mains[name] = main
+func Dispatch() error {
+ return child.dispatch()
+func (child *childRegistrar) hasCommand(name string) bool {
+ child.Lock()
+ _, present := child.mains[name]
+ child.Unlock()
+ return present
+func (child *childRegistrar) dispatch() error {
+ command := os.Getenv(shellEntryPoint)
+ if len(command) == 0 {
+ return fmt.Errorf("Failed to find entrypoint %q", shellEntryPoint)
+ }
+ child.Lock()
+ m := child.mains[command]
+ child.Unlock()
+ if m == nil {
+ return fmt.Errorf("Shell command %q not registered", command)
+ }
+ go func(pid int) {
+ for {
+ _, err := os.FindProcess(pid)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Looks like our parent exited: %v", err)
+ os.Exit(1)
+ }
+ time.Sleep(time.Second)
+ }
+ }(os.Getppid())
+ ch, err := vexec.GetChildHandle()
+ if err == nil {
+ // Only signal that the child is ready if we successfully get
+ // a child handle. We most likely failed to get a child handle
+ // because the subprocess was run directly from the command line.
+ ch.SetReady()
+ }
+ return m(os.Stdin, os.Stdout, os.Stderr, osEnvironMap(), flag.Args()...)
diff --git a/lib/modules/func.go b/lib/modules/func.go
new file mode 100644
index 0000000..5d6160c
--- /dev/null
+++ b/lib/modules/func.go
@@ -0,0 +1,75 @@
+package modules
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "sync"
+type pipe struct {
+ r, w *os.File
+type functionHandle struct {
+ mu sync.Mutex
+ main Main
+ stdin, stderr, stdout pipe
+ bufferedStdout *bufio.Reader
+func newFunctionHandle(main Main) command {
+ return &functionHandle{main: main}
+func (fh *functionHandle) Stdout() *bufio.Reader {
+ defer
+ return fh.bufferedStdout
+func (fh *functionHandle) Stderr() io.Reader {
+ defer
+ return fh.stderr.r
+func (fh *functionHandle) Stdin() io.WriteCloser {
+ defer
+ return fh.stdin.w
+func (fh *functionHandle) start(sh *Shell, args ...string) (Handle, error) {
+ defer
+ for _, p := range []*pipe{&fh.stdin, &fh.stdout, &fh.stderr} {
+ var err error
+ if p.r, p.w, err = os.Pipe(); err != nil {
+ return nil, err
+ }
+ }
+ fh.bufferedStdout = bufio.NewReader(fh.stdout.r)
+ go func() {
+ err := fh.main(fh.stdin.r, fh.stdout.w, fh.stderr.w, sh.mergeOSEnv(), args...)
+ if err != nil {
+ fmt.Fprintf(fh.stderr.w, "%s\n", err)
+ }
+ fh.stdin.r.Close()
+ fh.stdout.w.Close()
+ fh.stderr.w.Close()
+ }()
+ return fh, nil
+func (fh *functionHandle) Shutdown(output io.Writer) {
+ defer
+ scanner := bufio.NewScanner(fh.stderr.r)
+ for scanner.Scan() {
+ fmt.Fprintf(output, "%s\n", scanner.Text())
+ }
+ fh.stdin.w.Close()
+ fh.stdout.r.Close()
+ fh.stderr.r.Close()
diff --git a/lib/modules/modules_test.go b/lib/modules/modules_test.go
new file mode 100644
index 0000000..81739b7
--- /dev/null
+++ b/lib/modules/modules_test.go
@@ -0,0 +1,98 @@
+package modules_test
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "os"
+ "testing"
+ "time"
+ "veyron/lib/modules"
+func init() {
+ modules.RegisterChild("envtest", PrintEnv)
+func PrintEnv(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+ for _, a := range args {
+ if v := env[a]; len(v) > 0 {
+ fmt.Fprintf(stdout, a+"="+v+"\n")
+ } else {
+ fmt.Fprintf(stderr, "missing %s\n", a)
+ }
+ }
+ buf := [1]byte{0x0}
+ stdin.Read(buf[:])
+ fmt.Fprintf(stdout, "done\n")
+ return nil
+func waitForInput(scanner *bufio.Scanner) bool {
+ ch := make(chan struct{})
+ go func(ch chan<- struct{}) {
+ scanner.Scan()
+ ch <- struct{}{}
+ }(ch)
+ select {
+ case <-ch:
+ return true
+ case <-time.After(10 * time.Second):
+ return false
+ }
+func testCommand(t *testing.T, sh *modules.Shell, name, key, val string) {
+ h, err := sh.Start(name, key)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ defer func() {
+ sh.Cleanup(os.Stderr)
+ }()
+ scanner := bufio.NewScanner(h.Stdout())
+ if !waitForInput(scanner) {
+ t.Errorf("timeout")
+ return
+ }
+ if got, want := scanner.Text(), key+"="+val; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+ h.Stdin().Close()
+ if !waitForInput(scanner) {
+ t.Errorf("timeout")
+ return
+ }
+ if got, want := scanner.Text(), "done"; got != want {
+ t.Errorf("got %q, want %q", got, want)
+ }
+func TestChild(t *testing.T) {
+ sh := modules.NewShell()
+ key, val := "simpleVar", "foo & bar"
+ sh.SetVar(key, val)
+ sh.AddSubprocess("envtest", "envtest: <variables to print>...")
+ testCommand(t, sh, "envtest", key, val)
+func TestFunction(t *testing.T) {
+ sh := modules.NewShell()
+ key, val := "simpleVar", "foo & bar & baz"
+ sh.SetVar(key, val)
+ sh.AddFunction("envtest", PrintEnv, "envtest: <variables to print>...")
+ testCommand(t, sh, "envtest", key, val)
+func TestHelperProcess(t *testing.T) {
+ if !modules.IsTestHelperProcess() {
+ return
+ }
+ if err := modules.Dispatch(); err != nil {
+ t.Fatalf("failed: %v", err)
+ }
+// TODO(cnicolaou): more complete tests for environment variables,
+// OS being overridden by Shell for example.
diff --git a/lib/modules/shell.go b/lib/modules/shell.go
new file mode 100644
index 0000000..09941b8
--- /dev/null
+++ b/lib/modules/shell.go
@@ -0,0 +1,238 @@
+// Package modules provides a mechanism for running commonly used services
+// as subprocesses and client functionality for accessing those services.
+// Such services and functions are collectively called 'commands' and are
+// registered with and executed within a context, defined by the Shell type.
+// The Shell is analagous to the original UNIX shell and maintains a
+// key, value store of variables that is accessible to all of the commands that
+// it hosts. These variables may be referenced by the arguments passed to
+// commands.
+// Commands are added to a shell in two ways: one for a subprocess and another
+// for an inprocess function.
+// - subprocesses are added using the AddSubprocess method in the parent
+// and by the modules.RegisterChild function in the child process (typically
+// RegisterChild is called from an init function). modules.Dispatch must
+// be called in the child process to execute the subprocess 'Main' function
+// provided to RegisterChild.
+// - inprocess functions are added using the AddFunction method.
+// In all cases commands are started by invoking the Start method on the
+// Shell with the name of the command to run. An instance of the Handle
+// interface is returned which can be used to interact with the function
+// or subprocess, and in particular to read/write data from/to it using io
+// channels that follow the stdin, stdout, stderr convention.
+// A simple protocol must be followed by all commands, namely, they
+// should wait for their stdin stream to be closed before exiting. The
+// caller can then coordinate with any command by writing to that stdin
+// stream and reading responses from the stdout stream, and it can close
+// stdin when it's ready for the command to exit.
+// The signature of the function that implements the command is the
+// same for both types of command and is defined by the Main function type.
+// In particular stdin, stdout and stderr are provided as parameters, as is
+// a map representation of the shell's environment.
+package modules
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "sync"
+ "veyron2/vlog"
+// Shell represents the context within which commands are run.
+type Shell struct {
+ mu sync.Mutex
+ env map[string]string
+ cmds map[string]*commandDesc
+ handles map[Handle]struct{}
+type commandDesc struct {
+ factory func() command
+ help string
+type childRegistrar struct {
+ sync.Mutex
+ mains map[string]Main
+var child = &childRegistrar{mains: make(map[string]Main)}
+// NewShell creates a new instance of Shell.
+func NewShell() *Shell {
+ return &Shell{
+ env: make(map[string]string),
+ cmds: make(map[string]*commandDesc),
+ handles: make(map[Handle]struct{}),
+ }
+type Main func(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error
+// AddSubprocess adds a new command to the Shell that will be run
+// as a subprocess. In addition, the child process must call RegisterChild
+// using the same name used here and provide the function to be executed
+// in the child.
+func (sh *Shell) AddSubprocess(name string, help string) {
+ if !child.hasCommand(name) {
+ vlog.Infof("Warning: %q is not registered with modules.Dispatcher", name)
+ }
+ entryPoint := shellEntryPoint + "=" + name
+ sh.cmds[name] = &commandDesc{func() command { return newExecHandle(entryPoint) }, help}
+// AddFunction adds a new command to the Shell that will be run
+// within the current process.
+func (sh *Shell) AddFunction(name string, main Main, help string) {
+ sh.cmds[name] = &commandDesc{func() command { return newFunctionHandle(main) }, help}
+// String returns a string representation of the Shell, which is a
+// concatenation of the help messages of each Command currently available
+// to it.
+func (sh *Shell) String() string {
+ defer
+ h := ""
+ for _, c := range sh.cmds {
+ h +=
+ }
+ return h
+// Help returns the help message for the specified command.
+func (sh *Shell) Help(command string) string {
+ defer
+ if c := sh.cmds[command]; c != nil {
+ return
+ }
+ return ""
+// Start starts the specified command, it returns a Handle which can be used
+// for interacting with that command. The Shell tracks all of the Handles
+// that it creates so that it can shut them down when asked to. If any
+// application calls Shutdown on a handle directly, it must call the Forget
+// method on the Shell instance hosting that Handle to avoid storage leaks.
+func (sh *Shell) Start(command string, args ...string) (Handle, error) {
+ cmd := sh.cmds[command]
+ if cmd == nil {
+ return nil, fmt.Errorf("command %q is not available", command)
+ }
+ expanded := sh.expand(args...)
+ h, err := cmd.factory().start(sh, expanded...)
+ if err != nil {
+ return nil, err
+ }
+ sh.handles[h] = struct{}{}
+ return h, nil
+// Forget tells the Shell to stop tracking the supplied Handle.
+func (sh *Shell) Forget(h Handle) {
+ delete(sh.handles, h)
+func (sh *Shell) expand(args ...string) []string {
+ exp := []string{}
+ for _, a := range args {
+ if len(a) > 0 && a[0] == '$' {
+ if v, present := sh.env[a[1:]]; present {
+ exp = append(exp, v)
+ continue
+ }
+ }
+ exp = append(exp, a)
+ }
+ return exp
+// GetVar returns the variable associated with the specified key
+// and an indication of whether it is defined or not.
+func (sh *Shell) GetVar(key string) (string, bool) {
+ defer
+ v, present := sh.env[key]
+ return v, present
+// SetVar sets the value to be associated with key.
+func (sh *Shell) SetVar(key, value string) {
+ defer
+ // TODO(cnicolaou): expand value
+ sh.env[key] = value
+// Env returns the entire set of environment variables associated with this
+// Shell as a string slice.
+func (sh *Shell) Env() []string {
+ vars := []string{}
+ defer
+ for k, v := range sh.env {
+ vars = append(vars, k+"="+v)
+ }
+ return vars
+// Cleanup calls Shutdown on all of the Handles currently being tracked
+// by the Shell. Any buffered output from the command's stderr stream
+// will be written to the supplied io.Writer. If the io.Writer is nil
+// then any such output is lost.
+func (sh *Shell) Cleanup(output io.Writer) {
+ defer
+ for k, _ := range sh.handles {
+ k.Shutdown(output)
+ }
+ sh.handles = make(map[Handle]struct{})
+// Handle represents a running command.
+type Handle interface {
+ // Stdout returns a buffered reader to the running command's stdout stream.
+ Stdout() *bufio.Reader
+ // Stderr returns an unbuffered reader to the running command's stderr
+ // stream.
+ Stderr() io.Reader
+ // Stdin returns a writer to the running command's stdin. The
+ // convention is for commands to wait for stdin to be closed before
+ // they exit, thus the caller should close stdin when it wants the
+ // command to exit cleanly.
+ Stdin() io.WriteCloser
+ // Shutdown closes the Stdin for the command. It is primarily intended
+ // for being called by the Shell, if other application code calls it
+ // then it should use the Shell's Forget method to have the Shell stop
+ // tracking the handle. Any buffered stderr output from the command will
+ // be written to the supplied io.Writer. If the io.Writer is nil then
+ // any such output is lost.
+ Shutdown(io.Writer)
+// command is used to abstract the implementations of inprocess and subprocess
+// commands.
+type command interface {
+ start(sh *Shell, args ...string) (Handle, error)