lib: Add gosh.Pipeline, simplify AddStd{out,err}Writer.

This CL introduces a new Pipeline concept to gosh.  The idea is
similar to bash command pipelines; you can connect a sequence of
commands together, with stdout and/or stderr of one command
connected to the stdin of the next command.  The Pipeline API is
similar to the Cmd API, with unnecessary methods removed, and
with Pipe{Stdout,Stderr,CombinedOutput} methods to construct the
pipeline.

Also simplified Cmd.AddStd{out,err}Writer semantics, by changing
the argument from io.WriteCloser to io.Writer.  We no longer
automatically close these writers on exit.  This means we can now
write AddStdoutWriter(os.Stdout) without needing
gosh.NopWriteCloser, which has been removed.

If you need to close your writer, you are responsible for calling
close at the appropriate point in the code yourself.

MultiPart: 3/4

Change-Id: I3885b5eae997100dbf6f876818e1c9a1297f17b7
diff --git a/gosh/.api b/gosh/.api
index c48f046..edfb2a8 100644
--- a/gosh/.api
+++ b/gosh/.api
@@ -1,17 +1,18 @@
 pkg gosh, func InitChildMain()
 pkg gosh, func InitMain()
+pkg gosh, func NewPipeline(*Cmd, ...*Cmd) *Pipeline
 pkg gosh, func NewShell(Opts) *Shell
-pkg gosh, func NopWriteCloser(io.Writer) io.WriteCloser
 pkg gosh, func RegisterFunc(string, interface{}) *Func
 pkg gosh, func SendVars(map[string]string)
-pkg gosh, method (*Cmd) AddStderrWriter(io.WriteCloser)
-pkg gosh, method (*Cmd) AddStdoutWriter(io.WriteCloser)
+pkg gosh, method (*Cmd) AddStderrWriter(io.Writer)
+pkg gosh, method (*Cmd) AddStdoutWriter(io.Writer)
 pkg gosh, method (*Cmd) AwaitVars(...string) map[string]string
 pkg gosh, method (*Cmd) Clone() *Cmd
 pkg gosh, method (*Cmd) CombinedOutput() string
 pkg gosh, method (*Cmd) Pid() int
 pkg gosh, method (*Cmd) Run()
 pkg gosh, method (*Cmd) SetStdinReader(io.Reader)
+pkg gosh, method (*Cmd) Shell() *Shell
 pkg gosh, method (*Cmd) Signal(os.Signal)
 pkg gosh, method (*Cmd) Start()
 pkg gosh, method (*Cmd) StderrPipe() io.ReadCloser
@@ -21,7 +22,20 @@
 pkg gosh, method (*Cmd) StdoutStderr() (string, string)
 pkg gosh, method (*Cmd) Terminate(os.Signal)
 pkg gosh, method (*Cmd) Wait()
-pkg gosh, method (*Shell) AddToCleanup(func())
+pkg gosh, method (*Pipeline) Clone() *Pipeline
+pkg gosh, method (*Pipeline) Cmds() []*Cmd
+pkg gosh, method (*Pipeline) CombinedOutput() string
+pkg gosh, method (*Pipeline) PipeCombinedOutput(*Cmd)
+pkg gosh, method (*Pipeline) PipeStderr(*Cmd)
+pkg gosh, method (*Pipeline) PipeStdout(*Cmd)
+pkg gosh, method (*Pipeline) Run()
+pkg gosh, method (*Pipeline) Signal(os.Signal)
+pkg gosh, method (*Pipeline) Start()
+pkg gosh, method (*Pipeline) Stdout() string
+pkg gosh, method (*Pipeline) StdoutStderr() (string, string)
+pkg gosh, method (*Pipeline) Terminate(os.Signal)
+pkg gosh, method (*Pipeline) Wait()
+pkg gosh, method (*Shell) AddCleanupHandler(func())
 pkg gosh, method (*Shell) BuildGoPkg(string, ...string) string
 pkg gosh, method (*Shell) Cleanup()
 pkg gosh, method (*Shell) Cmd(string, ...string) *Cmd
@@ -39,6 +53,7 @@
 pkg gosh, type Cmd struct, Err error
 pkg gosh, type Cmd struct, ExitAfter time.Duration
 pkg gosh, type Cmd struct, ExitErrorIsOk bool
+pkg gosh, type Cmd struct, IgnoreClosedPipeError bool
 pkg gosh, type Cmd struct, IgnoreParentExit bool
 pkg gosh, type Cmd struct, OutputDir string
 pkg gosh, type Cmd struct, Path string
@@ -51,6 +66,7 @@
 pkg gosh, type Opts struct, Fatalf func(string, ...interface{})
 pkg gosh, type Opts struct, Logf func(string, ...interface{})
 pkg gosh, type Opts struct, PropagateChildOutput bool
+pkg gosh, type Pipeline struct
 pkg gosh, type Shell struct
 pkg gosh, type Shell struct, Args []string
 pkg gosh, type Shell struct, Err error
diff --git a/gosh/cmd.go b/gosh/cmd.go
index e956500..0ab4d61 100644
--- a/gosh/cmd.go
+++ b/gosh/cmd.go
@@ -21,8 +21,6 @@
 	errAlreadyCalledStart = errors.New("gosh: already called Cmd.Start")
 	errAlreadyCalledWait  = errors.New("gosh: already called Cmd.Wait")
 	errAlreadySetStdin    = errors.New("gosh: already set stdin")
-	errCloseStdout        = errors.New("gosh: use NopWriteCloser(os.Stdout) to prevent stdout from being closed")
-	errCloseStderr        = errors.New("gosh: use NopWriteCloser(os.Stderr) to prevent stderr from being closed")
 	errDidNotCallStart    = errors.New("gosh: did not call Cmd.Start")
 	errProcessExited      = errors.New("gosh: process exited")
 )
@@ -56,6 +54,13 @@
 	// ExitErrorIsOk specifies whether an *exec.ExitError should be reported via
 	// Shell.HandleError.
 	ExitErrorIsOk bool
