Merge "gosh: Add bufferedPipe ReadFrom and WriteTo."
diff --git a/gosh/.api b/gosh/.api
index c48f046..edfb2a8 100644
--- a/gosh/.api
+++ b/gosh/.api
@@ -1,17 +1,18 @@
pkg gosh, func InitChildMain()
pkg gosh, func InitMain()
+pkg gosh, func NewPipeline(*Cmd, ...*Cmd) *Pipeline
pkg gosh, func NewShell(Opts) *Shell
-pkg gosh, func NopWriteCloser(io.Writer) io.WriteCloser
pkg gosh, func RegisterFunc(string, interface{}) *Func
pkg gosh, func SendVars(map[string]string)
-pkg gosh, method (*Cmd) AddStderrWriter(io.WriteCloser)
-pkg gosh, method (*Cmd) AddStdoutWriter(io.WriteCloser)
+pkg gosh, method (*Cmd) AddStderrWriter(io.Writer)
+pkg gosh, method (*Cmd) AddStdoutWriter(io.Writer)
pkg gosh, method (*Cmd) AwaitVars(...string) map[string]string
pkg gosh, method (*Cmd) Clone() *Cmd
pkg gosh, method (*Cmd) CombinedOutput() string
pkg gosh, method (*Cmd) Pid() int
pkg gosh, method (*Cmd) Run()
pkg gosh, method (*Cmd) SetStdinReader(io.Reader)
+pkg gosh, method (*Cmd) Shell() *Shell
pkg gosh, method (*Cmd) Signal(os.Signal)
pkg gosh, method (*Cmd) Start()
pkg gosh, method (*Cmd) StderrPipe() io.ReadCloser
@@ -21,7 +22,20 @@
pkg gosh, method (*Cmd) StdoutStderr() (string, string)
pkg gosh, method (*Cmd) Terminate(os.Signal)
pkg gosh, method (*Cmd) Wait()
-pkg gosh, method (*Shell) AddToCleanup(func())
+pkg gosh, method (*Pipeline) Clone() *Pipeline
+pkg gosh, method (*Pipeline) Cmds() []*Cmd
+pkg gosh, method (*Pipeline) CombinedOutput() string
+pkg gosh, method (*Pipeline) PipeCombinedOutput(*Cmd)
+pkg gosh, method (*Pipeline) PipeStderr(*Cmd)
+pkg gosh, method (*Pipeline) PipeStdout(*Cmd)
+pkg gosh, method (*Pipeline) Run()
+pkg gosh, method (*Pipeline) Signal(os.Signal)
+pkg gosh, method (*Pipeline) Start()
+pkg gosh, method (*Pipeline) Stdout() string
+pkg gosh, method (*Pipeline) StdoutStderr() (string, string)
+pkg gosh, method (*Pipeline) Terminate(os.Signal)
+pkg gosh, method (*Pipeline) Wait()
+pkg gosh, method (*Shell) AddCleanupHandler(func())
pkg gosh, method (*Shell) BuildGoPkg(string, ...string) string
pkg gosh, method (*Shell) Cleanup()
pkg gosh, method (*Shell) Cmd(string, ...string) *Cmd
@@ -39,6 +53,7 @@
pkg gosh, type Cmd struct, Err error
pkg gosh, type Cmd struct, ExitAfter time.Duration
pkg gosh, type Cmd struct, ExitErrorIsOk bool
+pkg gosh, type Cmd struct, IgnoreClosedPipeError bool
pkg gosh, type Cmd struct, IgnoreParentExit bool
pkg gosh, type Cmd struct, OutputDir string
pkg gosh, type Cmd struct, Path string
@@ -51,6 +66,7 @@
pkg gosh, type Opts struct, Fatalf func(string, ...interface{})
pkg gosh, type Opts struct, Logf func(string, ...interface{})
pkg gosh, type Opts struct, PropagateChildOutput bool
+pkg gosh, type Pipeline struct
pkg gosh, type Shell struct
pkg gosh, type Shell struct, Args []string
pkg gosh, type Shell struct, Err error
diff --git a/gosh/cmd.go b/gosh/cmd.go
index e956500..0ab4d61 100644
--- a/gosh/cmd.go
+++ b/gosh/cmd.go
@@ -21,8 +21,6 @@
errAlreadyCalledStart = errors.New("gosh: already called Cmd.Start")
errAlreadyCalledWait = errors.New("gosh: already called Cmd.Wait")
errAlreadySetStdin = errors.New("gosh: already set stdin")
- errCloseStdout = errors.New("gosh: use NopWriteCloser(os.Stdout) to prevent stdout from being closed")
- errCloseStderr = errors.New("gosh: use NopWriteCloser(os.Stderr) to prevent stderr from being closed")
errDidNotCallStart = errors.New("gosh: did not call Cmd.Start")
errProcessExited = errors.New("gosh: process exited")
)
@@ -56,6 +54,13 @@
// ExitErrorIsOk specifies whether an *exec.ExitError should be reported via
// Shell.HandleError.
ExitErrorIsOk bool
+ // IgnoreClosedPipeError, if true, causes errors from read/write on a closed
+ // pipe to be indistinguishable from success. These errors often occur in
+ // command pipelines, e.g. "yes | head -1", where "yes" will receive a closed
+ // pipe error when it tries to write on stdout, after "head" has exited. If a
+ // closed pipe error occurs, Cmd.Err will be nil, and no err is reported to
+ // Shell.HandleError.
+ IgnoreClosedPipeError bool
// Internal state.
sh *Shell
c *exec.Cmd
@@ -73,6 +78,11 @@
recvVars map[string]string // protected by cond.L
}
+// Shell returns the shell that this Cmd was created from.
+func (c *Cmd) Shell() *Shell {
+ return c.sh
+}
+
// Clone returns a new Cmd with a copy of this Cmd's configuration.
func (c *Cmd) Clone() *Cmd {
c.sh.Ok()
@@ -94,8 +104,10 @@
}
// StdoutPipe returns a ReadCloser backed by an unlimited-size pipe for the
-// command's stdout. Must be called before Start. May be called more than once;
-// each invocation creates a new pipe.
+// command's stdout. The pipe will be closed when the process exits, but may
+// also be closed earlier by the caller, e.g. if all expected output has been
+// received. Must be called before Start. May be called more than once; each
+// call creates a new pipe.
func (c *Cmd) StdoutPipe() io.ReadCloser {
c.sh.Ok()
res, err := c.stdoutPipe()
@@ -104,8 +116,10 @@
}
// StderrPipe returns a ReadCloser backed by an unlimited-size pipe for the
-// command's stderr. Must be called before Start. May be called more than once;
-// each invocation creates a new pipe.
+// command's stderr. The pipe will be closed when the process exits, but may
+// also be closed earlier by the caller, e.g. if all expected output has been
+// received. Must be called before Start. May be called more than once; each
+// call creates a new pipe.
func (c *Cmd) StderrPipe() io.ReadCloser {
c.sh.Ok()
res, err := c.stderrPipe()
@@ -113,42 +127,28 @@
return res
}
-// SetStdinReader configures this Cmd to read the child's stdin from the given
-// Reader. Must be called before Start. Only one call may be made to StdinPipe
-// or SetStdinReader; subsequent calls will fail.
+// SetStdinReader configures this Cmd to read stdin from the given Reader. Must
+// be called before Start. Only one call may be made to StdinPipe or
+// SetStdinReader; subsequent calls will fail.
func (c *Cmd) SetStdinReader(r io.Reader) {
c.sh.Ok()
c.handleError(c.setStdinReader(r))
}
-// AddStdoutWriter configures this Cmd to tee the child's stdout to the given
-// WriteCloser, which will be closed when the process exits.
-//
-// If the same WriteCloser is passed to both AddStdoutWriter and
-// AddStderrWriter, Cmd will ensure that its methods are never called
-// concurrently and that Close is only called once.
-//
-// Use NopWriteCloser to extend a Writer to a WriteCloser, or to prevent an
-// existing WriteCloser from being closed. It is an error to pass in os.Stdout
-// or os.Stderr, since they shouldn't be closed.
-func (c *Cmd) AddStdoutWriter(wc io.WriteCloser) {
+// AddStdoutWriter configures this Cmd to tee stdout to the given Writer. Must
+// be called before Start. If the same Writer is passed to both AddStdoutWriter
+// and AddStderrWriter, Cmd will ensure that Write is never called concurrently.
+func (c *Cmd) AddStdoutWriter(w io.Writer) {
c.sh.Ok()
- c.handleError(c.addStdoutWriter(wc))
+ c.handleError(c.addStdoutWriter(w))
}
-// AddStderrWriter configures this Cmd to tee the child's stderr to the given
-// WriteCloser, which will be closed when the process exits.
-//
-// If the same WriteCloser is passed to both AddStdoutWriter and
-// AddStderrWriter, Cmd will ensure that its methods are never called
-// concurrently and that Close is only called once.
-//
-// Use NopWriteCloser to extend a Writer to a WriteCloser, or to prevent an
-// existing WriteCloser from being closed. It is an error to pass in os.Stdout
-// or os.Stderr, since they shouldn't be closed.
-func (c *Cmd) AddStderrWriter(wc io.WriteCloser) {
+// AddStderrWriter configures this Cmd to tee stderr to the given Writer. Must
+// be called before Start. If the same Writer is passed to both AddStdoutWriter
+// and AddStderrWriter, Cmd will ensure that Write is never called concurrently.
+func (c *Cmd) AddStderrWriter(w io.Writer) {
c.sh.Ok()
- c.handleError(c.addStderrWriter(wc))
+ c.handleError(c.addStderrWriter(w))
}
// Start starts the command.
@@ -172,15 +172,15 @@
c.handleError(c.wait())
}
-// Signal sends a signal to the process.
+// Signal sends a signal to the underlying process.
func (c *Cmd) Signal(sig os.Signal) {
c.sh.Ok()
c.handleError(c.signal(sig))
}
-// Terminate sends a signal to the process, then waits for it to exit. Terminate
-// is different from Signal followed by Wait: Terminate succeeds as long as the
-// process exits, whereas Wait fails if the exit code isn't 0.
+// Terminate sends a signal to the underlying process, then waits for it to
+// exit. Terminate is different from Signal followed by Wait: Terminate succeeds
+// as long as the process exits, whereas Wait fails if the exit code isn't 0.
func (c *Cmd) Terminate(sig os.Signal) {
c.sh.Ok()
c.handleError(c.terminate(sig))
@@ -271,7 +271,35 @@
return err == nil
}
+// An explanation of closed pipe errors. Consider the pipeline "yes | head -1",
+// where yes keeps writing "y\n" to stdout, and head succeeds after it reads the
+// first line. There is an os pipe connecting the two commands, and when head
+// exits, it causes yes to receive a closed pipe error on its next write. Should
+// we consider such a pipeline to have succeeded or failed?
+//
+// Bash only looks at the exit status of the last command by default, thus the
+// pipeline succeeds. But that's dangerous, since yes could have crashed and we
+// wouldn't know. It's recommended to run "set -o pipefail" to tell bash to
+// check the exit status of all commands. But that causes the pipeline to fail.
+//
+// IgnoreClosedPipeError handles this case. gosh.Pipeline sets this option to
+// true, so that by default the pipeline above will succeed, but will fail on
+// any other error. Note that the exec package always returns an ExitError if
+// the child process exited with a non-zero exit code; the closed pipe error is
+// only returned if the child process exited with a zero exit code, and Write on
+// the io.MultiWriter in the parent process received the closed pipe error.
+//
+// TODO(toddw): We could adopt the convention that exit code 141 indicates
+// "closed pipe", and use IgnoreClosedPipeError to also ignore that case. We
+// choose 141 because it's 128 + 13, where SIGPIPE is 13, and there is an
+// existing convention for this. By default Go programs ignore SIGPIPE, so we
+// might also want to add code to InitChildMain to exit the program with 141 if
+// it receives SIGPIPE.
+
func (c *Cmd) handleError(err error) {
+ if c.IgnoreClosedPipeError && isClosedPipeError(err) {
+ err = nil
+ }
c.Err = err
if !c.errorIsOk(err) {
c.sh.HandleError(err)
@@ -400,6 +428,7 @@
res.PropagateOutput = c.PropagateOutput
res.OutputDir = c.OutputDir
res.ExitErrorIsOk = c.ExitErrorIsOk
+ res.IgnoreClosedPipeError = c.IgnoreClosedPipeError
return res, nil
}
@@ -436,11 +465,7 @@
func (c *Cmd) stdinPipeCopier(dst io.WriteCloser, src io.Reader) {
var firstErr error
- _, err := io.Copy(dst, src)
- // Ignore EPIPE errors copying to stdin, indicating the process exited. This
- // mirrors logic in os/exec/exec_posix.go. Also see:
- // https://github.com/golang/go/issues/9173
- if pe, ok := err.(*os.PathError); !ok || pe.Op != "write" || pe.Path != "|1" || pe.Err != syscall.EPIPE {
+ if _, err := io.Copy(dst, src); err != nil && !isClosedPipeError(err) {
firstErr = err
}
if err := dst.Close(); err != nil && firstErr == nil {
@@ -449,6 +474,21 @@
c.stdinDoneChan <- firstErr
}
+// isClosedPipeError returns true iff the error indicates a closed pipe. This
+// typically occurs with a pipeline of commands "A | B"; if B exits first, the
+// next write by A will receive a closed pipe error. Also see:
+// https://github.com/golang/go/issues/9173
+func isClosedPipeError(err error) bool {
+ if err == io.ErrClosedPipe {
+ return true
+ }
+ // Closed pipe on os.Pipe; mirrors logic in os/exec/exec_posix.go.
+ if pe, ok := err.(*os.PathError); ok && pe.Op == "write" && pe.Path == "|1" && pe.Err == syscall.EPIPE {
+ return true
+ }
+ return false
+}
+
func (c *Cmd) setStdinReader(r io.Reader) error {
switch {
case c.calledStart:
@@ -480,42 +520,37 @@
return p, nil
}
-func (c *Cmd) addStdoutWriter(wc io.WriteCloser) error {
- switch {
- case c.calledStart:
+func (c *Cmd) addStdoutWriter(w io.Writer) error {
+ if c.calledStart {
return errAlreadyCalledStart
- case wc == os.Stdout:
- return errCloseStdout
- case wc == os.Stderr:
- return errCloseStderr
}
- c.stdoutWriters = append(c.stdoutWriters, wc)
- c.afterWaitClosers = append(c.afterWaitClosers, wc)
+ c.stdoutWriters = append(c.stdoutWriters, w)
return nil
}
-func (c *Cmd) addStderrWriter(wc io.WriteCloser) error {
- switch {
- case c.calledStart:
+func (c *Cmd) addStderrWriter(w io.Writer) error {
+ if c.calledStart {
return errAlreadyCalledStart
- case wc == os.Stdout:
- return errCloseStdout
- case wc == os.Stderr:
- return errCloseStderr
}
- c.stderrWriters = append(c.stderrWriters, wc)
- c.afterWaitClosers = append(c.afterWaitClosers, wc)
+ c.stderrWriters = append(c.stderrWriters, w)
return nil
}
// TODO(sadovsky): Maybe wrap every child process with a "supervisor" process
// that calls InitChildMain.
-func (c *Cmd) start() error {
+func (c *Cmd) start() (e error) {
defer func() {
- closeClosers(c.afterStartClosers)
+ // Always close afterStartClosers upon return. Only close afterWaitClosers
+ // if start failed; if start succeeds, they're closed in the startExitWaiter
+ // goroutine. Only the first error is reported.
+ if err := closeClosers(c.afterStartClosers); e == nil {
+ e = err
+ }
if !c.started {
- closeClosers(c.afterWaitClosers)
+ if err := closeClosers(c.afterWaitClosers); e == nil {
+ e = err
+ }
}
}()
if c.calledStart {
@@ -568,7 +603,9 @@
c.exited = true
c.cond.Signal()
c.cond.L.Unlock()
- closeClosers(c.afterWaitClosers)
+ if err := closeClosers(c.afterWaitClosers); waitErr == nil {
+ waitErr = err
+ }
if c.stdinDoneChan != nil {
// Wait for the stdinPipeCopier goroutine to finish.
if err := <-c.stdinDoneChan; waitErr == nil {
@@ -579,24 +616,23 @@
}()
}
-func closeClosers(closers []io.Closer) {
- // If the same WriteCloser was passed to both AddStdoutWriter and
- // AddStderrWriter, we should only close it once.
- cm := map[io.Closer]bool{}
+func closeClosers(closers []io.Closer) error {
+ var firstErr error
for _, closer := range closers {
- if !cm[closer] {
- cm[closer] = true
- closer.Close() // best-effort; ignore returned error
+ if err := closer.Close(); firstErr == nil {
+ firstErr = err
}
}
+ return firstErr
}
// TODO(sadovsky): Maybe add optional timeouts for Cmd.{awaitVars,wait}.
func (c *Cmd) awaitVars(keys ...string) (map[string]string, error) {
- if !c.started {
+ switch {
+ case !c.started:
return nil, errDidNotCallStart
- } else if c.calledWait {
+ case c.calledWait:
return nil, errAlreadyCalledWait
}
wantKeys := map[string]bool{}
@@ -626,9 +662,10 @@
}
func (c *Cmd) wait() error {
- if !c.started {
+ switch {
+ case !c.started:
return errDidNotCallStart
- } else if c.calledWait {
+ case c.calledWait:
return errAlreadyCalledWait
}
c.calledWait = true
@@ -647,9 +684,10 @@
// of the os.Signal interface, and have the signal and terminate methods map
// that to Process.Kill.
func (c *Cmd) signal(sig os.Signal) error {
- if !c.started {
+ switch {
+ case !c.started:
return errDidNotCallStart
- } else if c.calledWait {
+ case c.calledWait:
return errAlreadyCalledWait
}
if !c.isRunning() {
diff --git a/gosh/pipeline.go b/gosh/pipeline.go
new file mode 100644
index 0000000..c9aec89
--- /dev/null
+++ b/gosh/pipeline.go
@@ -0,0 +1,395 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gosh
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "os"
+)
+
+var errPipelineCmdsHaveDifferentShells = errors.New("gosh: pipeline cmds have different shells")
+
+// Pipeline represents a pipeline of commands, where the stdout and/or stderr of
+// one command is connected to the stdin of the next command.
+//
+// The failure semantics of a pipeline are determined by the failure semantics
+// of each command; by default the pipeline fails if any command fails, and
+// read/write errors on closed pipes are ignored. This is different from bash,
+// where the default is to only check the status of the last command, unless
+// "set -o pipefail" is enabled to check the status of all commands, causing
+// closed pipe errors to fail the pipeline. Use Cmd.ExitErrorIsOk and
+// Cmd.IgnoreClosedPipeError to fine-tune the failure semantics.
+//
+// The implementation of Pipeline only uses exported methods from Shell and Cmd.
+type Pipeline struct {
+ sh *Shell
+ cmds []*Cmd // INVARIANT: len(cmds) > 0
+ state []pipeState // INVARIANT: len(state) == len(cmds) - 1
+}
+
+type pipeState struct {
+ mode pipeMode // describes how to connect cmds[i] to cmds[i+1]
+ stdinRead io.Closer // read-side closer of the stdin pipe for cmds[i+1]
+ stdinWrite io.Closer // write-side closer of the stdin pipe for cmds[i+1]
+}
+
+type pipeMode int
+
+const (
+ pipeStdout pipeMode = iota
+ pipeStderr
+ pipeCombinedOutput
+)
+
+// NewPipeline returns a new Pipeline starting with c. The stdout of each
+// command is connected to the stdin of the next command, via calls to
+// Pipeline.PipeStdout. To construct pipelines involving stderr, call
+// Pipeline.PipeStderr or Pipeline.PipeCombinedOutput directly.
+//
+// Each command must have been created from the same Shell. Errors are reported
+// to c.Shell, via Shell.HandleError. Sets Cmd.IgnoreClosedPipeError to true for
+// all commands.
+func NewPipeline(c *Cmd, cmds ...*Cmd) *Pipeline {
+ sh := c.Shell()
+ sh.Ok()
+ res, err := newPipeline(sh, c, cmds...)
+ handleError(sh, err)
+ return res
+}
+
+// Cmds returns the commands in the pipeline.
+func (p *Pipeline) Cmds() []*Cmd {
+ return p.cmds
+}
+
+// Clone returns a new Pipeline where p's commands are cloned and connected with
+// the same pipeline structure as in p.
+func (p *Pipeline) Clone() *Pipeline {
+ p.sh.Ok()
+ res, err := p.clone()
+ handleError(p.sh, err)
+ return res
+}
+
+// PipeStdout connects the stdout of the last command in p to the stdin of c,
+// and appends c to the commands in p. Must be called before Start. Sets
+// c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeStdout(c *Cmd) {
+ p.sh.Ok()
+ handleError(p.sh, p.pipeTo(c, pipeStdout, false))
+}
+
+// PipeStderr connects the stderr of the last command in p to the stdin of c,
+// and appends c to the commands in p. Must be called before Start. Sets
+// c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeStderr(c *Cmd) {
+ p.sh.Ok()
+ handleError(p.sh, p.pipeTo(c, pipeStderr, false))
+}
+
+// PipeCombinedOutput connects the combined stdout and stderr of the last
+// command in p to the stdin of c, and appends c to the commands in p. Must be
+// called before Start. Sets c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeCombinedOutput(c *Cmd) {
+ p.sh.Ok()
+ handleError(p.sh, p.pipeTo(c, pipeCombinedOutput, false))
+}
+
+// Start starts all commands in the pipeline.
+func (p *Pipeline) Start() {
+ p.sh.Ok()
+ handleError(p.sh, p.start())
+}
+
+// Wait waits for all commands in the pipeline to exit.
+func (p *Pipeline) Wait() {
+ p.sh.Ok()
+ handleError(p.sh, p.wait())
+}
+
+// Signal sends a signal to all underlying processes in the pipeline.
+func (p *Pipeline) Signal(sig os.Signal) {
+ p.sh.Ok()
+ handleError(p.sh, p.signal(sig))
+}
+
+// Terminate sends a signal to all underlying processes in the pipeline, then
+// waits for all processes to exit. Terminate is different from Signal followed
+// by Wait: Terminate succeeds as long as all processes exit, whereas Wait fails
+// if any process's exit code isn't 0.
+func (p *Pipeline) Terminate(sig os.Signal) {
+ p.sh.Ok()
+ handleError(p.sh, p.terminate(sig))
+}
+
+// Run calls Start followed by Wait.
+func (p *Pipeline) Run() {
+ p.sh.Ok()
+ handleError(p.sh, p.run())
+}
+
+// Stdout calls Start followed by Wait, then returns the last command's stdout.
+func (p *Pipeline) Stdout() string {
+ p.sh.Ok()
+ res, err := p.stdout()
+ handleError(p.sh, err)
+ return res
+}
+
+// StdoutStderr calls Start followed by Wait, then returns the last command's
+// stdout and stderr.
+func (p *Pipeline) StdoutStderr() (string, string) {
+ p.sh.Ok()
+ stdout, stderr, err := p.stdoutStderr()
+ handleError(p.sh, err)
+ return stdout, stderr
+}
+
+// CombinedOutput calls Start followed by Wait, then returns the last command's
+// combined stdout and stderr.
+func (p *Pipeline) CombinedOutput() string {
+ p.sh.Ok()
+ res, err := p.combinedOutput()
+ handleError(p.sh, err)
+ return res
+}
+
+////////////////////////////////////////
+// Internals
+
+// handleError is used instead of direct calls to Shell.HandleError throughout
+// the pipeline implementation. This is needed to handle the case where the user
+// has set Shell.Opts.Fatalf to a non-fatal function.
+//
+// The general pattern is that after each Shell or Cmd method is called, we
+// check p.sh.Err. If there was an error it is wrapped with errAlreadyHandled,
+// indicating that Shell.HandleError has already been called with this error,
+// and should not be called again.
+func handleError(sh *Shell, err error) {
+ if _, ok := err.(errAlreadyHandled); ok {
+ return // the shell already handled this error
+ }
+ sh.HandleError(err)
+}
+
+type errAlreadyHandled struct {
+ error
+}
+
+func newPipeline(sh *Shell, first *Cmd, cmds ...*Cmd) (*Pipeline, error) {
+ p := &Pipeline{sh: sh, cmds: []*Cmd{first}}
+ first.IgnoreClosedPipeError = true
+ for _, c := range cmds {
+ if err := p.pipeTo(c, pipeStdout, false); err != nil {
+ return nil, err
+ }
+ }
+ return p, nil
+}
+
+func (p *Pipeline) clone() (*Pipeline, error) {
+ // Replicate the pipeline structure with cloned commands.
+ first := p.cmds[0].Clone()
+ if p.sh.Err != nil {
+ return nil, errAlreadyHandled{p.sh.Err}
+ }
+ res := &Pipeline{sh: p.sh, cmds: []*Cmd{first}}
+ for i := 1; i < len(p.cmds); i++ {
+ if err := res.pipeTo(p.cmds[i], p.state[i-1].mode, true); err != nil {
+ return nil, err
+ }
+ }
+ return res, nil
+}
+
+func (p *Pipeline) pipeTo(c *Cmd, mode pipeMode, clone bool) (e error) {
+ if p.sh != c.Shell() {
+ return errPipelineCmdsHaveDifferentShells
+ }
+ if clone {
+ c = c.Clone()
+ if p.sh.Err != nil {
+ return errAlreadyHandled{p.sh.Err}
+ }
+ } else {
+ c.IgnoreClosedPipeError = true
+ }
+ // We could just use c.StdinPipe() here, but that provides unlimited size
+ // buffering using a newBufferedPipe chained to an os.Pipe. We want limited
+ // size buffering to avoid unlimited memory growth, so we just use an os.Pipe.
+ pr, pw, err := os.Pipe()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ // Close both ends of the pipe on failure; on success they'll be closed at
+ // the appropriate times in start and wait.
+ if e != nil {
+ pr.Close()
+ pw.Close()
+ }
+ }()
+ if c.SetStdinReader(pr); p.sh.Err != nil {
+ return errAlreadyHandled{p.sh.Err}
+ }
+ last := p.cmds[len(p.cmds)-1]
+ if mode == pipeStdout || mode == pipeCombinedOutput {
+ if last.AddStdoutWriter(pw); p.sh.Err != nil {
+ return errAlreadyHandled{p.sh.Err}
+ }
+ }
+ if mode == pipeStderr || mode == pipeCombinedOutput {
+ if last.AddStderrWriter(pw); p.sh.Err != nil {
+ return errAlreadyHandled{p.sh.Err}
+ }
+ }
+ p.cmds = append(p.cmds, c)
+ p.state = append(p.state, pipeState{mode, pr, pw})
+ return nil
+}
+
+// TODO(toddw): Clean up resources in Shell.Cleanup. E.g. we'll currently leak
+// the os.Pipe fds if the user sets up a pipeline, but never calls Start (or
+// Wait, Terminate).
+
+func (p *Pipeline) start() error {
+ // Start all commands in the pipeline, capturing the first error.
+ // Ensure all commands are processed, by avoiding early-exit.
+ var shErr, closeErr error
+ for i, c := range p.cmds {
+ p.sh.Err = nil
+ if c.Start(); p.sh.Err != nil && shErr == nil {
+ shErr = p.sh.Err
+ }
+ // Close the read-side of the stdin pipe for this command. The fd has been
+ // passed to the child process via Start, so we don't need it open anymore.
+ if i > 0 {
+ if err := p.state[i-1].stdinRead.Close(); err != nil && closeErr == nil {
+ closeErr = err
+ }
+ }
+ }
+ // If anything failed, close the write-side of the stdin pipes too.
+ if shErr != nil || closeErr != nil {
+ for _, state := range p.state {
+ state.stdinWrite.Close() // ignore error, since start or close failed.
+ }
+ }
+ if shErr != nil {
+ p.sh.Err = shErr
+ return errAlreadyHandled{shErr}
+ }
+ return closeErr
+}
+
+func (p *Pipeline) wait() error {
+ // Wait for all commands in the pipeline, capturing the first error.
+ // Ensure all commands are processed, by avoiding early-exit.
+ var shErr, closeErr error
+ for i, c := range p.cmds {
+ p.sh.Err = nil
+ if c.Wait(); p.sh.Err != nil && shErr == nil {
+ shErr = p.sh.Err
+ }
+ // Close the write-side of the stdin pipe for the next command, so that the
+ // next command will eventually read an EOF.
+ if i < len(p.state) {
+ if err := p.state[i].stdinWrite.Close(); err != nil && closeErr == nil {
+ closeErr = err
+ }
+ }
+ }
+ if shErr != nil {
+ p.sh.Err = shErr
+ return errAlreadyHandled{shErr}
+ }
+ return closeErr
+}
+
+func (p *Pipeline) signal(sig os.Signal) error {
+ // Signal all commands in the pipeline, capturing the first error.
+ // Ensure all commands are processed, by avoiding early-exit.
+ var shErr error
+ for _, c := range p.cmds {
+ p.sh.Err = nil
+ if c.Signal(sig); p.sh.Err != nil && shErr == nil {
+ shErr = p.sh.Err
+ }
+ }
+ if shErr != nil {
+ p.sh.Err = shErr
+ return errAlreadyHandled{shErr}
+ }
+ return nil
+}
+
+func (p *Pipeline) terminate(sig os.Signal) error {
+ // Terminate all commands in the pipeline, capturing the first error.
+ // Ensure all commands are processed, by avoiding early-exit.
+ var shErr, closeErr error
+ for i, c := range p.cmds {
+ p.sh.Err = nil
+ if c.Terminate(sig); p.sh.Err != nil && shErr == nil {
+ shErr = p.sh.Err
+ }
+ // Close the write-side of the stdin pipe for the next command, so that the
+ // next command will eventually read an EOF.
+ if i < len(p.state) {
+ if err := p.state[i].stdinWrite.Close(); err != nil && closeErr == nil {
+ closeErr = err
+ }
+ }
+ }
+ if shErr != nil {
+ p.sh.Err = shErr
+ return errAlreadyHandled{shErr}
+ }
+ return closeErr
+}
+
+func (p *Pipeline) run() error {
+ if err := p.start(); err != nil {
+ return err
+ }
+ return p.wait()
+}
+
+func (p *Pipeline) stdout() (string, error) {
+ var stdout bytes.Buffer
+ last := p.cmds[len(p.cmds)-1]
+ if last.AddStdoutWriter(&stdout); p.sh.Err != nil {
+ return "", errAlreadyHandled{p.sh.Err}
+ }
+ err := p.run()
+ return stdout.String(), err
+}
+
+func (p *Pipeline) stdoutStderr() (string, string, error) {
+ var stdout, stderr bytes.Buffer
+ last := p.cmds[len(p.cmds)-1]
+ if last.AddStdoutWriter(&stdout); p.sh.Err != nil {
+ return "", "", errAlreadyHandled{p.sh.Err}
+ }
+ if last.AddStderrWriter(&stderr); p.sh.Err != nil {
+ return "", "", errAlreadyHandled{p.sh.Err}
+ }
+ err := p.run()
+ return stdout.String(), stderr.String(), err
+}
+
+func (p *Pipeline) combinedOutput() (string, error) {
+ var output bytes.Buffer
+ last := p.cmds[len(p.cmds)-1]
+ if last.AddStdoutWriter(&output); p.sh.Err != nil {
+ return "", errAlreadyHandled{p.sh.Err}
+ }
+ if last.AddStderrWriter(&output); p.sh.Err != nil {
+ return "", errAlreadyHandled{p.sh.Err}
+ }
+ err := p.run()
+ return output.String(), err
+}
diff --git a/gosh/pipeline_test.go b/gosh/pipeline_test.go
new file mode 100644
index 0000000..d26d5b9
--- /dev/null
+++ b/gosh/pipeline_test.go
@@ -0,0 +1,241 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gosh_test
+
+import (
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ "v.io/x/lib/gosh"
+)
+
+func TestPipeline(t *testing.T) {
+ sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh.Cleanup()
+
+ echo := sh.FuncCmd(echoFunc)
+ echo.Args = append(echo.Args, "foo")
+ replace := sh.FuncCmd(replaceFunc, byte('f'), byte('Z'))
+ cat := sh.FuncCmd(catFunc)
+ p := gosh.NewPipeline(echo, replace, cat)
+ eq(t, p.Stdout(), "Zoo\n")
+ eq(t, p.Clone().Stdout(), "Zoo\n")
+
+ // Try piping only stdout.
+ p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+ p.PipeStdout(sh.FuncCmd(replaceFunc, byte('A'), byte('Z')))
+ eq(t, p.Stdout(), "ZZ")
+ eq(t, p.Clone().Stdout(), "ZZ")
+
+ // Try piping only stderr.
+ p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+ p.PipeStderr(sh.FuncCmd(replaceFunc, byte('B'), byte('Z')))
+ eq(t, p.Stdout(), "ZZ")
+ eq(t, p.Clone().Stdout(), "ZZ")
+
+ // Try piping both stdout and stderr.
+ p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+ p.PipeCombinedOutput(sh.FuncCmd(catFunc))
+ // Note, we can't assume any particular ordering of stdout and stderr, so we
+ // simply check the length of the combined output.
+ eq(t, len(p.Stdout()), 4)
+ eq(t, len(p.Clone().Stdout()), 4)
+
+ // Try piping combinations.
+ p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+ p.PipeStderr(sh.FuncCmd(replaceFunc, byte('B'), byte('x')))
+ p.PipeStdout(sh.FuncCmd(replaceFunc, byte('x'), byte('Z')))
+ p.PipeStdout(sh.FuncCmd(catFunc))
+ eq(t, p.Stdout(), "ZZ")
+ eq(t, p.Clone().Stdout(), "ZZ")
+}
+
+func TestPipelineDifferentShells(t *testing.T) {
+ sh1 := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh1.Cleanup()
+ sh2 := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh2.Cleanup()
+
+ setsErr(t, sh1, func() { gosh.NewPipeline(sh1.FuncCmd(echoFunc), sh2.FuncCmd(catFunc)) })
+ setsErr(t, sh2, func() { gosh.NewPipeline(sh2.FuncCmd(echoFunc), sh1.FuncCmd(catFunc)) })
+ p := gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+ setsErr(t, sh1, func() { p.PipeStdout(sh2.FuncCmd(catFunc)) })
+ p = gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+ setsErr(t, sh1, func() { p.PipeStderr(sh2.FuncCmd(catFunc)) })
+ p = gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+ setsErr(t, sh1, func() { p.PipeCombinedOutput(sh2.FuncCmd(catFunc)) })
+}
+
+var writeLoopFunc = gosh.RegisterFunc("writeLoopFunc", func() error {
+ for {
+ if _, err := os.Stdout.Write([]byte("a\n")); err != nil {
+ // Always return success; the purpose of this command is to ensure that
+ // when the next command in the pipeline fails, it causes a closed pipe
+ // write error here to exit the loop.
+ return nil
+ }
+ }
+})
+
+func TestPipelineClosedPipe(t *testing.T) {
+ sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh.Cleanup()
+ writeLoop, readLine := sh.FuncCmd(writeLoopFunc), sh.FuncCmd(readFunc)
+
+ // WriteLoop finishes because it gets a closed pipe write error after readLine
+ // finishes. Note that the closed pipe error is ignored.
+ p := gosh.NewPipeline(writeLoop, readLine)
+ eq(t, p.Stdout(), "")
+ ok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ p = p.Clone()
+ eq(t, p.Stdout(), "")
+ ok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+}
+
+func TestPipelineCmdFailure(t *testing.T) {
+ sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh.Cleanup()
+ cat := sh.FuncCmd(catFunc)
+ exit1 := sh.FuncCmd(exitFunc, 1)
+ writeLoop := sh.FuncCmd(writeLoopFunc)
+ echoFoo := sh.FuncCmd(echoFunc)
+ echoFoo.Args = append(echoFoo.Args, "foo")
+
+ // Exit1 fails, and cat finishes with success since it sees an EOF.
+ p := gosh.NewPipeline(exit1.Clone(), cat.Clone())
+ setsErr(t, sh, p.Run)
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ p = p.Clone()
+ setsErr(t, sh, p.Run)
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+
+ // Exit1 fails, and echoFoo finishes with success since it ignores stdin.
+ p = gosh.NewPipeline(exit1.Clone(), echoFoo.Clone())
+ setsErr(t, sh, p.Run)
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ p = p.Clone()
+ setsErr(t, sh, p.Run)
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+
+ // Exit1 fails, causing writeLoop to finish and succeed.
+ p = gosh.NewPipeline(writeLoop.Clone(), exit1.Clone())
+ setsErr(t, sh, p.Run)
+ ok(t, p.Cmds()[0].Err)
+ nok(t, p.Cmds()[1].Err)
+ p = p.Clone()
+ setsErr(t, sh, p.Run)
+ ok(t, p.Cmds()[0].Err)
+ nok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+
+ // Same tests, but allowing the exit error from exit1.
+ exit1.ExitErrorIsOk = true
+
+ // Exit1 fails, and cat finishes with success since it sees an EOF.
+ p = gosh.NewPipeline(exit1.Clone(), cat.Clone())
+ eq(t, p.Stdout(), "")
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+ p = p.Clone()
+ eq(t, p.Stdout(), "")
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+
+ // Exit1 fails, and echoFoo finishes with success since it ignores stdin.
+ p = gosh.NewPipeline(exit1.Clone(), echoFoo.Clone())
+ eq(t, p.Stdout(), "foo\n")
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+ p = p.Clone()
+ eq(t, p.Stdout(), "foo\n")
+ nok(t, p.Cmds()[0].Err)
+ ok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+
+ // Exit1 fails, causing writeLoop to finish and succeed.
+ p = gosh.NewPipeline(writeLoop.Clone(), exit1.Clone())
+ eq(t, p.Stdout(), "")
+ ok(t, p.Cmds()[0].Err)
+ nok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+ p = p.Clone()
+ eq(t, p.Stdout(), "")
+ ok(t, p.Cmds()[0].Err)
+ nok(t, p.Cmds()[1].Err)
+ ok(t, sh.Err)
+}
+
+func TestPipelineSignal(t *testing.T) {
+ sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh.Cleanup()
+
+ for _, d := range []time.Duration{0, time.Hour} {
+ for _, s := range []os.Signal{os.Interrupt, os.Kill} {
+ fmt.Println(d, s)
+ p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, d, 0), sh.FuncCmd(sleepFunc, d, 0))
+ p.Start()
+ p.Cmds()[0].AwaitVars("ready")
+ p.Cmds()[1].AwaitVars("ready")
+ // Wait for a bit to allow the zero-sleep commands to exit.
+ time.Sleep(100 * time.Millisecond)
+ p.Signal(s)
+ switch {
+ case s == os.Interrupt:
+ // Wait should succeed as long as the exit code was 0, regardless of
+ // whether the signal arrived or the processes had already exited.
+ p.Wait()
+ case d != 0:
+ // Note: We don't call Wait in the {d: 0, s: os.Kill} case because doing
+ // so makes the test flaky on slow systems.
+ setsErr(t, sh, func() { p.Wait() })
+ }
+ }
+ }
+
+ // Signal should fail if Wait has been called.
+ z := time.Duration(0)
+ p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, z, 0), sh.FuncCmd(sleepFunc, z, 0))
+ p.Run()
+ setsErr(t, sh, func() { p.Signal(os.Interrupt) })
+}
+
+func TestPipelineTerminate(t *testing.T) {
+ sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh.Cleanup()
+
+ for _, d := range []time.Duration{0, time.Hour} {
+ for _, s := range []os.Signal{os.Interrupt, os.Kill} {
+ fmt.Println(d, s)
+ p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, d, 0), sh.FuncCmd(sleepFunc, d, 0))
+ p.Start()
+ p.Cmds()[0].AwaitVars("ready")
+ p.Cmds()[1].AwaitVars("ready")
+ // Wait for a bit to allow the zero-sleep commands to exit.
+ time.Sleep(100 * time.Millisecond)
+ // Terminate should succeed regardless of the exit code, and regardless of
+ // whether the signal arrived or the processes had already exited.
+ p.Terminate(s)
+ }
+ }
+
+ // Terminate should fail if Wait has been called.
+ z := time.Duration(0)
+ p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, z, 0), sh.FuncCmd(sleepFunc, z, 0))
+ p.Run()
+ setsErr(t, sh, func() { p.Terminate(os.Interrupt) })
+}
diff --git a/gosh/shell.go b/gosh/shell.go
index 4426990..4dbcc51 100644
--- a/gosh/shell.go
+++ b/gosh/shell.go
@@ -15,7 +15,6 @@
import (
"errors"
"fmt"
- "io"
"io/ioutil"
"log"
"math/rand"
@@ -55,15 +54,15 @@
// Args is the list of args to append to subsequent command invocations.
Args []string
// Internal state.
- calledNewShell bool
- cleanupDone chan struct{}
- cleanupMu sync.Mutex // protects the fields below; held during cleanup
- calledCleanup bool
- cmds []*Cmd
- tempFiles []*os.File
- tempDirs []string
- dirStack []string // for pushd/popd
- cleanupFuncs []func()
+ calledNewShell bool
+ cleanupDone chan struct{}
+ cleanupMu sync.Mutex // protects the fields below; held during cleanup
+ calledCleanup bool
+ cmds []*Cmd
+ tempFiles []*os.File
+ tempDirs []string
+ dirStack []string // for pushd/popd
+ cleanupHandlers []func()
}
// Opts configures Shell.
@@ -176,10 +175,12 @@
sh.HandleError(sh.popd())
}
-// AddToCleanup registers the given function to be called by Shell.Cleanup().
-func (sh *Shell) AddToCleanup(f func()) {
+// AddCleanupHandler registers the given function to be called during cleanup.
+// Cleanup handlers are called in LIFO order, possibly in a separate goroutine
+// spawned by gosh.
+func (sh *Shell) AddCleanupHandler(f func()) {
sh.Ok()
- sh.HandleError(sh.addToCleanup(f))
+ sh.HandleError(sh.addCleanupHandler(f))
}
// Cleanup cleans up all resources (child processes, temporary files and
@@ -450,13 +451,13 @@
return nil
}
-func (sh *Shell) addToCleanup(f func()) error {
+func (sh *Shell) addCleanupHandler(f func()) error {
sh.cleanupMu.Lock()
defer sh.cleanupMu.Unlock()
if sh.calledCleanup {
return errAlreadyCalledCleanup
}
- sh.cleanupFuncs = append(sh.cleanupFuncs, f)
+ sh.cleanupHandlers = append(sh.cleanupHandlers, f)
return nil
}
@@ -532,9 +533,9 @@
sh.logf("os.Chdir(%q) failed: %v\n", dir, err)
}
}
- // Call any registered cleanup functions in LIFO order.
- for i := len(sh.cleanupFuncs) - 1; i >= 0; i-- {
- sh.cleanupFuncs[i]()
+ // Call cleanup handlers in LIFO order.
+ for i := len(sh.cleanupHandlers) - 1; i >= 0; i-- {
+ sh.cleanupHandlers[i]()
}
close(sh.cleanupDone)
}
@@ -564,15 +565,3 @@
}
os.Exit(0)
}
-
-// NopWriteCloser returns a WriteCloser with a no-op Close method wrapping the
-// provided Writer.
-func NopWriteCloser(w io.Writer) io.WriteCloser {
- return nopWriteCloser{w}
-}
-
-type nopWriteCloser struct {
- io.Writer
-}
-
-func (nopWriteCloser) Close() error { return nil }
diff --git a/gosh/shell_test.go b/gosh/shell_test.go
index d8d6930..0a7a6b6 100644
--- a/gosh/shell_test.go
+++ b/gosh/shell_test.go
@@ -28,7 +28,7 @@
"time"
"v.io/x/lib/gosh"
- "v.io/x/lib/gosh/internal/gosh_example_lib"
+ lib "v.io/x/lib/gosh/internal/gosh_example_lib"
)
var fakeError = errors.New("fake error")
@@ -95,11 +95,13 @@
// Simplified versions of various Unix commands.
var (
- catFunc = gosh.RegisterFunc("catFunc", func() {
- io.Copy(os.Stdout, os.Stdin)
+ catFunc = gosh.RegisterFunc("catFunc", func() error {
+ _, err := io.Copy(os.Stdout, os.Stdin)
+ return err
})
- echoFunc = gosh.RegisterFunc("echoFunc", func() {
- fmt.Println(os.Args[1])
+ echoFunc = gosh.RegisterFunc("echoFunc", func() error {
+ _, err := fmt.Println(os.Args[1])
+ return err
})
readFunc = gosh.RegisterFunc("readFunc", func() {
bufio.NewReader(os.Stdin).ReadString('\n')
@@ -494,16 +496,16 @@
defer sh.Cleanup()
c := sh.FuncCmd(writeFunc, true, true)
- c.AddStdoutWriter(gosh.NopWriteCloser(os.Stdout))
- c.AddStderrWriter(gosh.NopWriteCloser(os.Stderr))
+ c.AddStdoutWriter(os.Stdout)
+ c.AddStderrWriter(os.Stderr)
c.Run()
fmt.Fprint(os.Stdout, " stdout done")
fmt.Fprint(os.Stderr, " stderr done")
})
-// Tests that it's safe to add wrapped os.Stdout and os.Stderr as writers.
-func TestAddWritersWrappedStdoutStderr(t *testing.T) {
+// Tests that it's safe to add os.Stdout and os.Stderr as writers.
+func TestAddStdoutStderrWriter(t *testing.T) {
sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
defer sh.Cleanup()
@@ -512,26 +514,14 @@
eq(t, stderr, "BB stderr done")
}
-// Tests that adding non-wrapped os.Stdout or os.Stderr fails.
-func TestAddWritersNonWrappedStdoutStderr(t *testing.T) {
- sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
- defer sh.Cleanup()
-
- c := sh.FuncCmd(writeMoreFunc)
- setsErr(t, sh, func() { c.AddStdoutWriter(os.Stdout) })
- setsErr(t, sh, func() { c.AddStdoutWriter(os.Stderr) })
- setsErr(t, sh, func() { c.AddStderrWriter(os.Stdout) })
- setsErr(t, sh, func() { c.AddStderrWriter(os.Stderr) })
-}
-
func TestCombinedOutput(t *testing.T) {
sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
defer sh.Cleanup()
c := sh.FuncCmd(writeFunc, true, true)
buf := &bytes.Buffer{}
- c.AddStdoutWriter(gosh.NopWriteCloser(buf))
- c.AddStderrWriter(gosh.NopWriteCloser(buf))
+ c.AddStdoutWriter(buf)
+ c.AddStderrWriter(buf)
output := c.CombinedOutput()
// Note, we can't assume any particular ordering of stdout and stderr, so we
// simply check the length of the combined output.
@@ -565,35 +555,22 @@
eq(t, string(stderr), "BB")
}
-type countingWriteCloser struct {
- io.Writer
- count int
-}
-
-func (wc *countingWriteCloser) Close() error {
- wc.count++
- return nil
-}
-
-// Tests that Close is called exactly once on a given WriteCloser, even if that
-// WriteCloser is passed to Add{Stdout,Stderr}Writer multiple times.
-func TestAddWritersCloseOnce(t *testing.T) {
- sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
- defer sh.Cleanup()
-
- c := sh.FuncCmd(writeFunc, true, true)
- buf := &bytes.Buffer{}
- wc := &countingWriteCloser{Writer: buf}
- c.AddStdoutWriter(wc)
- c.AddStdoutWriter(wc)
- c.AddStderrWriter(wc)
- c.AddStderrWriter(wc)
- c.Run()
- // Note, we can't assume any particular ordering of stdout and stderr, so we
- // simply check the length of the combined output.
- eq(t, len(buf.String()), 8)
- eq(t, wc.count, 1)
-}
+// replaceFunc streams stdin to stdout, replacing every occurrence of the
+// byte old with the byte new. It returns nil on EOF, or the first read or
+// write error encountered.
+var replaceFunc = gosh.RegisterFunc("replaceFunc", func(old, new byte) error {
+ buf := make([]byte, 1024)
+ for {
+ n, err := os.Stdin.Read(buf)
+ // Process any bytes read before acting on the error: the io.Reader
+ // contract allows Read to return n > 0 together with io.EOF, and
+ // returning early would silently drop that final chunk of output.
+ if n > 0 {
+ rep := bytes.Replace(buf[:n], []byte{old}, []byte{new}, -1)
+ if _, err := os.Stdout.Write(rep); err != nil {
+ return err
+ }
+ }
+ switch {
+ case err == io.EOF:
+ return nil
+ case err != nil:
+ return err
+ }
+ }
+})
func TestSignal(t *testing.T) {
sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
@@ -718,6 +695,34 @@
nok(t, c.Err)
}
+// TestIgnoreClosedPipeError checks that Cmd.IgnoreClosedPipeError suppresses
+// the failure caused by a stdout writer returning io.ErrClosedPipe, and that
+// without the flag the same write error fails the command.
+func TestIgnoreClosedPipeError(t *testing.T) {
+ sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+ defer sh.Cleanup()
+
+ // Since writeLoopFunc will only finish if it receives a write error, it's
+ // depending on the closed pipe error from errorWriter.
+ c := sh.FuncCmd(writeLoopFunc)
+ c.AddStdoutWriter(errorWriter{io.ErrClosedPipe})
+ c.IgnoreClosedPipeError = true
+ c.Run()
+ ok(t, c.Err)
+ ok(t, sh.Err)
+
+ // Without IgnoreClosedPipeError, the command fails.
+ c = sh.FuncCmd(writeLoopFunc)
+ c.AddStdoutWriter(errorWriter{io.ErrClosedPipe})
+ setsErr(t, sh, func() { c.Run() })
+ nok(t, c.Err)
+}
+
+// errorWriter is an io.Writer whose Write always fails with the embedded
+// error, consuming no input.
+type errorWriter struct {
+ error
+}
+
+// Write reports 0 bytes written and returns the wrapped error unconditionally.
+func (w errorWriter) Write(p []byte) (int, error) {
+ return 0, w.error
+}
+
// Tests that sh.Ok panics under various conditions.
func TestOkPanics(t *testing.T) {
func() { // errDidNotCallNewShell