gosh: Drop {Send,Await}Ready, simplify {Send,Await}Vars
- switches from "#! " to "# gosh " message prefix
- encodes vars as a JSON object, without the wrapping "msg"
object
- switches from stdout to stderr for sending vars messages,
so that "PropagateOutput" doesn't affect the parent's
stdout unless the child explicitly writes to stdout
Change-Id: Ife27ad387dd31b5dde8de347bca7d35808964305
diff --git a/gosh/.api b/gosh/.api
index d8cead2..d805d75 100644
--- a/gosh/.api
+++ b/gosh/.api
@@ -4,11 +4,9 @@
pkg gosh, func NewShell(Opts) *Shell
pkg gosh, func NopWriteCloser(io.Writer) io.WriteCloser
pkg gosh, func RegisterFunc(string, interface{}) *Func
-pkg gosh, func SendReady()
pkg gosh, func SendVars(map[string]string)
pkg gosh, method (*Cmd) AddStderrWriter(io.WriteCloser)
pkg gosh, method (*Cmd) AddStdoutWriter(io.WriteCloser)
-pkg gosh, method (*Cmd) AwaitReady()
pkg gosh, method (*Cmd) AwaitVars(...string) map[string]string
pkg gosh, method (*Cmd) Clone() *Cmd
pkg gosh, method (*Cmd) CombinedOutput() string
diff --git a/gosh/child.go b/gosh/child.go
index 95c47ef..81d0ba8 100644
--- a/gosh/child.go
+++ b/gosh/child.go
@@ -14,34 +14,18 @@
"time"
)
-const (
- msgPrefix = "#! "
- typeReady = "ready"
- typeVars = "vars"
-)
+const varsPrefix = "# gosh "
-type msg struct {
- Type string
- Vars map[string]string // nil if Type is typeReady
-}
-
-func send(m msg) {
- data, err := json.Marshal(m)
+// SendVars sends the given vars to the parent process. Writes a line of the
+// form "# gosh { ... JSON object ... }" to stderr.
+func SendVars(vars map[string]string) {
+ data, err := json.Marshal(vars)
if err != nil {
panic(err)
}
- fmt.Printf("%s%s\n", msgPrefix, data)
-}
-
-// SendReady tells the parent process that this child process is "ready", e.g.
-// ready to serve requests.
-func SendReady() {
- send(msg{Type: typeReady})
-}
-
-// SendVars sends the given vars to the parent process.
-func SendVars(vars map[string]string) {
- send(msg{Type: typeVars, Vars: vars})
+ // TODO(sadovsky): Handle the case where the JSON object contains a newline
+ // character.
+ fmt.Fprintf(os.Stderr, "%s%s\n", varsPrefix, data)
}
// watchParent periodically checks whether the parent process has exited and, if
diff --git a/gosh/cmd.go b/gosh/cmd.go
index 5aab1c7..3d01ce3 100644
--- a/gosh/cmd.go
+++ b/gosh/cmd.go
@@ -8,7 +8,6 @@
"bytes"
"encoding/json"
"errors"
- "fmt"
"io"
"os"
"os/exec"
@@ -71,7 +70,6 @@
stdoutWriters []io.Writer
stderrWriters []io.Writer
closers []io.Closer
- recvReady bool // protected by cond.L
recvVars map[string]string // protected by cond.L
}
@@ -152,15 +150,8 @@
c.handleError(c.start())
}
-// AwaitReady waits for the child process to call SendReady. Must not be called
-// before Start or after Wait.
-func (c *Cmd) AwaitReady() {
- c.sh.Ok()
- c.handleError(c.awaitReady())
-}
-
// AwaitVars waits for the child process to send values for the given vars
-// (using SendVars). Must not be called before Start or after Wait.
+// (e.g. using SendVars). Must not be called before Start or after Wait.
func (c *Cmd) AwaitVars(keys ...string) map[string]string {
c.sh.Ok()
res, err := c.awaitVars(keys...)
@@ -305,7 +296,7 @@
type recvWriter struct {
c *Cmd
buf bytes.Buffer
- readPrefix bool // if true, we've read len(msgPrefix) for the current line
+ readPrefix bool // if true, we've read len(varsPrefix) for the current line
skipLine bool // if true, ignore bytes until next '\n'
}
@@ -313,34 +304,24 @@
for _, b := range p {
if b == '\n' {
if w.readPrefix && !w.skipLine {
- m := msg{}
- if err := json.Unmarshal(w.buf.Bytes(), &m); err != nil {
+ vars := make(map[string]string)
+ if err := json.Unmarshal(w.buf.Bytes(), &vars); err != nil {
return 0, err
}
- switch m.Type {
- case typeReady:
- w.c.cond.L.Lock()
- w.c.recvReady = true
- w.c.cond.Signal()
- w.c.cond.L.Unlock()
- case typeVars:
- w.c.cond.L.Lock()
- w.c.recvVars = mergeMaps(w.c.recvVars, m.Vars)
- w.c.cond.Signal()
- w.c.cond.L.Unlock()
- default:
- return 0, fmt.Errorf("unknown message type: %q", m.Type)
- }
+ w.c.cond.L.Lock()
+ w.c.recvVars = mergeMaps(w.c.recvVars, vars)
+ w.c.cond.Signal()
+ w.c.cond.L.Unlock()
}
// Reset state for next line.
w.readPrefix, w.skipLine = false, false
w.buf.Reset()
} else if !w.skipLine {
w.buf.WriteByte(b)
- if !w.readPrefix && w.buf.Len() == len(msgPrefix) {
+ if !w.readPrefix && w.buf.Len() == len(varsPrefix) {
w.readPrefix = true
- prefix := string(w.buf.Next(len(msgPrefix)))
- if prefix != msgPrefix {
+ prefix := string(w.buf.Next(len(varsPrefix)))
+ if prefix != varsPrefix {
w.skipLine = true
}
}
@@ -362,7 +343,7 @@
}
func (c *Cmd) makeStdoutStderr() (io.Writer, io.Writer, error) {
- c.stdoutWriters = append(c.stdoutWriters, &recvWriter{c: c})
+ c.stderrWriters = append(c.stderrWriters, &recvWriter{c: c})
if c.PropagateOutput {
c.stdoutWriters = append(c.stdoutWriters, os.Stdout)
c.stderrWriters = append(c.stderrWriters, os.Stderr)
@@ -549,26 +530,7 @@
}()
}
-// TODO(sadovsky): Maybe add optional timeouts for
-// Cmd.{awaitReady,awaitVars,wait}.
-
-func (c *Cmd) awaitReady() error {
- if !c.started {
- return errDidNotCallStart
- } else if c.calledWait {
- return errAlreadyCalledWait
- }
- c.cond.L.Lock()
- defer c.cond.L.Unlock()
- for !c.exited && !c.recvReady {
- c.cond.Wait()
- }
- // Return nil error if both conditions triggered simultaneously.
- if !c.recvReady {
- return errProcessExited
- }
- return nil
-}
+// TODO(sadovsky): Maybe add optional timeouts for Cmd.{awaitVars,wait}.
func (c *Cmd) awaitVars(keys ...string) (map[string]string, error) {
if !c.started {
diff --git a/gosh/internal/gosh_example/main.go b/gosh/internal/gosh_example/main.go
index d3ecab2..30e8d22 100644
--- a/gosh/internal/gosh_example/main.go
+++ b/gosh/internal/gosh_example/main.go
@@ -19,8 +19,7 @@
binPath := sh.BuildGoPkg("v.io/x/lib/gosh/internal/gosh_example_server")
c := sh.Cmd(binPath)
c.Start()
- c.AwaitReady()
- addr := c.AwaitVars("Addr")["Addr"]
+ addr := c.AwaitVars("addr")["addr"]
fmt.Println(addr)
// Run client.
@@ -41,8 +40,7 @@
// Start server.
c := sh.FuncCmd(serveFunc)
c.Start()
- c.AwaitReady()
- addr := c.AwaitVars("Addr")["Addr"]
+ addr := c.AwaitVars("addr")["addr"]
fmt.Println(addr)
// Run client.
diff --git a/gosh/internal/gosh_example_lib/lib.go b/gosh/internal/gosh_example_lib/lib.go
index be51c33..3f8bcbb 100644
--- a/gosh/internal/gosh_example_lib/lib.go
+++ b/gosh/internal/gosh_example_lib/lib.go
@@ -52,10 +52,9 @@
if err != nil {
panic(err)
}
- gosh.SendVars(map[string]string{"Addr": ln.Addr().String()})
go func() {
time.Sleep(100 * time.Millisecond)
- gosh.SendReady()
+ gosh.SendVars(map[string]string{"addr": ln.Addr().String()})
}()
if err = srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}); err != nil {
panic(err)
diff --git a/gosh/shell.go b/gosh/shell.go
index 8e17747..4426990 100644
--- a/gosh/shell.go
+++ b/gosh/shell.go
@@ -268,15 +268,15 @@
go func() {
select {
case sig := <-ch:
- // A termination signal was received, the process will exit.
+ // A termination signal was received; the process will exit.
sh.logf("Received signal: %v\n", sig)
sh.cleanupMu.Lock()
defer sh.cleanupMu.Unlock()
if !sh.calledCleanup {
sh.cleanup()
}
- // Note: We hold cleanupMu during os.Exit(1) so that the main goroutine will
- // not call Shell.Ok() and panic before we exit.
+ // Note: We hold cleanupMu during os.Exit(1) so that the main goroutine
+ // will not call Shell.Ok() and panic before we exit.
os.Exit(1)
case <-sh.cleanupDone:
// The user called sh.Cleanup; stop listening for signals and exit this
@@ -329,8 +329,8 @@
}
func (sh *Shell) wait() error {
- // Note: It is illegal to call newCmd() concurrently with Shell.wait(), so we
- // need not hold cleanupMu when accessing sh.cmds below.
+ // Note: It is illegal to call newCmdInternal concurrently with Shell.wait, so
+ // we need not hold cleanupMu when accessing sh.cmds below.
var res error
for _, c := range sh.cmds {
if !c.started || c.calledWait {
diff --git a/gosh/shell_test.go b/gosh/shell_test.go
index 04b3510..56aa87e 100644
--- a/gosh/shell_test.go
+++ b/gosh/shell_test.go
@@ -118,9 +118,9 @@
<-ch
os.Exit(0)
}()
- // The parent waits for this ready signal, to avoid the race where a signal
- // is sent before the handler is installed.
- gosh.SendReady()
+ // The parent waits for this "ready" notification to avoid the race where a
+ // signal is sent before the handler is installed.
+ gosh.SendVars(map[string]string{"ready": ""})
time.Sleep(d)
os.Exit(code)
})
@@ -209,8 +209,7 @@
binPath := sh.BuildGoPkg("v.io/x/lib/gosh/internal/gosh_example_server")
c := sh.Cmd(binPath)
c.Start()
- c.AwaitReady()
- addr := c.AwaitVars("Addr")["Addr"]
+ addr := c.AwaitVars("addr")["addr"]
neq(t, addr, "")
// Run client.
@@ -231,8 +230,7 @@
// Start server.
c := sh.FuncCmd(serveFunc)
c.Start()
- c.AwaitReady()
- addr := c.AwaitVars("Addr")["Addr"]
+ addr := c.AwaitVars("addr")["addr"]
neq(t, addr, "")
// Run client.
@@ -258,17 +256,13 @@
})
)
-// Tests that Await{Ready,Vars} return immediately when the process exits.
+// Tests that AwaitVars returns immediately when the process exits.
func TestAwaitProcessExit(t *testing.T) {
sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
defer sh.Cleanup()
c := sh.FuncCmd(exitFunc, 0)
c.Start()
- setsErr(t, sh, func() { c.AwaitReady() })
-
- c = sh.FuncCmd(exitFunc, 0)
- c.Start()
setsErr(t, sh, func() { c.AwaitVars("foo") })
}
@@ -548,7 +542,7 @@
fmt.Println(d, s)
c := sh.FuncCmd(sleepFunc, d, 0)
c.Start()
- c.AwaitReady()
+ c.AwaitVars("ready")
// Wait for a bit to allow the zero-sleep commands to exit.
time.Sleep(100 * time.Millisecond)
c.Signal(s)
@@ -580,7 +574,7 @@
fmt.Println(d, s)
c := sh.FuncCmd(sleepFunc, d, 0)
c.Start()
- c.AwaitReady()
+ c.AwaitVars("ready")
// Wait for a bit to allow the zero-sleep commands to exit.
time.Sleep(100 * time.Millisecond)
// Terminate should succeed regardless of the exit code, and regardless of