+	// IgnoreClosedPipeError, if true, causes errors from read/write on a closed
+	// pipe to be indistinguishable from success. These errors often occur in
+	// command pipelines, e.g. "yes | head -1", where "yes" will receive a closed
+	// pipe error when it tries to write on stdout, after "head" has exited. If a
+	// closed pipe error occurs, Cmd.Err will be nil, and no err is reported to
+	// Shell.HandleError.
+	IgnoreClosedPipeError bool
 	// Internal state.
 	sh                *Shell
 	c                 *exec.Cmd
@@ -73,6 +78,11 @@
 	recvVars          map[string]string // protected by cond.L
 }
 
+// Shell returns the shell that this Cmd was created from.
+func (c *Cmd) Shell() *Shell {
+	return c.sh
+}
+
 // Clone returns a new Cmd with a copy of this Cmd's configuration.
 func (c *Cmd) Clone() *Cmd {
 	c.sh.Ok()
@@ -94,8 +104,10 @@
 }
 
 // StdoutPipe returns a ReadCloser backed by an unlimited-size pipe for the
-// command's stdout. Must be called before Start. May be called more than once;
-// each invocation creates a new pipe.
+// command's stdout. The pipe will be closed when the process exits, but may
+// also be closed earlier by the caller, e.g. if all expected output has been
+// received. Must be called before Start. May be called more than once; each
+// call creates a new pipe.
 func (c *Cmd) StdoutPipe() io.ReadCloser {
 	c.sh.Ok()
 	res, err := c.stdoutPipe()
@@ -104,8 +116,10 @@
 }
 
 // StderrPipe returns a ReadCloser backed by an unlimited-size pipe for the
-// command's stderr. Must be called before Start. May be called more than once;
-// each invocation creates a new pipe.
+// command's stderr. The pipe will be closed when the process exits, but may
+// also be closed earlier by the caller, e.g. if all expected output has been
+// received. Must be called before Start. May be called more than once; each
+// call creates a new pipe.
 func (c *Cmd) StderrPipe() io.ReadCloser {
 	c.sh.Ok()
 	res, err := c.stderrPipe()
@@ -113,42 +127,28 @@
 	return res
 }
 
-// SetStdinReader configures this Cmd to read the child's stdin from the given
-// Reader. Must be called before Start. Only one call may be made to StdinPipe
-// or SetStdinReader; subsequent calls will fail.
+// SetStdinReader configures this Cmd to read stdin from the given Reader. Must
+// be called before Start. Only one call may be made to StdinPipe or
+// SetStdinReader; subsequent calls will fail.
 func (c *Cmd) SetStdinReader(r io.Reader) {
 	c.sh.Ok()
 	c.handleError(c.setStdinReader(r))
 }
 
