Merge "veyron/lib/modules: new API for running common services and components."
diff --git a/lib/modules/exec.go b/lib/modules/exec.go
new file mode 100644
index 0000000..b364171
--- /dev/null
+++ b/lib/modules/exec.go
@@ -0,0 +1,229 @@
+package modules
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"strings"
+	"sync"
+	"time"
+
+	"veyron2/vlog"
+
+	// TODO(cnicolaou): move this to veyron/lib.
+	vexec "veyron/services/mgmt/lib/exec"
+)
+
+// execHandle implements both the command and Handle interfaces.
+// It represents a command that is run as a subprocess of the current
+// binary.
+type execHandle struct {
+	// mu is held by every method of execHandle while it reads or writes
+	// the fields below.
+	mu         sync.Mutex
+	// cmd is the exec.Cmd created by start.
+	cmd        *exec.Cmd
+	// entryPoint is appended verbatim to the subprocess environment by start.
+	entryPoint string
+	// handle is the vexec parent handle wrapping cmd.
+	handle     *vexec.ParentHandle
+	// stderr is the temporary file that receives the subprocess's stderr.
+	stderr     *os.File
+	// stdout is a buffered reader over the subprocess's stdout pipe.
+	stdout     *bufio.Reader
+	// stdin is the write end of the subprocess's stdin pipe.
+	stdin      io.WriteCloser
+}
+
+// testFlags returns the command line flags that should be passed to any
+// subprocess: every explicitly set logging flag and, when the current
+// binary is a go test binary, the flags needed to run TestHelperProcess
+// with a suitable timeout.
+func testFlags() []string {
+	var flags []string
+	// Forward the logging flags to the subprocess.
+	for name, value := range vlog.Log.ExplicitlySetFlags() {
+		flags = append(flags, "--"+name+"="+value)
+	}
+	timeoutFlag := flag.Lookup("test.timeout")
+	if timeoutFlag == nil {
+		// Not a go test binary, so there is nothing more to add.
+		return flags
+	}
+	// A go test binary: have the subprocess run only the helper test.
+	flags = append(flags, "-test.run=TestHelperProcess")
+	current := timeoutFlag.Value.(flag.Getter).Get().(time.Duration)
+	if current.String() == timeoutFlag.DefValue {
+		// The timeout was left at its default; subprocesses get 1m instead.
+		return append(flags, "--test.timeout=1m")
+	}
+	// Pass on the timeout that was supplied on the command line.
+	return append(flags, "--test.timeout="+timeoutFlag.Value.String())
+}
+
+// IsTestHelperProcess returns true if the test.run flag is defined and is
+// set to exactly "TestHelperProcess", which normally only ever happens for
+// subprocesses run from tests.
+func IsTestHelperProcess() bool {
+	if f := flag.Lookup("test.run"); f != nil {
+		return f.Value.String() == "TestHelperProcess"
+	}
+	return false
+}
+
+// newExecHandle returns a command backed by a new, not yet started,
+// execHandle. entryPoint is stored as is and later appended to the
+// subprocess environment by start.
+func newExecHandle(entryPoint string) command {
+	return &execHandle{entryPoint: entryPoint}
+}
+
+// Stdout returns the buffered reader for the subprocess's stdout. It is
+// nil until start has set it.
+func (eh *execHandle) Stdout() *bufio.Reader {
+	eh.mu.Lock()
+	defer eh.mu.Unlock()
+	return eh.stdout
+}
+
+// Stderr returns the temporary file that receives the subprocess's stderr,
+// as an io.Reader.
+// NOTE(review): before start has run the field is a nil *os.File, so the
+// returned interface value is non-nil but wraps a nil pointer.
+func (eh *execHandle) Stderr() io.Reader {
+	eh.mu.Lock()
+	defer eh.mu.Unlock()
+	return eh.stderr
+}
+
+// Stdin returns the writer connected to the subprocess's stdin. It is nil
+// until start has set it.
+func (eh *execHandle) Stdin() io.WriteCloser {
+	eh.mu.Lock()
+	defer eh.mu.Unlock()
+	return eh.stdin
+}
+
+// mergeOSEnvSlice returns a slice containing the merged set of environment
+// variables from the OS environment and those in this Shell, each in
+// "key=value" form, preferring values in the Shell environment over those
+// found in the OS environment. The order of the entries is unspecified.
+func (sh *Shell) mergeOSEnvSlice() []string {
+	vars := sh.mergeOSEnv()
+	out := make([]string, 0, len(vars))
+	for name, value := range vars {
+		out = append(out, name+"="+value)
+	}
+	return out
+}
+
+// osEnvironMap returns the OS environment as a map from variable name to
+// value. Each os.Environ entry is split at its first "="; an entry with no
+// "=" maps to the empty string and empty entries are skipped.
+func osEnvironMap() map[string]string {
+	env := make(map[string]string)
+	for _, entry := range os.Environ() {
+		if entry == "" {
+			continue
+		}
+		idx := strings.Index(entry, "=")
+		if idx < 0 {
+			env[entry] = ""
+			continue
+		}
+		env[entry[:idx]] = entry[idx+1:]
+	}
+	return env
+}
+// mergeOSEnv returns a new map holding the OS environment overlaid with
+// the variables of this Shell; Shell values win when a key is present in
+// both.
+func (sh *Shell) mergeOSEnv() map[string]string {
+	result := osEnvironMap()
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	for name, value := range sh.env {
+		result[name] = value
+	}
+	return result
+}
+
+// start runs the current binary (os.Args[0]) as a subprocess with the
+// flags returned by testFlags followed by args. The subprocess environment
+// is the merged OS and Shell environment plus the handle's entry point.
+// Its stderr is sent to a temporary file and its stdout and stdin are
+// connected via pipes. start then waits up to one second for the
+// subprocess to signal that it is ready.
+//
+// If anything fails before the subprocess has been started, the temporary
+// stderr file is closed and removed and a nil Handle is returned.
+func (eh *execHandle) start(sh *Shell, args ...string) (Handle, error) {
+	eh.mu.Lock()
+	defer eh.mu.Unlock()
+	newargs := append(testFlags(), args...)
+	cmd := exec.Command(os.Args[0], newargs...)
+	cmd.Env = append(sh.mergeOSEnvSlice(), eh.entryPoint)
+	stderr, err := ioutil.TempFile("", "__modules__"+strings.TrimLeft(eh.entryPoint, "-\n\t "))
+	if err != nil {
+		return nil, err
+	}
+	// discardStderr releases the temporary file on the failure paths below;
+	// without it the file would be left open and on disk, since no Handle
+	// is returned for Shutdown to clean up.
+	discardStderr := func() {
+		stderr.Close()
+		os.Remove(stderr.Name())
+	}
+	cmd.Stderr = stderr
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		discardStderr()
+		return nil, err
+	}
+	stdin, err := cmd.StdinPipe()
+	if err != nil {
+		stdout.Close()
+		discardStderr()
+		return nil, err
+	}
+
+	handle := vexec.NewParentHandle(cmd)
+	if err := handle.Start(); err != nil {
+		discardStderr()
+		return nil, err
+	}
+	// Only publish the streams once the subprocess is actually running.
+	eh.stdout = bufio.NewReader(stdout)
+	eh.stderr = stderr
+	eh.stdin = stdin
+	eh.handle = handle
+	eh.cmd = cmd
+	// NOTE(review): when WaitForReady fails the handle is still returned
+	// alongside the error, as before; callers that drop it will not shut
+	// the subprocess down.
+	err = handle.WaitForReady(time.Second)
+	return eh, err
+}
+
+// Shutdown closes the subprocess's stdin and then, if a stderr file
+// exists, copies its contents line by line to output before closing and
+// removing the file. If output is nil the stderr contents are discarded.
+// Shutdown does not wait for the subprocess to exit.
+func (eh *execHandle) Shutdown(output io.Writer) {
+	eh.mu.Lock()
+	defer eh.mu.Unlock()
+	// stdin is nil if the handle was never successfully started.
+	if eh.stdin != nil {
+		eh.stdin.Close()
+	}
+	if eh.stderr == nil {
+		return
+	}
+	defer func() {
+		eh.stderr.Close()
+		os.Remove(eh.stderr.Name())
+	}()
+	if output == nil {
+		return
+	}
+	if _, err := eh.stderr.Seek(0, 0); err != nil {
+		return
+	}
+	// Read with ReadString rather than bufio.Scanner: a Scanner gives up
+	// on lines longer than bufio.MaxScanTokenSize and would silently drop
+	// the remainder of the output.
+	reader := bufio.NewReader(eh.stderr)
+	for {
+		line, err := reader.ReadString('\n')
+		if len(line) > 0 {
+			// Strip the line terminator (and a preceding '\r') so that
+			// every line is written with exactly one trailing newline.
+			line = strings.TrimSuffix(strings.TrimSuffix(line, "\n"), "\r")
+			fmt.Fprintf(output, "%s\n", line)
+		}
+		if err != nil {
+			return
+		}
+	}
+}
+
+// shellEntryPoint is the name of the environment variable that dispatch
+// reads to find the name of the registered command to run.
+const shellEntryPoint = "VEYRON_SHELL_HELPER_PROCESS_ENTRY_POINT"
+
+// RegisterChild records main as the function to run for the command called
+// name in the package-level child registry. Registering the same name
+// again replaces the earlier entry.
+func RegisterChild(name string, main Main) {
+	child.Lock()
+	defer child.Unlock()
+	child.mains[name] = main
+}
+
+// Dispatch runs the registered command named by the shellEntryPoint
+// environment variable and returns its error. It also returns an error if
+// that variable is empty or names a command that has not been registered.
+func Dispatch() error {
+	return child.dispatch()
+}
+
+// hasCommand reports whether a Main function has been registered under
+// name.
+func (child *childRegistrar) hasCommand(name string) bool {
+	child.Lock()
+	defer child.Unlock()
+	_, ok := child.mains[name]
+	return ok
+}
+
+// dispatch looks up the Main function registered under the name held in
+// the shellEntryPoint environment variable and runs it with the process's
+// stdin, stdout and stderr, the OS environment as a map, and the
+// non-flag command line arguments. It returns an error if the variable is
+// empty or no function is registered under that name; otherwise it returns
+// whatever the Main function returns.
+//
+// While the command runs, a background goroutine checks once a second
+// whether the parent process has gone away and exits with status 1 if so.
+func (child *childRegistrar) dispatch() error {
+	command := os.Getenv(shellEntryPoint)
+	if len(command) == 0 {
+		return fmt.Errorf("Failed to find entrypoint %q", shellEntryPoint)
+	}
+	child.Lock()
+	m := child.mains[command]
+	child.Unlock()
+	if m == nil {
+		return fmt.Errorf("Shell command %q not registered", command)
+	}
+	// Detect the parent exiting by watching for a change in our parent
+	// pid. os.FindProcess cannot be used for this: on Unix it always
+	// succeeds whether or not the process exists, so a check based on its
+	// error would never fire.
+	go func(pid int) {
+		for {
+			if ppid := os.Getppid(); ppid != pid {
+				fmt.Fprintf(os.Stderr, "Looks like our parent exited: parent pid changed from %d to %d\n", pid, ppid)
+				os.Exit(1)
+			}
+			time.Sleep(time.Second)
+		}
+	}(os.Getppid())
+	ch, err := vexec.GetChildHandle()
+	if err == nil {
+		// Only signal that the child is ready if we successfully get
+		// a child handle. We most likely failed to get a child handle
+		// because the subprocess was run directly from the command line.
+		ch.SetReady()
+	}
+	return m(os.Stdin, os.Stdout, os.Stderr, osEnvironMap(), flag.Args()...)
+}
diff --git a/lib/modules/func.go b/lib/modules/func.go
new file mode 100644
index 0000000..5d6160c
--- /dev/null
+++ b/lib/modules/func.go
@@ -0,0 +1,75 @@
+package modules
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"os"
+	"sync"
+)
+
+// pipe holds the read (r) and write (w) ends of an os.Pipe.
+type pipe struct {
+	r, w *os.File
+}
+// functionHandle represents a command that is run as a function within
+// the current process, connected to its caller by three pipes.
+type functionHandle struct {
+	// mu is held by the methods of functionHandle while they access the
+	// fields below.
+	mu                    sync.Mutex
+	// main is the function that start runs in a new goroutine.
+	main                  Main
+	// stdin, stderr and stdout are the pipes created by start.
+	stdin, stderr, stdout pipe
+	// bufferedStdout wraps the read end of the stdout pipe.
+	bufferedStdout        *bufio.Reader
+}
+
+// newFunctionHandle returns a command backed by a new, not yet started,
+// functionHandle that will run main.
+func newFunctionHandle(main Main) command {
+	return &functionHandle{main: main}
+}
+
+// Stdout returns the buffered reader over the read end of the function's
+// stdout pipe. It is nil until start has set it.
+func (fh *functionHandle) Stdout() *bufio.Reader {
+	fh.mu.Lock()
+	defer fh.mu.Unlock()
+	return fh.bufferedStdout
+}
+
+// Stderr returns the read end of the function's stderr pipe.
+// NOTE(review): before start has run the field is a nil *os.File, so the
+// returned interface value is non-nil but wraps a nil pointer.
+func (fh *functionHandle) Stderr() io.Reader {
+	fh.mu.Lock()
+	defer fh.mu.Unlock()
+	return fh.stderr.r
+}
+
+// Stdin returns the write end of the function's stdin pipe.
+// NOTE(review): before start has run the field is a nil *os.File, so the
+// returned interface value is non-nil but wraps a nil pointer.
+func (fh *functionHandle) Stdin() io.WriteCloser {
+	fh.mu.Lock()
+	defer fh.mu.Unlock()
+	return fh.stdin.w
+}
+
+// start creates the stdin, stdout and stderr pipes and runs the handle's
+// Main function in a new goroutine, passing it the function's ends of the
+// pipes, the merged OS and Shell environment, and args. When the function
+// returns, any error is written to its stderr and the function's ends of
+// all three pipes are closed.
+//
+// If creating a pipe fails, any pipes already created are closed again and
+// the error is returned with a nil Handle.
+func (fh *functionHandle) start(sh *Shell, args ...string) (Handle, error) {
+	fh.mu.Lock()
+	defer fh.mu.Unlock()
+	pipes := []*pipe{&fh.stdin, &fh.stdout, &fh.stderr}
+	for i, p := range pipes {
+		var err error
+		if p.r, p.w, err = os.Pipe(); err != nil {
+			// Do not leak the descriptors of the pipes that were
+			// successfully created before this one failed.
+			for _, opened := range pipes[:i] {
+				opened.r.Close()
+				opened.w.Close()
+				opened.r, opened.w = nil, nil
+			}
+			return nil, err
+		}
+	}
+	fh.bufferedStdout = bufio.NewReader(fh.stdout.r)
+	go func() {
+		err := fh.main(fh.stdin.r, fh.stdout.w, fh.stderr.w, sh.mergeOSEnv(), args...)
+		if err != nil {
+			fmt.Fprintf(fh.stderr.w, "%s\n", err)
+		}
+		fh.stdin.r.Close()
+		fh.stdout.w.Close()
+		fh.stderr.w.Close()
+	}()
+	return fh, nil
+}
+
+// Shutdown closes the function's stdin, reads its stderr until end of file
+// and then closes the caller's ends of the stdout and stderr pipes. Each
+// line read from stderr is written to output; if output is nil the lines
+// are read and discarded.
+func (fh *functionHandle) Shutdown(output io.Writer) {
+	fh.mu.Lock()
+	defer fh.mu.Unlock()
+	// Close stdin first. The function's write end of stderr is only closed
+	// once the function has returned, and a function that follows the
+	// protocol of waiting for its stdin to be closed would never return,
+	// so draining stderr before this point would block forever.
+	fh.stdin.w.Close()
+	scanner := bufio.NewScanner(fh.stderr.r)
+	for scanner.Scan() {
+		// A nil output means the caller is not interested in stderr;
+		// writing to it would panic.
+		if output != nil {
+			fmt.Fprintf(output, "%s\n", scanner.Text())
+		}
+	}
+	fh.stdout.r.Close()
+	fh.stderr.r.Close()
+}
diff --git a/lib/modules/modules_test.go b/lib/modules/modules_test.go
new file mode 100644
index 0000000..81739b7
--- /dev/null
+++ b/lib/modules/modules_test.go
@@ -0,0 +1,98 @@
+package modules_test
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"os"
+	"testing"
+	"time"
+
+	"veyron/lib/modules"
+)
+
+// init registers PrintEnv under the name "envtest" so that it can be
+// run via modules.Dispatch.
+func init() {
+	modules.RegisterChild("envtest", PrintEnv)
+}
+
+// PrintEnv is the command used by the tests in this file. For each name in
+// args it writes "name=value" to stdout if env holds a non-empty value for
+// it, or "missing name" to stderr otherwise. It then performs a single
+// read from stdin, writes "done" to stdout and returns nil.
+func PrintEnv(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error {
+	for _, a := range args {
+		if v := env[a]; len(v) > 0 {
+			// Print via explicit verbs: the name and value are data and
+			// must not be interpreted as a format string, otherwise any
+			// '%' they contain would be mangled.
+			fmt.Fprintf(stdout, "%s=%s\n", a, v)
+		} else {
+			fmt.Fprintf(stderr, "missing %s\n", a)
+		}
+	}
+	// Block until stdin yields data, end of file or an error; the result
+	// of the read is deliberately ignored.
+	buf := [1]byte{0x0}
+	stdin.Read(buf[:])
+	fmt.Fprintf(stdout, "done\n")
+	return nil
+}
+
+// waitForInput calls scanner.Scan in a separate goroutine and returns true
+// once that call has completed, or false if it has not completed within 10
+// seconds. The result of Scan itself is not reported; callers inspect the
+// scanner afterwards.
+func waitForInput(scanner *bufio.Scanner) bool {
+	// The channel is buffered so that the goroutine can always deliver its
+	// notification and exit, even if we have already timed out and nobody
+	// is receiving any more.
+	ch := make(chan struct{}, 1)
+	go func(ch chan<- struct{}) {
+		scanner.Scan()
+		ch <- struct{}{}
+	}(ch)
+	select {
+	case <-ch:
+		return true
+	case <-time.After(10 * time.Second):
+		return false
+	}
+}
+
+// testCommand starts the command called name with key as its only
+// argument and expects it to print "key=val" on stdout. It then closes the
+// command's stdin and expects a final "done" line. The shell is cleaned up
+// on return, with any stderr output copied to os.Stderr.
+func testCommand(t *testing.T, sh *modules.Shell, name, key, val string) {
+	h, err := sh.Start(name, key)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	defer sh.Cleanup(os.Stderr)
+	scanner := bufio.NewScanner(h.Stdout())
+	// expectLine waits for the next line of output and compares it with
+	// want. It returns false only when waiting timed out.
+	expectLine := func(want string) bool {
+		if !waitForInput(scanner) {
+			t.Errorf("timeout")
+			return false
+		}
+		if got := scanner.Text(); got != want {
+			t.Errorf("got %q, want %q", got, want)
+		}
+		return true
+	}
+	if !expectLine(key + "=" + val) {
+		return
+	}
+	h.Stdin().Close()
+	expectLine("done")
+}
+
+// TestChild runs the "envtest" command as a subprocess and checks that it
+// sees a variable set in the Shell.
+func TestChild(t *testing.T) {
+	sh := modules.NewShell()
+	key, val := "simpleVar", "foo & bar"
+	sh.SetVar(key, val)
+	sh.AddSubprocess("envtest", "envtest: <variables to print>...")
+	testCommand(t, sh, "envtest", key, val)
+}
+
+// TestFunction runs PrintEnv as an in-process function and checks that it
+// sees a variable set in the Shell.
+func TestFunction(t *testing.T) {
+	sh := modules.NewShell()
+	key, val := "simpleVar", "foo & bar & baz"
+	sh.SetVar(key, val)
+	sh.AddFunction("envtest", PrintEnv, "envtest: <variables to print>...")
+	testCommand(t, sh, "envtest", key, val)
+}
+
+// TestHelperProcess is the entry point for subprocesses: it does nothing
+// unless modules.IsTestHelperProcess reports true, in which case it hands
+// control to modules.Dispatch and fails if that returns an error.
+func TestHelperProcess(t *testing.T) {
+	if !modules.IsTestHelperProcess() {
+		return
+	}
+	if err := modules.Dispatch(); err != nil {
+		t.Fatalf("failed: %v", err)
+	}
+}
+
+// TODO(cnicolaou): more complete tests for environment variables,
+// OS being overridden by Shell for example.
diff --git a/lib/modules/shell.go b/lib/modules/shell.go
new file mode 100644
index 0000000..09941b8
--- /dev/null
+++ b/lib/modules/shell.go
@@ -0,0 +1,238 @@
+// Package modules provides a mechanism for running commonly used services
+// as subprocesses and client functionality for accessing those services.
+// Such services and functions are collectively called 'commands' and are
+// registered with and executed within a context, defined by the Shell type.
+// The Shell is analogous to the original UNIX shell and maintains a
+// key, value store of variables that is accessible to all of the commands that
+// it hosts. These variables may be referenced by the arguments passed to
+// commands.
+//
+// Commands are added to a shell in two ways: one for a subprocess and another
+// for an inprocess function.
+//
+// - subprocesses are added using the AddSubprocess method in the parent
+//   and by the modules.RegisterChild function in the child process (typically
+//   RegisterChild is called from an init function). modules.Dispatch must
+//   be called in the child process to execute the subprocess 'Main' function
+//   provided to RegisterChild.
+// - inprocess functions are added using the AddFunction method.
+//
+// In all cases commands are started by invoking the Start method on the
+// Shell with the name of the command to run. An instance of the Handle
+// interface is returned which can be used to interact with the function
+// or subprocess, and in particular to read/write data from/to it using io
+// channels that follow the stdin, stdout, stderr convention.
+//
+// A simple protocol must be followed by all commands, namely, they
+// should wait for their stdin stream to be closed before exiting. The
+// caller can then coordinate with any command by writing to that stdin
+// stream and reading responses from the stdout stream, and it can close
+// stdin when it's ready for the command to exit.
+//
+// The signature of the function that implements the command is the
+// same for both types of command and is defined by the Main function type.
+// In particular stdin, stdout and stderr are provided as parameters, as is
+// a map representation of the shell's environment.
+package modules
+
+import (
+	"bufio"
+	"fmt"
+	"io"
+	"sync"
+
+	"veyron2/vlog"
+)
+
+// Shell represents the context within which commands are run.
+type Shell struct {
+	// mu guards env, cmds and handles.
+	mu      sync.Mutex
+	// env holds the variables set via SetVar.
+	env     map[string]string
+	// cmds maps a command name to its description, as added by
+	// AddSubprocess and AddFunction.
+	cmds    map[string]*commandDesc
+	// handles is the set of Handles returned by Start that have not yet
+	// been forgotten or cleaned up.
+	handles map[Handle]struct{}
+}
+
+// commandDesc describes a command that has been added to a Shell.
+type commandDesc struct {
+	// factory creates a fresh command each time the command is started.
+	factory func() command
+	// help is the help message supplied when the command was added.
+	help    string
+}
+
+// childRegistrar holds the Main functions registered via RegisterChild,
+// keyed by command name. The embedded mutex guards mains.
+type childRegistrar struct {
+	sync.Mutex
+	mains map[string]Main
+}
+
+// child is the package-level registry used by RegisterChild and Dispatch.
+var child = &childRegistrar{mains: make(map[string]Main)}
+
+// NewShell creates a new instance of Shell with no variables, no commands
+// and no tracked handles.
+func NewShell() *Shell {
+	return &Shell{
+		env:     make(map[string]string),
+		cmds:    make(map[string]*commandDesc),
+		handles: make(map[Handle]struct{}),
+	}
+}
+
+// Main is the signature of the function that implements a command, whether
+// it is passed to RegisterChild or to AddFunction. It is given the
+// command's stdin, stdout and stderr streams, a map of environment
+// variables and the command's arguments.
+type Main func(stdin io.Reader, stdout, stderr io.Writer, env map[string]string, args ...string) error
+
+// AddSubprocess adds a new command to the Shell that will be run
+// as a subprocess. In addition, the child process must call RegisterChild
+// using the same name used here and provide the function to be executed
+// in the child. A warning is logged if no function is registered under
+// name in the current process.
+func (sh *Shell) AddSubprocess(name string, help string) {
+	if !child.hasCommand(name) {
+		vlog.Infof("Warning: %q is not registered with modules.Dispatcher", name)
+	}
+	entryPoint := shellEntryPoint + "=" + name
+	desc := &commandDesc{
+		factory: func() command { return newExecHandle(entryPoint) },
+		help:    help,
+	}
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	sh.cmds[name] = desc
+}
+
+// AddFunction adds a new command to the Shell that will be run
+// within the current process by calling main.
+func (sh *Shell) AddFunction(name string, main Main, help string) {
+	desc := &commandDesc{
+		factory: func() command { return newFunctionHandle(main) },
+		help:    help,
+	}
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	sh.cmds[name] = desc
+}
+
+// String returns a string representation of the Shell, which is a
+// concatenation of the help messages of each Command currently available
+// to it. No separator is inserted between messages and their order is
+// unspecified.
+func (sh *Shell) String() string {
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	var all string
+	for _, desc := range sh.cmds {
+		all = all + desc.help
+	}
+	return all
+}
+
+// Help returns the help message for the specified command, or the empty
+// string if the command is not available.
+func (sh *Shell) Help(command string) string {
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	desc := sh.cmds[command]
+	if desc == nil {
+		return ""
+	}
+	return desc.help
+}
+
+// Start starts the specified command, it returns a Handle which can be used
+// for interacting with that command. Arguments of the form $name are
+// replaced by the value of the Shell variable called name, if it is set.
+// The Shell tracks all of the Handles that it creates so that it can shut
+// them down when asked to. If any application calls Shutdown on a handle
+// directly, it must call the Forget method on the Shell instance hosting
+// that Handle to avoid storage leaks.
+func (sh *Shell) Start(command string, args ...string) (Handle, error) {
+	// Look up the command and expand its arguments under the lock, but
+	// start it with the lock released.
+	var expanded []string
+	sh.mu.Lock()
+	desc := sh.cmds[command]
+	if desc != nil {
+		expanded = sh.expand(args...)
+	}
+	sh.mu.Unlock()
+	if desc == nil {
+		return nil, fmt.Errorf("command %q is not available", command)
+	}
+	handle, err := desc.factory().start(sh, expanded...)
+	if err != nil {
+		return nil, err
+	}
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	sh.handles[handle] = struct{}{}
+	return handle, nil
+}
+
+// Forget tells the Shell to stop tracking the supplied Handle. It has no
+// effect if the Handle is not being tracked.
+func (sh *Shell) Forget(h Handle) {
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	delete(sh.handles, h)
+}
+
+// expand returns a copy of args in which every argument of the form $name
+// has been replaced by the value of the Shell variable called name.
+// Arguments that do not start with '$', or that name a variable that is
+// not set, are copied unchanged. It reads sh.env without locking; the
+// caller is expected to hold sh.mu.
+func (sh *Shell) expand(args ...string) []string {
+	out := make([]string, 0, len(args))
+	for _, arg := range args {
+		result := arg
+		if len(arg) > 0 && arg[0] == '$' {
+			if value, ok := sh.env[arg[1:]]; ok {
+				result = value
+			}
+		}
+		out = append(out, result)
+	}
+	return out
+}
+
+// GetVar returns the value of the Shell variable associated with the
+// specified key and an indication of whether it is defined or not. Only
+// variables set on the Shell are consulted, not the OS environment.
+func (sh *Shell) GetVar(key string) (string, bool) {
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	v, present := sh.env[key]
+	return v, present
+}
+
+// SetVar sets the value to be associated with key, replacing any previous
+// value. The value is stored as is; it is not expanded.
+func (sh *Shell) SetVar(key, value string) {
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	// TODO(cnicolaou): expand value
+	sh.env[key] = value
+}
+
+// Env returns the entire set of environment variables associated with this
+// Shell as a string slice of "key=value" entries, in unspecified order.
+// Variables from the OS environment are not included.
+func (sh *Shell) Env() []string {
+	sh.mu.Lock()
+	defer sh.mu.Unlock()
+	out := make([]string, 0, len(sh.env))
+	for name, value := range sh.env {
+		out = append(out, name+"="+value)
+	}
+	return out
+}
+
+// Cleanup calls Shutdown on all of the Handles currently being tracked
+// by the Shell and stops tracking them. Any buffered output from the
+// command's stderr stream will be written to the supplied io.Writer. If
+// the io.Writer is nil then any such output is lost.
+func (sh *Shell) Cleanup(output io.Writer) {
+	// Detach the current set of handles under the lock, then shut them
+	// down with the lock released. Shutdown may block on I/O, and holding
+	// sh.mu for that long would stall every other Shell method and would
+	// deadlock if a Shutdown implementation called back into the Shell.
+	sh.mu.Lock()
+	handles := sh.handles
+	sh.handles = make(map[Handle]struct{})
+	sh.mu.Unlock()
+	for h := range handles {
+		h.Shutdown(output)
+	}
+}
+
+// Handle represents a running command, whether it runs as a subprocess or
+// as a function within the current process.
+type Handle interface {
+	// Stdout returns a buffered reader to the running command's stdout stream.
+	Stdout() *bufio.Reader
+
+	// Stderr returns an unbuffered reader to the running command's stderr
+	// stream.
+	Stderr() io.Reader
+
+	// Stdin returns a writer to the running command's stdin. The
+	// convention is for commands to wait for stdin to be closed before
+	// they exit, thus the caller should close stdin when it wants the
+	// command to exit cleanly.
+	Stdin() io.WriteCloser
+
+	// Shutdown closes the Stdin for the command. It is primarily intended
+	// for being called by the Shell; if other application code calls it
+	// then it should use the Shell's Forget method to have the Shell stop
+	// tracking the handle. Any buffered stderr output from the command will
+	// be written to the supplied io.Writer. If the io.Writer is nil then
+	// any such output is lost.
+	Shutdown(io.Writer)
+}
+
+// command is used to abstract the implementations of inprocess and subprocess
+// commands.
+type command interface {
+	// start runs the command within the context of sh with the supplied
+	// arguments and returns a Handle for interacting with it.
+	start(sh *Shell, args ...string) (Handle, error)
+}