-// AddStdoutWriter configures this Cmd to tee the child's stdout to the given
-// WriteCloser, which will be closed when the process exits.
-//
-// If the same WriteCloser is passed to both AddStdoutWriter and
-// AddStderrWriter, Cmd will ensure that its methods are never called
-// concurrently and that Close is only called once.
-//
-// Use NopWriteCloser to extend a Writer to a WriteCloser, or to prevent an
-// existing WriteCloser from being closed. It is an error to pass in os.Stdout
-// or os.Stderr, since they shouldn't be closed.
-func (c *Cmd) AddStdoutWriter(wc io.WriteCloser) {
+// AddStdoutWriter configures this Cmd to tee stdout to the given Writer. Must
+// be called before Start. If the same Writer is passed to both AddStdoutWriter
+// and AddStderrWriter, Cmd will ensure that Write is never called concurrently.
+func (c *Cmd) AddStdoutWriter(w io.Writer) {
 	c.sh.Ok()
-	c.handleError(c.addStdoutWriter(wc))
+	c.handleError(c.addStdoutWriter(w))
 }
 
-// AddStderrWriter configures this Cmd to tee the child's stderr to the given
-// WriteCloser, which will be closed when the process exits.
-//
-// If the same WriteCloser is passed to both AddStdoutWriter and
-// AddStderrWriter, Cmd will ensure that its methods are never called
-// concurrently and that Close is only called once.
-//
-// Use NopWriteCloser to extend a Writer to a WriteCloser, or to prevent an
-// existing WriteCloser from being closed. It is an error to pass in os.Stdout
-// or os.Stderr, since they shouldn't be closed.
-func (c *Cmd) AddStderrWriter(wc io.WriteCloser) {
+// AddStderrWriter configures this Cmd to tee stderr to the given Writer. Must
+// be called before Start. If the same Writer is passed to both AddStdoutWriter
+// and AddStderrWriter, Cmd will ensure that Write is never called concurrently.
+func (c *Cmd) AddStderrWriter(w io.Writer) {
 	c.sh.Ok()
-	c.handleError(c.addStderrWriter(wc))
+	c.handleError(c.addStderrWriter(w))
 }
 
 // Start starts the command.
@@ -172,15 +172,15 @@
 	c.handleError(c.wait())
 }
 
-// Signal sends a signal to the process.
+// Signal sends a signal to the underlying process.
 func (c *Cmd) Signal(sig os.Signal) {
 	c.sh.Ok()
 	c.handleError(c.signal(sig))
 }
 
-// Terminate sends a signal to the process, then waits for it to exit. Terminate
-// is different from Signal followed by Wait: Terminate succeeds as long as the
-// process exits, whereas Wait fails if the exit code isn't 0.
+// Terminate sends a signal to the underlying process, then waits for it to
+// exit. Terminate is different from Signal followed by Wait: Terminate succeeds
+// as long as the process exits, whereas Wait fails if the exit code isn't 0.
 func (c *Cmd) Terminate(sig os.Signal) {
 	c.sh.Ok()
 	c.handleError(c.terminate(sig))
@@ -271,7 +271,35 @@
 	return err == nil
 }
 
+// An explanation of closed pipe errors. Consider the pipeline "yes | head -1",
+// where yes keeps writing "y\n" to stdout, and head succeeds after it reads the
+// first line. There is an os pipe connecting the two commands, and when head
+// exits, it causes yes to receive a closed pipe error on its next write. Should
+// we consider such a pipeline to have succeeded or failed?
+//
+// Bash only looks at the exit status of the last command by default, thus the
+// pipeline succeeds. But that's dangerous, since yes could have crashed and we
+// wouldn't know. It's recommended to run "set -o pipefail" to tell bash to
+// check the exit status of all commands. But that causes the pipeline to fail.
+//
+// IgnoreClosedPipeError handles this case. gosh.Pipeline sets this option to
+// true, so that by default the pipeline above will succeed, but will fail on
+// any other error. Note that the exec package always returns an ExitError if
+// the child process exited with a non-zero exit code; the closed pipe error is
+// only returned if the child process exited with a zero exit code, and Write on
+// the io.MultiWriter in the parent process received the closed pipe error.
+//
+// TODO(toddw): We could adopt the convention that exit code 141 indicates
+// "closed pipe", and use IgnoreClosedPipeError to also ignore that case. We
+// choose 141 because it's 128 + 13, where SIGPIPE is 13, and there is an
+// existing convention for this. By default Go programs ignore SIGPIPE, so we
+// might also want to add code to InitChildMain to exit the program with 141 if
+// it receives SIGPIPE.
+
 func (c *Cmd) handleError(err error) {
+	if c.IgnoreClosedPipeError && isClosedPipeError(err) {
+		err = nil
+	}
 	c.Err = err
 	if !c.errorIsOk(err) {
 		c.sh.HandleError(err)
@@ -400,6 +428,7 @@
 	res.PropagateOutput = c.PropagateOutput
 	res.OutputDir = c.OutputDir
 	res.ExitErrorIsOk = c.ExitErrorIsOk
+	res.IgnoreClosedPipeError = c.IgnoreClosedPipeError
 	return res, nil
 }
 
@@ -436,11 +465,7 @@
 
 func (c *Cmd) stdinPipeCopier(dst io.WriteCloser, src io.Reader) {
 	var firstErr error
-	_, err := io.Copy(dst, src)
-	// Ignore EPIPE errors copying to stdin, indicating the process exited. This
-	// mirrors logic in os/exec/exec_posix.go. Also see:
-	// https://github.com/golang/go/issues/9173
-	if pe, ok := err.(*os.PathError); !ok || pe.Op != "write" || pe.Path != "|1" || pe.Err != syscall.EPIPE {
+	if _, err := io.Copy(dst, src); err != nil && !isClosedPipeError(err) {
 		firstErr = err
 	}
 	if err := dst.Close(); err != nil && firstErr == nil {
@@ -449,6 +474,21 @@
 	c.stdinDoneChan <- firstErr
 }
 
+// isClosedPipeError returns true iff the error indicates a closed pipe. This
+// typically occurs with a pipeline of commands "A | B"; if B exits first, the
+// next write by A will receive a closed pipe error. Also see:
+// https://github.com/golang/go/issues/9173
+func isClosedPipeError(err error) bool {
+	if err == io.ErrClosedPipe {
+		return true
+	}
+	// Closed pipe on os.Pipe; mirrors logic in os/exec/exec_posix.go.
+	if pe, ok := err.(*os.PathError); ok && pe.Op == "write" && pe.Path == "|1" && pe.Err == syscall.EPIPE {
+		return true
+	}
+	return false
+}
+
 func (c *Cmd) setStdinReader(r io.Reader) error {
 	switch {
 	case c.calledStart:
@@ -480,42 +520,37 @@
 	return p, nil
 }
 
-func (c *Cmd) addStdoutWriter(wc io.WriteCloser) error {
-	switch {
-	case c.calledStart:
+func (c *Cmd) addStdoutWriter(w io.Writer) error {
+	if c.calledStart {
 		return errAlreadyCalledStart
-	case wc == os.Stdout:
-		return errCloseStdout
-	case wc == os.Stderr:
-		return errCloseStderr
 	}
-	c.stdoutWriters = append(c.stdoutWriters, wc)
-	c.afterWaitClosers = append(c.afterWaitClosers, wc)
+	c.stdoutWriters = append(c.stdoutWriters, w)
 	return nil
 }
 
-func (c *Cmd) addStderrWriter(wc io.WriteCloser) error {
-	switch {
-	case c.calledStart:
+func (c *Cmd) addStderrWriter(w io.Writer) error {
+	if c.calledStart {
 		return errAlreadyCalledStart
-	case wc == os.Stdout:
-		return errCloseStdout
-	case wc == os.Stderr:
-		return errCloseStderr
 	}
-	c.stderrWriters = append(c.stderrWriters, wc)
-	c.afterWaitClosers = append(c.afterWaitClosers, wc)
+	c.stderrWriters = append(c.stderrWriters, w)
 	return nil
 }
 
 // TODO(sadovsky): Maybe wrap every child process with a "supervisor" process
 // that calls InitChildMain.
 
-func (c *Cmd) start() error {
+func (c *Cmd) start() (e error) {
 	defer func() {
-		closeClosers(c.afterStartClosers)
+		// Always close afterStartClosers upon return. Only close afterWaitClosers
+		// if start failed; if start succeeds, they're closed in the startExitWaiter
+		// goroutine. Only the first error is reported.
+		if err := closeClosers(c.afterStartClosers); e == nil {
+			e = err
+		}
 		if !c.started {
-			closeClosers(c.afterWaitClosers)
+			if err := closeClosers(c.afterWaitClosers); e == nil {
+				e = err
+			}
 		}
 	}()
 	if c.calledStart {
@@ -568,7 +603,9 @@
 		c.exited = true
 		c.cond.Signal()
 		c.cond.L.Unlock()
-		closeClosers(c.afterWaitClosers)
+		if err := closeClosers(c.afterWaitClosers); waitErr == nil {
+			waitErr = err
+		}
 		if c.stdinDoneChan != nil {
 			// Wait for the stdinPipeCopier goroutine to finish.
 			if err := <-c.stdinDoneChan; waitErr == nil {
@@ -579,24 +616,23 @@
 	}()
 }
 
-func closeClosers(closers []io.Closer) {
-	// If the same WriteCloser was passed to both AddStdoutWriter and
-	// AddStderrWriter, we should only close it once.
-	cm := map[io.Closer]bool{}
+func closeClosers(closers []io.Closer) error {
+	var firstErr error
 	for _, closer := range closers {
-		if !cm[closer] {
-			cm[closer] = true
-			closer.Close() // best-effort; ignore returned error
+		if err := closer.Close(); firstErr == nil {
+			firstErr = err
 		}
 	}
+	return firstErr
 }
 
 // TODO(sadovsky): Maybe add optional timeouts for Cmd.{awaitVars,wait}.
 
 func (c *Cmd) awaitVars(keys ...string) (map[string]string, error) {
-	if !c.started {
+	switch {
+	case !c.started:
 		return nil, errDidNotCallStart
-	} else if c.calledWait {
+	case c.calledWait:
 		return nil, errAlreadyCalledWait
 	}
 	wantKeys := map[string]bool{}
@@ -626,9 +662,10 @@
 }
 
 func (c *Cmd) wait() error {
-	if !c.started {
+	switch {
+	case !c.started:
 		return errDidNotCallStart
-	} else if c.calledWait {
+	case c.calledWait:
 		return errAlreadyCalledWait
 	}
 	c.calledWait = true
@@ -647,9 +684,10 @@
 // of the os.Signal interface, and have the signal and terminate methods map
 // that to Process.Kill.
 func (c *Cmd) signal(sig os.Signal) error {
-	if !c.started {
+	switch {
+	case !c.started:
 		return errDidNotCallStart
-	} else if c.calledWait {
+	case c.calledWait:
 		return errAlreadyCalledWait
 	}
 	if !c.isRunning() {
diff --git a/gosh/pipeline.go b/gosh/pipeline.go
new file mode 100644
index 0000000..c9aec89
--- /dev/null
+++ b/gosh/pipeline.go
@@ -0,0 +1,395 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gosh
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"os"
+)
+
+var errPipelineCmdsHaveDifferentShells = errors.New("gosh: pipeline cmds have different shells")
+
+// Pipeline represents a pipeline of commands, where the stdout and/or stderr of
+// one command is connected to the stdin of the next command.
+//
+// The failure semantics of a pipeline are determined by the failure semantics
+// of each command; by default the pipeline fails if any command fails, and
+// read/write errors on closed pipes are ignored. This is different from bash,
+// where the default is to only check the status of the last command, unless
+// "set -o pipefail" is enabled to check the status of all commands, causing
+// closed pipe errors to fail the pipeline. Use Cmd.ExitErrorIsOk and
+// Cmd.IgnoreClosedPipeError to fine-tune the failure semantics.
+//
+// The implementation of Pipeline only uses exported methods from Shell and Cmd.
+type Pipeline struct {
+	sh    *Shell
+	cmds  []*Cmd      // INVARIANT: len(cmds) > 0
+	state []pipeState // INVARIANT: len(state) == len(cmds) - 1
+}
+
+type pipeState struct {
+	mode       pipeMode  // describes how to connect cmds[i] to cmds[i+1]
+	stdinRead  io.Closer // read-side closer of the stdin pipe for cmds[i+1]
+	stdinWrite io.Closer // write-side closer of the stdin pipe for cmds[i+1]
+}
+
+type pipeMode int
+
+const (
+	pipeStdout pipeMode = iota
+	pipeStderr
+	pipeCombinedOutput
+)
+
+// NewPipeline returns a new Pipeline starting with c. The stdout of each
+// command is connected to the stdin of the next command, via calls to
+// Pipeline.PipeStdout. To construct pipelines involving stderr, call
+// Pipeline.PipeStderr or Pipeline.PipeCombinedOutput directly.
+//
+// Each command must have been created from the same Shell. Errors are reported
+// to c.Shell, via Shell.HandleError. Sets Cmd.IgnoreClosedPipeError to true for
+// all commands.
+func NewPipeline(c *Cmd, cmds ...*Cmd) *Pipeline {
+	sh := c.Shell()
+	sh.Ok()
+	res, err := newPipeline(sh, c, cmds...)
+	handleError(sh, err)
+	return res
+}
+
+// Cmds returns the commands in the pipeline.
+func (p *Pipeline) Cmds() []*Cmd {
+	return p.cmds
+}
+
+// Clone returns a new Pipeline where p's commands are cloned and connected with
+// the same pipeline structure as in p.
+func (p *Pipeline) Clone() *Pipeline {
+	p.sh.Ok()
+	res, err := p.clone()
+	handleError(p.sh, err)
+	return res
+}
+
+// PipeStdout connects the stdout of the last command in p to the stdin of c,
+// and appends c to the commands in p. Must be called before Start. Sets
+// c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeStdout(c *Cmd) {
+	p.sh.Ok()
+	handleError(p.sh, p.pipeTo(c, pipeStdout, false))
+}
+
+// PipeStderr connects the stderr of the last command in p to the stdin of c,
+// and appends c to the commands in p. Must be called before Start. Sets
+// c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeStderr(c *Cmd) {
+	p.sh.Ok()
+	handleError(p.sh, p.pipeTo(c, pipeStderr, false))
+}
+
+// PipeCombinedOutput connects the combined stdout and stderr of the last
+// command in p to the stdin of c, and appends c to the commands in p. Must be
+// called before Start. Sets c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeCombinedOutput(c *Cmd) {
+	p.sh.Ok()
+	handleError(p.sh, p.pipeTo(c, pipeCombinedOutput, false))
+}
+
+// Start starts all commands in the pipeline.
+func (p *Pipeline) Start() {
+	p.sh.Ok()
+	handleError(p.sh, p.start())
+}
+
+// Wait waits for all commands in the pipeline to exit.
+func (p *Pipeline) Wait() {
+	p.sh.Ok()
+	handleError(p.sh, p.wait())
+}
+
+// Signal sends a signal to all underlying processes in the pipeline.
+func (p *Pipeline) Signal(sig os.Signal) {
+	p.sh.Ok()
+	handleError(p.sh, p.signal(sig))
+}
+
+// Terminate sends a signal to all underlying processes in the pipeline, then
+// waits for all processes to exit. Terminate is different from Signal followed
+// by Wait: Terminate succeeds as long as all processes exit, whereas Wait fails
+// if any process's exit code isn't 0.
+func (p *Pipeline) Terminate(sig os.Signal) {
+	p.sh.Ok()
+	handleError(p.sh, p.terminate(sig))
+}
+
+// Run calls Start followed by Wait.
+func (p *Pipeline) Run() {
+	p.sh.Ok()
+	handleError(p.sh, p.run())
+}
+
+// Stdout calls Start followed by Wait, then returns the last command's stdout.
+func (p *Pipeline) Stdout() string {
+	p.sh.Ok()
+	res, err := p.stdout()
+	handleError(p.sh, err)
+	return res
+}
+
+// StdoutStderr calls Start followed by Wait, then returns the last command's
+// stdout and stderr.
+func (p *Pipeline) StdoutStderr() (string, string) {
+	p.sh.Ok()
+	stdout, stderr, err := p.stdoutStderr()
+	handleError(p.sh, err)
+	return stdout, stderr
+}
+
+// CombinedOutput calls Start followed by Wait, then returns the last command's
+// combined stdout and stderr.
+func (p *Pipeline) CombinedOutput() string {
+	p.sh.Ok()
+	res, err := p.combinedOutput()
+	handleError(p.sh, err)
+	return res
+}
+
+////////////////////////////////////////
+// Internals
+
+// handleError is used instead of direct calls to Shell.HandleError throughout
+// the pipeline implementation. This is needed to handle the case where the user
+// has set Shell.Opts.Fatalf to a non-fatal function.
+//
+// The general pattern is that after each Shell or Cmd method is called, we
+// check p.sh.Err. If there was an error it is wrapped with errAlreadyHandled,
+// indicating that Shell.HandleError has already been called with this error,
+// and should not be called again.
+func handleError(sh *Shell, err error) {
+	if _, ok := err.(errAlreadyHandled); ok {
+		return // the shell already handled this error
+	}
+	sh.HandleError(err)
+}
+
+type errAlreadyHandled struct {
+	error
+}
+
+func newPipeline(sh *Shell, first *Cmd, cmds ...*Cmd) (*Pipeline, error) {
+	p := &Pipeline{sh: sh, cmds: []*Cmd{first}}
+	first.IgnoreClosedPipeError = true
+	for _, c := range cmds {
+		if err := p.pipeTo(c, pipeStdout, false); err != nil {
+			return nil, err
+		}
+	}
+	return p, nil
+}
+
+func (p *Pipeline) clone() (*Pipeline, error) {
+	// Replicate the pipeline structure with cloned commands.
+	first := p.cmds[0].Clone()
+	if p.sh.Err != nil {
+		return nil, errAlreadyHandled{p.sh.Err}
+	}
+	res := &Pipeline{sh: p.sh, cmds: []*Cmd{first}}
+	for i := 1; i < len(p.cmds); i++ {
+		if err := res.pipeTo(p.cmds[i], p.state[i-1].mode, true); err != nil {
+			return nil, err
+		}
+	}
+	return res, nil
+}
+
+func (p *Pipeline) pipeTo(c *Cmd, mode pipeMode, clone bool) (e error) {
+	if p.sh != c.Shell() {
+		return errPipelineCmdsHaveDifferentShells
+	}
+	if clone {
+		c = c.Clone()
+		if p.sh.Err != nil {
+			return errAlreadyHandled{p.sh.Err}
+		}
+	} else {
+		c.IgnoreClosedPipeError = true
+	}
+	// We could just use c.StdinPipe() here, but that provides unlimited size
+	// buffering using a newBufferedPipe chained to an os.Pipe. We want limited
+	// size buffering to avoid unlimited memory growth, so we just use an os.Pipe.
+	pr, pw, err := os.Pipe()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// Close both ends of the pipe on failure; on success they'll be closed at
+		// the appropriate times in start and wait.
+		if e != nil {
+			pr.Close()
+			pw.Close()
+		}
+	}()
+	if c.SetStdinReader(pr); p.sh.Err != nil {
+		return errAlreadyHandled{p.sh.Err}
+	}
+	last := p.cmds[len(p.cmds)-1]
+	if mode == pipeStdout || mode == pipeCombinedOutput {
+		if last.AddStdoutWriter(pw); p.sh.Err != nil {
+			return errAlreadyHandled{p.sh.Err}
+		}
+	}
+	if mode == pipeStderr || mode == pipeCombinedOutput {
+		if last.AddStderrWriter(pw); p.sh.Err != nil {
+			return errAlreadyHandled{p.sh.Err}
+		}
+	}
+	p.cmds = append(p.cmds, c)
+	p.state = append(p.state, pipeState{mode, pr, pw})
+	return nil
+}
+
+// TODO(toddw): Clean up resources in Shell.Cleanup. E.g. we'll currently leak
+// the os.Pipe fds if the user sets up a pipeline, but never calls Start (or
+// Wait, Terminate).
+
+func (p *Pipeline) start() error {
+	// Start all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr, closeErr error
+	for i, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Start(); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+		// Close the read-side of the stdin pipe for this command. The fd has been
+		// passed to the child process via Start, so we don't need it open anymore.
+		if i > 0 {
+			if err := p.state[i-1].stdinRead.Close(); err != nil && closeErr == nil {
+				closeErr = err
+			}
+		}
+	}
+	// If anything failed, close the write-side of the stdin pipes too.
+	if shErr != nil || closeErr != nil {
+		for _, state := range p.state {
+			state.stdinWrite.Close() // ignore error, since start or close failed.
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return closeErr
+}
+
+func (p *Pipeline) wait() error {
+	// Wait for all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr, closeErr error
+	for i, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Wait(); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+		// Close the write-side of the stdin pipe for the next command, so that the
+		// next command will eventually read an EOF.
+		if i < len(p.state) {
+			if err := p.state[i].stdinWrite.Close(); err != nil && closeErr == nil {
+				closeErr = err
+			}
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return closeErr
+}
+
+func (p *Pipeline) signal(sig os.Signal) error {
+	// Signal all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr error
+	for _, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Signal(sig); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return nil
+}
+
+func (p *Pipeline) terminate(sig os.Signal) error {
+	// Terminate all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr, closeErr error
+	for i, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Terminate(sig); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+		// Close the write-side of the stdin pipe for the next command, so that the
+		// next command will eventually read an EOF.
+		if i < len(p.state) {
+			if err := p.state[i].stdinWrite.Close(); err != nil && closeErr == nil {
+				closeErr = err
+			}
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return closeErr
+}
+
+func (p *Pipeline) run() error {
+	if err := p.start(); err != nil {
+		return err
+	}
+	return p.wait()
+}
+
+func (p *Pipeline) stdout() (string, error) {
+	var stdout bytes.Buffer
+	last := p.cmds[len(p.cmds)-1]
+	if last.AddStdoutWriter(&stdout); p.sh.Err != nil {
+		return "", errAlreadyHandled{p.sh.Err}
+	}
+	err := p.run()
+	return stdout.String(), err
+}
+
+func (p *Pipeline) stdoutStderr() (string, string, error) {
+	var stdout, stderr bytes.Buffer
+	last := p.cmds[len(p.cmds)-1]
+	if last.AddStdoutWriter(&stdout); p.sh.Err != nil {
+		return "", "", errAlreadyHandled{p.sh.Err}
+	}
+	if last.AddStderrWriter(&stderr); p.sh.Err != nil {
+		return "", "", errAlreadyHandled{p.sh.Err}
+	}
+	err := p.run()
+	return stdout.String(), stderr.String(), err
+}
+
+func (p *Pipeline) combinedOutput() (string, error) {
+	var output bytes.Buffer
+	last := p.cmds[len(p.cmds)-1]
+	if last.addStdoutWriter(&output); p.sh.Err != nil {
+		return "", errAlreadyHandled{p.sh.Err}
+	}
+	if last.addStderrWriter(&output); p.sh.Err != nil {
+		return "", errAlreadyHandled{p.sh.Err}
+	}
+	err := p.run()
+	return output.String(), err
+}
diff --git a/gosh/pipeline_test.go b/gosh/pipeline_test.go
new file mode 100644
index 0000000..d26d5b9
--- /dev/null
+++ b/gosh/pipeline_test.go
@@ -0,0 +1,241 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gosh_test
+
+import (
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"v.io/x/lib/gosh"
+)
+
+func TestPipeline(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+
+	echo := sh.FuncCmd(echoFunc)
+	echo.Args = append(echo.Args, "foo")
+	replace := sh.FuncCmd(replaceFunc, byte('f'), byte('Z'))
+	cat := sh.FuncCmd(catFunc)
+	p := gosh.NewPipeline(echo, replace, cat)
+	eq(t, p.Stdout(), "Zoo\n")
+	eq(t, p.Clone().Stdout(), "Zoo\n")
+
+	// Try piping only stdout.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeStdout(sh.FuncCmd(replaceFunc, byte('A'), byte('Z')))
+	eq(t, p.Stdout(), "ZZ")
+	eq(t, p.Clone().Stdout(), "ZZ")
+
+	// Try piping only stderr.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeStderr(sh.FuncCmd(replaceFunc, byte('B'), byte('Z')))
+	eq(t, p.Stdout(), "ZZ")
+	eq(t, p.Clone().Stdout(), "ZZ")
+
+	// Try piping both stdout and stderr.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeCombinedOutput(sh.FuncCmd(catFunc))
+	// Note, we can't assume any particular ordering of stdout and stderr, so we
+	// simply check the length of the combined output.
+	eq(t, len(p.Stdout()), 4)
+	eq(t, len(p.Clone().Stdout()), 4)
+
+	// Try piping combinations.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeStderr(sh.FuncCmd(replaceFunc, byte('B'), byte('x')))
+	p.PipeStdout(sh.FuncCmd(replaceFunc, byte('x'), byte('Z')))
+	p.PipeStdout(sh.FuncCmd(catFunc))
+	eq(t, p.Stdout(), "ZZ")
+	eq(t, p.Clone().Stdout(), "ZZ")
+}
+
+func TestPipelineDifferentShells(t *testing.T) {
+	sh1 := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh1.Cleanup()
+	sh2 := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh2.Cleanup()
+
+	setsErr(t, sh1, func() { gosh.NewPipeline(sh1.FuncCmd(echoFunc), sh2.FuncCmd(catFunc)) })
+	setsErr(t, sh2, func() { gosh.NewPipeline(sh2.FuncCmd(echoFunc), sh1.FuncCmd(catFunc)) })
+	p := gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+	setsErr(t, sh1, func() { p.PipeStdout(sh2.FuncCmd(catFunc)) })
+	p = gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+	setsErr(t, sh1, func() { p.PipeStderr(sh2.FuncCmd(catFunc)) })
+	p = gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+	setsErr(t, sh1, func() { p.PipeCombinedOutput(sh2.FuncCmd(catFunc)) })
+}
+
+var writeLoopFunc = gosh.RegisterFunc("writeLoopFunc", func() error {
+	for {
+		if _, err := os.Stdout.Write([]byte("a\n")); err != nil {
+			// Always return success; the purpose of this command is to ensure that
+			// when the next command in the pipeline fails, it causes a closed pipe
+			// write error here to exit the loop.
+			return nil
+		}
+	}
+})
+
+func TestPipelineClosedPipe(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+	writeLoop, readLine := sh.FuncCmd(writeLoopFunc), sh.FuncCmd(readFunc)
+
+	// WriteLoop finishes because it gets a closed pipe write error after readLine
+	// finishes. Note that the closed pipe error is ignored.
+	p := gosh.NewPipeline(writeLoop, readLine)
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+}
+
+func TestPipelineCmdFailure(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+	cat := sh.FuncCmd(catFunc)
+	exit1 := sh.FuncCmd(exitFunc, 1)
+	writeLoop := sh.FuncCmd(writeLoopFunc)
+	echoFoo := sh.FuncCmd(echoFunc)
+	echoFoo.Args = append(echoFoo.Args, "foo")
+
+	// Exit1 fails, and cat finishes with success since it sees an EOF.
+	p := gosh.NewPipeline(exit1.Clone(), cat.Clone())
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 fails, and echoFoo finishes with success since it ignores stdin.
+	p = gosh.NewPipeline(exit1.Clone(), echoFoo.Clone())
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 fails, causing writeLoop to finish and succeed.
+	p = gosh.NewPipeline(writeLoop.Clone(), exit1.Clone())
+	setsErr(t, sh, p.Run)
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	setsErr(t, sh, p.Run)
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Same tests, but allowing the exit error from exit1.
+	exit1.ExitErrorIsOk = true
+
+	// Exit1 fails, and cat finishes with success since it sees an EOF.
+	p = gosh.NewPipeline(exit1.Clone(), cat.Clone())
+	eq(t, p.Stdout(), "")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 fails, and echoFoo finishes with success since it ignores stdin.
+	p = gosh.NewPipeline(exit1.Clone(), echoFoo.Clone())
+	eq(t, p.Stdout(), "foo\n")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "foo\n")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 fails, causing writeLoop to finish and succeed.
+	p = gosh.NewPipeline(writeLoop.Clone(), exit1.Clone())
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+}
+
+func TestPipelineSignal(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+
+	for _, d := range []time.Duration{0, time.Hour} {
+		for _, s := range []os.Signal{os.Interrupt, os.Kill} {
+			fmt.Println(d, s)
+			p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, d, 0), sh.FuncCmd(sleepFunc, d, 0))
+			p.Start()
+			p.Cmds()[0].AwaitVars("ready")
+			p.Cmds()[1].AwaitVars("ready")
+			// Wait for a bit to allow the zero-sleep commands to exit.
+			time.Sleep(100 * time.Millisecond)
+			p.Signal(s)
+			switch {
+			case s == os.Interrupt:
+				// Wait should succeed as long as the exit code was 0, regardless of
+				// whether the signal arrived or the processes had already exited.
+				p.Wait()
+			case d != 0:
+				// Note: We don't call Wait in the {d: 0, s: os.Kill} case because doing
+				// so makes the test flaky on slow systems.
+				setsErr(t, sh, func() { p.Wait() })
+			}
+		}
+	}
+
+	// Signal should fail if Wait has been called.
+	z := time.Duration(0)
+	p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, z, 0), sh.FuncCmd(sleepFunc, z, 0))
+	p.Run()
+	setsErr(t, sh, func() { p.Signal(os.Interrupt) })
+}
+
+func TestPipelineTerminate(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+
+	for _, d := range []time.Duration{0, time.Hour} {
+		for _, s := range []os.Signal{os.Interrupt, os.Kill} {
+			fmt.Println(d, s)
+			p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, d, 0), sh.FuncCmd(sleepFunc, d, 0))
+			p.Start()
+			p.Cmds()[0].AwaitVars("ready")
+			p.Cmds()[1].AwaitVars("ready")
+			// Wait for a bit to allow the zero-sleep commands to exit.
+			time.Sleep(100 * time.Millisecond)
+			// Terminate should succeed regardless of the exit code, and regardless of
+			// whether the signal arrived or the processes had already exited.
+			p.Terminate(s)
+		}
+	}
+
+	// Terminate should fail if Wait has been called.
+	z := time.Duration(0)
+	p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, z, 0), sh.FuncCmd(sleepFunc, z, 0))
+	p.Run()
+	setsErr(t, sh, func() { p.Terminate(os.Interrupt) })
+}
diff --git a/gosh/shell.go b/gosh/shell.go
index 4426990..4dbcc51 100644
--- a/gosh/shell.go
+++ b/gosh/shell.go
@@ -15,7 +15,6 @@
 import (
 	"errors"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"log"
 	"math/rand"
@@ -55,15 +54,15 @@
 	// Args is the list of args to append to subsequent command invocations.
 	Args []string
 	// Internal state.
-	calledNewShell bool
-	cleanupDone    chan struct{}
-	cleanupMu      sync.Mutex // protects the fields below; held during cleanup
-	calledCleanup  bool
-	cmds           []*Cmd
-	tempFiles      []*os.File
-	tempDirs       []string
-	dirStack       []string // for pushd/popd
-	cleanupFuncs   []func()
+	calledNewShell  bool
+	cleanupDone     chan struct{}
+	cleanupMu       sync.Mutex // protects the fields below; held during cleanup
+	calledCleanup   bool
+	cmds            []*Cmd
+	tempFiles       []*os.File
+	tempDirs        []string
+	dirStack        []string // for pushd/popd
+	cleanupHandlers []func()
 }
 
 // Opts configures Shell.
@@ -176,10 +175,12 @@
 	sh.HandleError(sh.popd())
 }
 
-// AddToCleanup registers the given function to be called by Shell.Cleanup().
-func (sh *Shell) AddToCleanup(f func()) {
+// AddCleanupHandler registers the given function to be called during cleanup.
+// Cleanup handlers are called in LIFO order, possibly in a separate goroutine
+// spawned by gosh.
+func (sh *Shell) AddCleanupHandler(f func()) {
 	sh.Ok()
-	sh.HandleError(sh.addToCleanup(f))
+	sh.HandleError(sh.addCleanupHandler(f))
 }
 
 // Cleanup cleans up all resources (child processes, temporary files and
@@ -450,13 +451,13 @@
 	return nil
 }
 
-func (sh *Shell) addToCleanup(f func()) error {
+func (sh *Shell) addCleanupHandler(f func()) error {
 	sh.cleanupMu.Lock()
 	defer sh.cleanupMu.Unlock()
 	if sh.calledCleanup {
 		return errAlreadyCalledCleanup
 	}
-	sh.cleanupFuncs = append(sh.cleanupFuncs, f)
+	sh.cleanupHandlers = append(sh.cleanupHandlers, f)
 	return nil
 }
 
@@ -532,9 +533,9 @@
 			sh.logf("os.Chdir(%q) failed: %v\n", dir, err)
 		}
 	}
-	// Call any registered cleanup functions in LIFO order.
-	for i := len(sh.cleanupFuncs) - 1; i >= 0; i-- {
-		sh.cleanupFuncs[i]()
+	// Call cleanup handlers in LIFO order.
+	for i := len(sh.cleanupHandlers) - 1; i >= 0; i-- {
+		sh.cleanupHandlers[i]()
 	}
 	close(sh.cleanupDone)
 }
@@ -564,15 +565,3 @@
 	}
 	os.Exit(0)
 }
-
-// NopWriteCloser returns a WriteCloser with a no-op Close method wrapping the
-// provided Writer.
-func NopWriteCloser(w io.Writer) io.WriteCloser {
-	return nopWriteCloser{w}
-}
-
-type nopWriteCloser struct {
-	io.Writer
-}
-
-func (nopWriteCloser) Close() error { return nil }
diff --git a/gosh/shell_test.go b/gosh/shell_test.go
index d8d6930..0a7a6b6 100644
--- a/gosh/shell_test.go
+++ b/gosh/shell_test.go
@@ -28,7 +28,7 @@
 	"time"
 
 	"v.io/x/lib/gosh"
-	"v.io/x/lib/gosh/internal/gosh_example_lib"
+	lib "v.io/x/lib/gosh/internal/gosh_example_lib"
 )
 
 var fakeError = errors.New("fake error")
@@ -95,11 +95,13 @@
 
 // Simplified versions of various Unix commands.
 var (
-	catFunc = gosh.RegisterFunc("catFunc", func() {
-		io.Copy(os.Stdout, os.Stdin)
+	catFunc = gosh.RegisterFunc("catFunc", func() error {
+		_, err := io.Copy(os.Stdout, os.Stdin)
+		return err
 	})
-	echoFunc = gosh.RegisterFunc("echoFunc", func() {
-		fmt.Println(os.Args[1])
+	echoFunc = gosh.RegisterFunc("echoFunc", func() error {
+		_, err := fmt.Println(os.Args[1])
+		return err
 	})
 	readFunc = gosh.RegisterFunc("readFunc", func() {
 		bufio.NewReader(os.Stdin).ReadString('\n')
@@ -494,16 +496,16 @@
 	defer sh.Cleanup()
 
 	c := sh.FuncCmd(writeFunc, true, true)
-	c.AddStdoutWriter(gosh.NopWriteCloser(os.Stdout))
-	c.AddStderrWriter(gosh.NopWriteCloser(os.Stderr))
+	c.AddStdoutWriter(os.Stdout)
+	c.AddStderrWriter(os.Stderr)
 	c.Run()
 
 	fmt.Fprint(os.Stdout, " stdout done")
 	fmt.Fprint(os.Stderr, " stderr done")
 })
 
-// Tests that it's safe to add wrapped os.Stdout and os.Stderr as writers.
-func TestAddWritersWrappedStdoutStderr(t *testing.T) {
+// Tests that it's safe to add os.Stdout and os.Stderr as writers.
+func TestAddStdoutStderrWriter(t *testing.T) {
 	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
 	defer sh.Cleanup()
 
@@ -512,26 +514,14 @@
 	eq(t, stderr, "BB stderr done")
 }
 
-// Tests that adding non-wrapped os.Stdout or os.Stderr fails.
-func TestAddWritersNonWrappedStdoutStderr(t *testing.T) {
-	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
-	defer sh.Cleanup()
-
-	c := sh.FuncCmd(writeMoreFunc)
-	setsErr(t, sh, func() { c.AddStdoutWriter(os.Stdout) })
-	setsErr(t, sh, func() { c.AddStdoutWriter(os.Stderr) })
-	setsErr(t, sh, func() { c.AddStderrWriter(os.Stdout) })
-	setsErr(t, sh, func() { c.AddStderrWriter(os.Stderr) })
-}
-
 func TestCombinedOutput(t *testing.T) {
 	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
 	defer sh.Cleanup()
 
 	c := sh.FuncCmd(writeFunc, true, true)
 	buf := &bytes.Buffer{}
-	c.AddStdoutWriter(gosh.NopWriteCloser(buf))
-	c.AddStderrWriter(gosh.NopWriteCloser(buf))
+	c.AddStdoutWriter(buf)
+	c.AddStderrWriter(buf)
 	output := c.CombinedOutput()
 	// Note, we can't assume any particular ordering of stdout and stderr, so we
 	// simply check the length of the combined output.
@@ -565,35 +555,22 @@
 	eq(t, string(stderr), "BB")
 }
 
-type countingWriteCloser struct {
-	io.Writer
-	count int
-}
-
-func (wc *countingWriteCloser) Close() error {
-	wc.count++
-	return nil
-}
-
-// Tests that Close is called exactly once on a given WriteCloser, even if that
-// WriteCloser is passed to Add{Stdout,Stderr}Writer multiple times.
-func TestAddWritersCloseOnce(t *testing.T) {
-	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
-	defer sh.Cleanup()
-
-	c := sh.FuncCmd(writeFunc, true, true)
-	buf := &bytes.Buffer{}
-	wc := &countingWriteCloser{Writer: buf}
-	c.AddStdoutWriter(wc)
-	c.AddStdoutWriter(wc)
-	c.AddStderrWriter(wc)
-	c.AddStderrWriter(wc)
-	c.Run()
-	// Note, we can't assume any particular ordering of stdout and stderr, so we
-	// simply check the length of the combined output.
-	eq(t, len(buf.String()), 8)
-	eq(t, wc.count, 1)
-}
+var replaceFunc = gosh.RegisterFunc("replaceFunc", func(old, new byte) error {
+	buf := make([]byte, 1024)
+	for {
+		n, err := os.Stdin.Read(buf)
+		switch {
+		case err == io.EOF:
+			return nil
+		case err != nil:
+			return err
+		}
+		rep := bytes.Replace(buf[:n], []byte{old}, []byte{new}, -1)
+		if _, err := os.Stdout.Write(rep); err != nil {
+			return err
+		}
+	}
+})
 
 func TestSignal(t *testing.T) {
 	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
@@ -718,6 +695,34 @@
 	nok(t, c.Err)
 }
 
+func TestIgnoreClosedPipeError(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+
+	// Since writeLoopFunc will only finish if it receives a write error, it's
+	// depending on the closed pipe error from closedPipeErrorWriter.
+	c := sh.FuncCmd(writeLoopFunc)
+	c.AddStdoutWriter(errorWriter{io.ErrClosedPipe})
+	c.IgnoreClosedPipeError = true
+	c.Run()
+	ok(t, c.Err)
+	ok(t, sh.Err)
+
+	// Without IgnoreClosedPipeError, the command fails.
+	c = sh.FuncCmd(writeLoopFunc)
+	c.AddStdoutWriter(errorWriter{io.ErrClosedPipe})
+	setsErr(t, sh, func() { c.Run() })
+	nok(t, c.Err)
+}
+
+type errorWriter struct {
+	error
+}
+
+func (w errorWriter) Write(p []byte) (int, error) {
+	return 0, w.error
+}
+
 // Tests that sh.Ok panics under various conditions.
 func TestOkPanics(t *testing.T) {
 	func() { // errDidNotCallNewShell