Merge "gosh: Add bufferedPipe ReadFrom and WriteTo."
diff --git a/gosh/.api b/gosh/.api
index c48f046..edfb2a8 100644
--- a/gosh/.api
+++ b/gosh/.api
@@ -1,17 +1,18 @@
 pkg gosh, func InitChildMain()
 pkg gosh, func InitMain()
+pkg gosh, func NewPipeline(*Cmd, ...*Cmd) *Pipeline
 pkg gosh, func NewShell(Opts) *Shell
-pkg gosh, func NopWriteCloser(io.Writer) io.WriteCloser
 pkg gosh, func RegisterFunc(string, interface{}) *Func
 pkg gosh, func SendVars(map[string]string)
-pkg gosh, method (*Cmd) AddStderrWriter(io.WriteCloser)
-pkg gosh, method (*Cmd) AddStdoutWriter(io.WriteCloser)
+pkg gosh, method (*Cmd) AddStderrWriter(io.Writer)
+pkg gosh, method (*Cmd) AddStdoutWriter(io.Writer)
 pkg gosh, method (*Cmd) AwaitVars(...string) map[string]string
 pkg gosh, method (*Cmd) Clone() *Cmd
 pkg gosh, method (*Cmd) CombinedOutput() string
 pkg gosh, method (*Cmd) Pid() int
 pkg gosh, method (*Cmd) Run()
 pkg gosh, method (*Cmd) SetStdinReader(io.Reader)
+pkg gosh, method (*Cmd) Shell() *Shell
 pkg gosh, method (*Cmd) Signal(os.Signal)
 pkg gosh, method (*Cmd) Start()
 pkg gosh, method (*Cmd) StderrPipe() io.ReadCloser
@@ -21,7 +22,20 @@
 pkg gosh, method (*Cmd) StdoutStderr() (string, string)
 pkg gosh, method (*Cmd) Terminate(os.Signal)
 pkg gosh, method (*Cmd) Wait()
-pkg gosh, method (*Shell) AddToCleanup(func())
+pkg gosh, method (*Pipeline) Clone() *Pipeline
+pkg gosh, method (*Pipeline) Cmds() []*Cmd
+pkg gosh, method (*Pipeline) CombinedOutput() string
+pkg gosh, method (*Pipeline) PipeCombinedOutput(*Cmd)
+pkg gosh, method (*Pipeline) PipeStderr(*Cmd)
+pkg gosh, method (*Pipeline) PipeStdout(*Cmd)
+pkg gosh, method (*Pipeline) Run()
+pkg gosh, method (*Pipeline) Signal(os.Signal)
+pkg gosh, method (*Pipeline) Start()
+pkg gosh, method (*Pipeline) Stdout() string
+pkg gosh, method (*Pipeline) StdoutStderr() (string, string)
+pkg gosh, method (*Pipeline) Terminate(os.Signal)
+pkg gosh, method (*Pipeline) Wait()
+pkg gosh, method (*Shell) AddCleanupHandler(func())
 pkg gosh, method (*Shell) BuildGoPkg(string, ...string) string
 pkg gosh, method (*Shell) Cleanup()
 pkg gosh, method (*Shell) Cmd(string, ...string) *Cmd
@@ -39,6 +53,7 @@
 pkg gosh, type Cmd struct, Err error
 pkg gosh, type Cmd struct, ExitAfter time.Duration
 pkg gosh, type Cmd struct, ExitErrorIsOk bool
+pkg gosh, type Cmd struct, IgnoreClosedPipeError bool
 pkg gosh, type Cmd struct, IgnoreParentExit bool
 pkg gosh, type Cmd struct, OutputDir string
 pkg gosh, type Cmd struct, Path string
@@ -51,6 +66,7 @@
 pkg gosh, type Opts struct, Fatalf func(string, ...interface{})
 pkg gosh, type Opts struct, Logf func(string, ...interface{})
 pkg gosh, type Opts struct, PropagateChildOutput bool
+pkg gosh, type Pipeline struct
 pkg gosh, type Shell struct
 pkg gosh, type Shell struct, Args []string
 pkg gosh, type Shell struct, Err error
diff --git a/gosh/cmd.go b/gosh/cmd.go
index e956500..0ab4d61 100644
--- a/gosh/cmd.go
+++ b/gosh/cmd.go
@@ -21,8 +21,6 @@
 	errAlreadyCalledStart = errors.New("gosh: already called Cmd.Start")
 	errAlreadyCalledWait  = errors.New("gosh: already called Cmd.Wait")
 	errAlreadySetStdin    = errors.New("gosh: already set stdin")
-	errCloseStdout        = errors.New("gosh: use NopWriteCloser(os.Stdout) to prevent stdout from being closed")
-	errCloseStderr        = errors.New("gosh: use NopWriteCloser(os.Stderr) to prevent stderr from being closed")
 	errDidNotCallStart    = errors.New("gosh: did not call Cmd.Start")
 	errProcessExited      = errors.New("gosh: process exited")
 )
@@ -56,6 +54,13 @@
 	// ExitErrorIsOk specifies whether an *exec.ExitError should be reported via
 	// Shell.HandleError.
 	ExitErrorIsOk bool
+	// IgnoreClosedPipeError, if true, causes errors from read/write on a closed
+	// pipe to be indistinguishable from success. These errors often occur in
+	// command pipelines, e.g. "yes | head -1", where "yes" will receive a closed
+	// pipe error when it tries to write on stdout, after "head" has exited. If a
+	// closed pipe error occurs, Cmd.Err will be nil, and no error is reported to
+	// Shell.HandleError.
+	IgnoreClosedPipeError bool
 	// Internal state.
 	sh                *Shell
 	c                 *exec.Cmd
@@ -73,6 +78,11 @@
 	recvVars          map[string]string // protected by cond.L
 }
 
+// Shell returns the shell that this Cmd was created from.
+func (c *Cmd) Shell() *Shell {
+	return c.sh
+}
+
 // Clone returns a new Cmd with a copy of this Cmd's configuration.
 func (c *Cmd) Clone() *Cmd {
 	c.sh.Ok()
@@ -94,8 +104,10 @@
 }
 
 // StdoutPipe returns a ReadCloser backed by an unlimited-size pipe for the
-// command's stdout. Must be called before Start. May be called more than once;
-// each invocation creates a new pipe.
+// command's stdout. The pipe will be closed when the process exits, but may
+// also be closed earlier by the caller, e.g. if all expected output has been
+// received. Must be called before Start. May be called more than once; each
+// call creates a new pipe.
 func (c *Cmd) StdoutPipe() io.ReadCloser {
 	c.sh.Ok()
 	res, err := c.stdoutPipe()
@@ -104,8 +116,10 @@
 }
 
 // StderrPipe returns a ReadCloser backed by an unlimited-size pipe for the
-// command's stderr. Must be called before Start. May be called more than once;
-// each invocation creates a new pipe.
+// command's stderr. The pipe will be closed when the process exits, but may
+// also be closed earlier by the caller, e.g. if all expected output has been
+// received. Must be called before Start. May be called more than once; each
+// call creates a new pipe.
 func (c *Cmd) StderrPipe() io.ReadCloser {
 	c.sh.Ok()
 	res, err := c.stderrPipe()
@@ -113,42 +127,28 @@
 	return res
 }
 
-// SetStdinReader configures this Cmd to read the child's stdin from the given
-// Reader. Must be called before Start. Only one call may be made to StdinPipe
-// or SetStdinReader; subsequent calls will fail.
+// SetStdinReader configures this Cmd to read stdin from the given Reader. Must
+// be called before Start. Only one call may be made to StdinPipe or
+// SetStdinReader; subsequent calls will fail.
 func (c *Cmd) SetStdinReader(r io.Reader) {
 	c.sh.Ok()
 	c.handleError(c.setStdinReader(r))
 }
 
-// AddStdoutWriter configures this Cmd to tee the child's stdout to the given
-// WriteCloser, which will be closed when the process exits.
-//
-// If the same WriteCloser is passed to both AddStdoutWriter and
-// AddStderrWriter, Cmd will ensure that its methods are never called
-// concurrently and that Close is only called once.
-//
-// Use NopWriteCloser to extend a Writer to a WriteCloser, or to prevent an
-// existing WriteCloser from being closed. It is an error to pass in os.Stdout
-// or os.Stderr, since they shouldn't be closed.
-func (c *Cmd) AddStdoutWriter(wc io.WriteCloser) {
+// AddStdoutWriter configures this Cmd to tee stdout to the given Writer. Must
+// be called before Start. If the same Writer is passed to both AddStdoutWriter
+// and AddStderrWriter, Cmd will ensure that Write is never called concurrently.
+func (c *Cmd) AddStdoutWriter(w io.Writer) {
 	c.sh.Ok()
-	c.handleError(c.addStdoutWriter(wc))
+	c.handleError(c.addStdoutWriter(w))
 }
 
-// AddStderrWriter configures this Cmd to tee the child's stderr to the given
-// WriteCloser, which will be closed when the process exits.
-//
-// If the same WriteCloser is passed to both AddStdoutWriter and
-// AddStderrWriter, Cmd will ensure that its methods are never called
-// concurrently and that Close is only called once.
-//
-// Use NopWriteCloser to extend a Writer to a WriteCloser, or to prevent an
-// existing WriteCloser from being closed. It is an error to pass in os.Stdout
-// or os.Stderr, since they shouldn't be closed.
-func (c *Cmd) AddStderrWriter(wc io.WriteCloser) {
+// AddStderrWriter configures this Cmd to tee stderr to the given Writer. Must
+// be called before Start. If the same Writer is passed to both AddStdoutWriter
+// and AddStderrWriter, Cmd will ensure that Write is never called concurrently.
+func (c *Cmd) AddStderrWriter(w io.Writer) {
 	c.sh.Ok()
-	c.handleError(c.addStderrWriter(wc))
+	c.handleError(c.addStderrWriter(w))
 }
 
 // Start starts the command.
@@ -172,15 +172,15 @@
 	c.handleError(c.wait())
 }
 
-// Signal sends a signal to the process.
+// Signal sends a signal to the underlying process.
 func (c *Cmd) Signal(sig os.Signal) {
 	c.sh.Ok()
 	c.handleError(c.signal(sig))
 }
 
-// Terminate sends a signal to the process, then waits for it to exit. Terminate
-// is different from Signal followed by Wait: Terminate succeeds as long as the
-// process exits, whereas Wait fails if the exit code isn't 0.
+// Terminate sends a signal to the underlying process, then waits for it to
+// exit. Terminate is different from Signal followed by Wait: Terminate succeeds
+// as long as the process exits, whereas Wait fails if the exit code isn't 0.
 func (c *Cmd) Terminate(sig os.Signal) {
 	c.sh.Ok()
 	c.handleError(c.terminate(sig))
@@ -271,7 +271,35 @@
 	return err == nil
 }
 
+// An explanation of closed pipe errors. Consider the pipeline "yes | head -1",
+// where yes keeps writing "y\n" to stdout, and head succeeds after it reads the
+// first line. There is an os pipe connecting the two commands, and when head
+// exits, it causes yes to receive a closed pipe error on its next write. Should
+// we consider such a pipeline to have succeeded or failed?
+//
+// Bash only looks at the exit status of the last command by default, thus the
+// pipeline succeeds. But that's dangerous, since yes could have crashed and we
+// wouldn't know. It's recommended to run "set -o pipefail" to tell bash to
+// check the exit status of all commands. But that causes the pipeline to fail.
+//
+// IgnoreClosedPipeError handles this case. gosh.Pipeline sets this option to
+// true, so that by default the pipeline above will succeed, but will fail on
+// any other error. Note that the exec package always returns an ExitError if
+// the child process exited with a non-zero exit code; the closed pipe error is
+// only returned if the child process exited with a zero exit code, and Write on
+// the io.MultiWriter in the parent process received the closed pipe error.
+//
+// TODO(toddw): We could adopt the convention that exit code 141 indicates
+// "closed pipe", and use IgnoreClosedPipeError to also ignore that case. We
+// choose 141 because it's 128 + 13, where SIGPIPE is 13, and there is an
+// existing convention for this. By default Go programs ignore SIGPIPE, so we
+// might also want to add code to InitChildMain to exit the program with 141 if
+// it receives SIGPIPE.
+
 func (c *Cmd) handleError(err error) {
+	if c.IgnoreClosedPipeError && isClosedPipeError(err) {
+		err = nil
+	}
 	c.Err = err
 	if !c.errorIsOk(err) {
 		c.sh.HandleError(err)
@@ -400,6 +428,7 @@
 	res.PropagateOutput = c.PropagateOutput
 	res.OutputDir = c.OutputDir
 	res.ExitErrorIsOk = c.ExitErrorIsOk
+	res.IgnoreClosedPipeError = c.IgnoreClosedPipeError
 	return res, nil
 }
 
@@ -436,11 +465,7 @@
 
 func (c *Cmd) stdinPipeCopier(dst io.WriteCloser, src io.Reader) {
 	var firstErr error
-	_, err := io.Copy(dst, src)
-	// Ignore EPIPE errors copying to stdin, indicating the process exited. This
-	// mirrors logic in os/exec/exec_posix.go. Also see:
-	// https://github.com/golang/go/issues/9173
-	if pe, ok := err.(*os.PathError); !ok || pe.Op != "write" || pe.Path != "|1" || pe.Err != syscall.EPIPE {
+	if _, err := io.Copy(dst, src); err != nil && !isClosedPipeError(err) {
 		firstErr = err
 	}
 	if err := dst.Close(); err != nil && firstErr == nil {
@@ -449,6 +474,21 @@
 	c.stdinDoneChan <- firstErr
 }
 
+// isClosedPipeError returns true iff the error indicates a closed pipe. This
+// typically occurs with a pipeline of commands "A | B"; if B exits first, the
+// next write by A will receive a closed pipe error. Also see:
+// https://github.com/golang/go/issues/9173
+func isClosedPipeError(err error) bool {
+	if err == io.ErrClosedPipe {
+		return true
+	}
+	// Closed pipe on os.Pipe; mirrors logic in os/exec/exec_posix.go.
+	if pe, ok := err.(*os.PathError); ok && pe.Op == "write" && pe.Path == "|1" && pe.Err == syscall.EPIPE {
+		return true
+	}
+	return false
+}
+
 func (c *Cmd) setStdinReader(r io.Reader) error {
 	switch {
 	case c.calledStart:
@@ -480,42 +520,37 @@
 	return p, nil
 }
 
-func (c *Cmd) addStdoutWriter(wc io.WriteCloser) error {
-	switch {
-	case c.calledStart:
+func (c *Cmd) addStdoutWriter(w io.Writer) error {
+	if c.calledStart {
 		return errAlreadyCalledStart
-	case wc == os.Stdout:
-		return errCloseStdout
-	case wc == os.Stderr:
-		return errCloseStderr
 	}
-	c.stdoutWriters = append(c.stdoutWriters, wc)
-	c.afterWaitClosers = append(c.afterWaitClosers, wc)
+	c.stdoutWriters = append(c.stdoutWriters, w)
 	return nil
 }
 
-func (c *Cmd) addStderrWriter(wc io.WriteCloser) error {
-	switch {
-	case c.calledStart:
+func (c *Cmd) addStderrWriter(w io.Writer) error {
+	if c.calledStart {
 		return errAlreadyCalledStart
-	case wc == os.Stdout:
-		return errCloseStdout
-	case wc == os.Stderr:
-		return errCloseStderr
 	}
-	c.stderrWriters = append(c.stderrWriters, wc)
-	c.afterWaitClosers = append(c.afterWaitClosers, wc)
+	c.stderrWriters = append(c.stderrWriters, w)
 	return nil
 }
 
 // TODO(sadovsky): Maybe wrap every child process with a "supervisor" process
 // that calls InitChildMain.
 
-func (c *Cmd) start() error {
+func (c *Cmd) start() (e error) {
 	defer func() {
-		closeClosers(c.afterStartClosers)
+		// Always close afterStartClosers upon return. Only close afterWaitClosers
+		// if start failed; if start succeeds, they're closed in the startExitWaiter
+		// goroutine. Only the first error is reported.
+		if err := closeClosers(c.afterStartClosers); e == nil {
+			e = err
+		}
 		if !c.started {
-			closeClosers(c.afterWaitClosers)
+			if err := closeClosers(c.afterWaitClosers); e == nil {
+				e = err
+			}
 		}
 	}()
 	if c.calledStart {
@@ -568,7 +603,9 @@
 		c.exited = true
 		c.cond.Signal()
 		c.cond.L.Unlock()
-		closeClosers(c.afterWaitClosers)
+		if err := closeClosers(c.afterWaitClosers); waitErr == nil {
+			waitErr = err
+		}
 		if c.stdinDoneChan != nil {
 			// Wait for the stdinPipeCopier goroutine to finish.
 			if err := <-c.stdinDoneChan; waitErr == nil {
@@ -579,24 +616,23 @@
 	}()
 }
 
-func closeClosers(closers []io.Closer) {
-	// If the same WriteCloser was passed to both AddStdoutWriter and
-	// AddStderrWriter, we should only close it once.
-	cm := map[io.Closer]bool{}
+func closeClosers(closers []io.Closer) error {
+	var firstErr error
 	for _, closer := range closers {
-		if !cm[closer] {
-			cm[closer] = true
-			closer.Close() // best-effort; ignore returned error
+		if err := closer.Close(); firstErr == nil {
+			firstErr = err
 		}
 	}
+	return firstErr
 }
 
 // TODO(sadovsky): Maybe add optional timeouts for Cmd.{awaitVars,wait}.
 
 func (c *Cmd) awaitVars(keys ...string) (map[string]string, error) {
-	if !c.started {
+	switch {
+	case !c.started:
 		return nil, errDidNotCallStart
-	} else if c.calledWait {
+	case c.calledWait:
 		return nil, errAlreadyCalledWait
 	}
 	wantKeys := map[string]bool{}
@@ -626,9 +662,10 @@
 }
 
 func (c *Cmd) wait() error {
-	if !c.started {
+	switch {
+	case !c.started:
 		return errDidNotCallStart
-	} else if c.calledWait {
+	case c.calledWait:
 		return errAlreadyCalledWait
 	}
 	c.calledWait = true
@@ -647,9 +684,10 @@
 // of the os.Signal interface, and have the signal and terminate methods map
 // that to Process.Kill.
 func (c *Cmd) signal(sig os.Signal) error {
-	if !c.started {
+	switch {
+	case !c.started:
 		return errDidNotCallStart
-	} else if c.calledWait {
+	case c.calledWait:
 		return errAlreadyCalledWait
 	}
 	if !c.isRunning() {
diff --git a/gosh/pipeline.go b/gosh/pipeline.go
new file mode 100644
index 0000000..c9aec89
--- /dev/null
+++ b/gosh/pipeline.go
@@ -0,0 +1,395 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gosh
+
+import (
+	"bytes"
+	"errors"
+	"io"
+	"os"
+)
+
+var errPipelineCmdsHaveDifferentShells = errors.New("gosh: pipeline cmds have different shells")
+
+// Pipeline represents a pipeline of commands, where the stdout and/or stderr of
+// one command is connected to the stdin of the next command.
+//
+// The failure semantics of a pipeline are determined by the failure semantics
+// of each command; by default the pipeline fails if any command fails, and
+// read/write errors on closed pipes are ignored. This is different from bash,
+// where the default is to only check the status of the last command, unless
+// "set -o pipefail" is enabled to check the status of all commands, causing
+// closed pipe errors to fail the pipeline. Use Cmd.ExitErrorIsOk and
+// Cmd.IgnoreClosedPipeError to fine-tune the failure semantics.
+//
+// The implementation of Pipeline only uses exported methods from Shell and Cmd.
+type Pipeline struct {
+	sh    *Shell
+	cmds  []*Cmd      // INVARIANT: len(cmds) > 0
+	state []pipeState // INVARIANT: len(state) == len(cmds) - 1
+}
+
+type pipeState struct {
+	mode       pipeMode  // describes how to connect cmds[i] to cmds[i+1]
+	stdinRead  io.Closer // read-side closer of the stdin pipe for cmds[i+1]
+	stdinWrite io.Closer // write-side closer of the stdin pipe for cmds[i+1]
+}
+
+type pipeMode int
+
+const (
+	pipeStdout pipeMode = iota
+	pipeStderr
+	pipeCombinedOutput
+)
+
+// NewPipeline returns a new Pipeline starting with c. The stdout of each
+// command is connected to the stdin of the next command, via calls to
+// Pipeline.PipeStdout. To construct pipelines involving stderr, call
+// Pipeline.PipeStderr or Pipeline.PipeCombinedOutput directly.
+//
+// Each command must have been created from the same Shell. Errors are reported
+// to c.Shell, via Shell.HandleError. Sets Cmd.IgnoreClosedPipeError to true for
+// all commands.
+func NewPipeline(c *Cmd, cmds ...*Cmd) *Pipeline {
+	sh := c.Shell()
+	sh.Ok()
+	res, err := newPipeline(sh, c, cmds...)
+	handleError(sh, err)
+	return res
+}
+
+// Cmds returns the commands in the pipeline.
+func (p *Pipeline) Cmds() []*Cmd {
+	return p.cmds
+}
+
+// Clone returns a new Pipeline where p's commands are cloned and connected with
+// the same pipeline structure as in p.
+func (p *Pipeline) Clone() *Pipeline {
+	p.sh.Ok()
+	res, err := p.clone()
+	handleError(p.sh, err)
+	return res
+}
+
+// PipeStdout connects the stdout of the last command in p to the stdin of c,
+// and appends c to the commands in p. Must be called before Start. Sets
+// c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeStdout(c *Cmd) {
+	p.sh.Ok()
+	handleError(p.sh, p.pipeTo(c, pipeStdout, false))
+}
+
+// PipeStderr connects the stderr of the last command in p to the stdin of c,
+// and appends c to the commands in p. Must be called before Start. Sets
+// c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeStderr(c *Cmd) {
+	p.sh.Ok()
+	handleError(p.sh, p.pipeTo(c, pipeStderr, false))
+}
+
+// PipeCombinedOutput connects the combined stdout and stderr of the last
+// command in p to the stdin of c, and appends c to the commands in p. Must be
+// called before Start. Sets c.IgnoreClosedPipeError to true.
+func (p *Pipeline) PipeCombinedOutput(c *Cmd) {
+	p.sh.Ok()
+	handleError(p.sh, p.pipeTo(c, pipeCombinedOutput, false))
+}
+
+// Start starts all commands in the pipeline.
+func (p *Pipeline) Start() {
+	p.sh.Ok()
+	handleError(p.sh, p.start())
+}
+
+// Wait waits for all commands in the pipeline to exit.
+func (p *Pipeline) Wait() {
+	p.sh.Ok()
+	handleError(p.sh, p.wait())
+}
+
+// Signal sends a signal to all underlying processes in the pipeline.
+func (p *Pipeline) Signal(sig os.Signal) {
+	p.sh.Ok()
+	handleError(p.sh, p.signal(sig))
+}
+
+// Terminate sends a signal to all underlying processes in the pipeline, then
+// waits for all processes to exit. Terminate is different from Signal followed
+// by Wait: Terminate succeeds as long as all processes exit, whereas Wait fails
+// if any process's exit code isn't 0.
+func (p *Pipeline) Terminate(sig os.Signal) {
+	p.sh.Ok()
+	handleError(p.sh, p.terminate(sig))
+}
+
+// Run calls Start followed by Wait.
+func (p *Pipeline) Run() {
+	p.sh.Ok()
+	handleError(p.sh, p.run())
+}
+
+// Stdout calls Start followed by Wait, then returns the last command's stdout.
+func (p *Pipeline) Stdout() string {
+	p.sh.Ok()
+	res, err := p.stdout()
+	handleError(p.sh, err)
+	return res
+}
+
+// StdoutStderr calls Start followed by Wait, then returns the last command's
+// stdout and stderr.
+func (p *Pipeline) StdoutStderr() (string, string) {
+	p.sh.Ok()
+	stdout, stderr, err := p.stdoutStderr()
+	handleError(p.sh, err)
+	return stdout, stderr
+}
+
+// CombinedOutput calls Start followed by Wait, then returns the last command's
+// combined stdout and stderr.
+func (p *Pipeline) CombinedOutput() string {
+	p.sh.Ok()
+	res, err := p.combinedOutput()
+	handleError(p.sh, err)
+	return res
+}
+
+////////////////////////////////////////
+// Internals
+
+// handleError is used instead of direct calls to Shell.HandleError throughout
+// the pipeline implementation. This is needed to handle the case where the user
+// has set Shell.Opts.Fatalf to a non-fatal function.
+//
+// The general pattern is that after each Shell or Cmd method is called, we
+// check p.sh.Err. If there was an error it is wrapped with errAlreadyHandled,
+// indicating that Shell.HandleError has already been called with this error,
+// and should not be called again.
+func handleError(sh *Shell, err error) {
+	if _, ok := err.(errAlreadyHandled); ok {
+		return // the shell already handled this error
+	}
+	sh.HandleError(err)
+}
+
+type errAlreadyHandled struct {
+	error
+}
+
+func newPipeline(sh *Shell, first *Cmd, cmds ...*Cmd) (*Pipeline, error) {
+	p := &Pipeline{sh: sh, cmds: []*Cmd{first}}
+	first.IgnoreClosedPipeError = true
+	for _, c := range cmds {
+		if err := p.pipeTo(c, pipeStdout, false); err != nil {
+			return nil, err
+		}
+	}
+	return p, nil
+}
+
+func (p *Pipeline) clone() (*Pipeline, error) {
+	// Replicate the pipeline structure with cloned commands.
+	first := p.cmds[0].Clone()
+	if p.sh.Err != nil {
+		return nil, errAlreadyHandled{p.sh.Err}
+	}
+	res := &Pipeline{sh: p.sh, cmds: []*Cmd{first}}
+	for i := 1; i < len(p.cmds); i++ {
+		if err := res.pipeTo(p.cmds[i], p.state[i-1].mode, true); err != nil {
+			return nil, err
+		}
+	}
+	return res, nil
+}
+
+func (p *Pipeline) pipeTo(c *Cmd, mode pipeMode, clone bool) (e error) {
+	if p.sh != c.Shell() {
+		return errPipelineCmdsHaveDifferentShells
+	}
+	if clone {
+		c = c.Clone()
+		if p.sh.Err != nil {
+			return errAlreadyHandled{p.sh.Err}
+		}
+	} else {
+		c.IgnoreClosedPipeError = true
+	}
+	// We could just use c.StdinPipe() here, but that provides unlimited size
+	// buffering using a newBufferedPipe chained to an os.Pipe. We want limited
+	// size buffering to avoid unlimited memory growth, so we just use an os.Pipe.
+	pr, pw, err := os.Pipe()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// Close both ends of the pipe on failure; on success they'll be closed at
+		// the appropriate times in start and wait.
+		if e != nil {
+			pr.Close()
+			pw.Close()
+		}
+	}()
+	if c.SetStdinReader(pr); p.sh.Err != nil {
+		return errAlreadyHandled{p.sh.Err}
+	}
+	last := p.cmds[len(p.cmds)-1]
+	if mode == pipeStdout || mode == pipeCombinedOutput {
+		if last.AddStdoutWriter(pw); p.sh.Err != nil {
+			return errAlreadyHandled{p.sh.Err}
+		}
+	}
+	if mode == pipeStderr || mode == pipeCombinedOutput {
+		if last.AddStderrWriter(pw); p.sh.Err != nil {
+			return errAlreadyHandled{p.sh.Err}
+		}
+	}
+	p.cmds = append(p.cmds, c)
+	p.state = append(p.state, pipeState{mode, pr, pw})
+	return nil
+}
+
+// TODO(toddw): Clean up resources in Shell.Cleanup. E.g. we'll currently leak
+// the os.Pipe fds if the user sets up a pipeline, but never calls Start (or
+// Wait, Terminate).
+
+func (p *Pipeline) start() error {
+	// Start all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr, closeErr error
+	for i, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Start(); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+		// Close the read-side of the stdin pipe for this command. The fd has been
+		// passed to the child process via Start, so we don't need it open anymore.
+		if i > 0 {
+			if err := p.state[i-1].stdinRead.Close(); err != nil && closeErr == nil {
+				closeErr = err
+			}
+		}
+	}
+	// If anything failed, close the write-side of the stdin pipes too.
+	if shErr != nil || closeErr != nil {
+		for _, state := range p.state {
+			state.stdinWrite.Close() // ignore error, since start or close failed.
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return closeErr
+}
+
+func (p *Pipeline) wait() error {
+	// Wait for all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr, closeErr error
+	for i, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Wait(); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+		// Close the write-side of the stdin pipe for the next command, so that the
+		// next command will eventually read an EOF.
+		if i < len(p.state) {
+			if err := p.state[i].stdinWrite.Close(); err != nil && closeErr == nil {
+				closeErr = err
+			}
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return closeErr
+}
+
+func (p *Pipeline) signal(sig os.Signal) error {
+	// Signal all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr error
+	for _, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Signal(sig); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return nil
+}
+
+func (p *Pipeline) terminate(sig os.Signal) error {
+	// Terminate all commands in the pipeline, capturing the first error.
+	// Ensure all commands are processed, by avoiding early-exit.
+	var shErr, closeErr error
+	for i, c := range p.cmds {
+		p.sh.Err = nil
+		if c.Terminate(sig); p.sh.Err != nil && shErr == nil {
+			shErr = p.sh.Err
+		}
+		// Close the write-side of the stdin pipe for the next command, so that the
+		// next command will eventually read an EOF.
+		if i < len(p.state) {
+			if err := p.state[i].stdinWrite.Close(); err != nil && closeErr == nil {
+				closeErr = err
+			}
+		}
+	}
+	if shErr != nil {
+		p.sh.Err = shErr
+		return errAlreadyHandled{shErr}
+	}
+	return closeErr
+}
+
+func (p *Pipeline) run() error {
+	if err := p.start(); err != nil {
+		return err
+	}
+	return p.wait()
+}
+
+func (p *Pipeline) stdout() (string, error) {
+	var stdout bytes.Buffer
+	last := p.cmds[len(p.cmds)-1]
+	if last.AddStdoutWriter(&stdout); p.sh.Err != nil {
+		return "", errAlreadyHandled{p.sh.Err}
+	}
+	err := p.run()
+	return stdout.String(), err
+}
+
+func (p *Pipeline) stdoutStderr() (string, string, error) {
+	var stdout, stderr bytes.Buffer
+	last := p.cmds[len(p.cmds)-1]
+	if last.AddStdoutWriter(&stdout); p.sh.Err != nil {
+		return "", "", errAlreadyHandled{p.sh.Err}
+	}
+	if last.AddStderrWriter(&stderr); p.sh.Err != nil {
+		return "", "", errAlreadyHandled{p.sh.Err}
+	}
+	err := p.run()
+	return stdout.String(), stderr.String(), err
+}
+
+func (p *Pipeline) combinedOutput() (string, error) {
+	var output bytes.Buffer // single sink receiving both stdout and stderr of the last command
+	last := p.cmds[len(p.cmds)-1]
+	if last.AddStdoutWriter(&output); p.sh.Err != nil { // exported method: failures are reported to the shell
+		return "", errAlreadyHandled{p.sh.Err}
+	}
+	if last.AddStderrWriter(&output); p.sh.Err != nil { // same pattern as stdout and stdoutStderr
+		return "", errAlreadyHandled{p.sh.Err}
+	}
+	err := p.run()
+	return output.String(), err
+}
diff --git a/gosh/pipeline_test.go b/gosh/pipeline_test.go
new file mode 100644
index 0000000..d26d5b9
--- /dev/null
+++ b/gosh/pipeline_test.go
@@ -0,0 +1,241 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gosh_test
+
+import (
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"v.io/x/lib/gosh"
+)
+
+func TestPipeline(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+	// Three-stage pipeline: echo "foo", replace 'f' with 'Z', then cat.
+	echo := sh.FuncCmd(echoFunc)
+	echo.Args = append(echo.Args, "foo")
+	replace := sh.FuncCmd(replaceFunc, byte('f'), byte('Z'))
+	cat := sh.FuncCmd(catFunc)
+	p := gosh.NewPipeline(echo, replace, cat)
+	eq(t, p.Stdout(), "Zoo\n")
+	eq(t, p.Clone().Stdout(), "Zoo\n")
+
+	// Pipe only stdout, through a replaceFunc that maps 'A' to 'Z'.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeStdout(sh.FuncCmd(replaceFunc, byte('A'), byte('Z')))
+	eq(t, p.Stdout(), "ZZ")
+	eq(t, p.Clone().Stdout(), "ZZ")
+
+	// Pipe only stderr, through a replaceFunc that maps 'B' to 'Z'.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeStderr(sh.FuncCmd(replaceFunc, byte('B'), byte('Z')))
+	eq(t, p.Stdout(), "ZZ")
+	eq(t, p.Clone().Stdout(), "ZZ")
+
+	// Pipe both stdout and stderr into a single cat.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeCombinedOutput(sh.FuncCmd(catFunc))
+	// Note, we can't assume any particular ordering of stdout and stderr, so we
+	// simply check the length of the combined output.
+	eq(t, len(p.Stdout()), 4)
+	eq(t, len(p.Clone().Stdout()), 4)
+
+	// Chain the pipe kinds: stderr 'B'->'x', then stdout 'x'->'Z', then cat.
+	p = gosh.NewPipeline(sh.FuncCmd(writeFunc, true, true))
+	p.PipeStderr(sh.FuncCmd(replaceFunc, byte('B'), byte('x')))
+	p.PipeStdout(sh.FuncCmd(replaceFunc, byte('x'), byte('Z')))
+	p.PipeStdout(sh.FuncCmd(catFunc))
+	eq(t, p.Stdout(), "ZZ")
+	eq(t, p.Clone().Stdout(), "ZZ")
+}
+
+func TestPipelineDifferentShells(t *testing.T) {
+	sh1 := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh1.Cleanup()
+	sh2 := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh2.Cleanup()
+	// Mixing commands of two shells is checked with setsErr on the first one.
+	setsErr(t, sh1, func() { gosh.NewPipeline(sh1.FuncCmd(echoFunc), sh2.FuncCmd(catFunc)) })
+	setsErr(t, sh2, func() { gosh.NewPipeline(sh2.FuncCmd(echoFunc), sh1.FuncCmd(catFunc)) })
+	p := gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+	setsErr(t, sh1, func() { p.PipeStdout(sh2.FuncCmd(catFunc)) })
+	p = gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+	setsErr(t, sh1, func() { p.PipeStderr(sh2.FuncCmd(catFunc)) })
+	p = gosh.NewPipeline(sh1.FuncCmd(echoFunc))
+	setsErr(t, sh1, func() { p.PipeCombinedOutput(sh2.FuncCmd(catFunc)) })
+}
+
+var writeLoopFunc = gosh.RegisterFunc("writeLoopFunc", func() error {
+	// Write until a write fails, then report success anyway: the loop is meant
+	// to end on a closed pipe error when the next command in the pipeline fails.
+	line := []byte("a\n")
+	var err error
+	for err == nil {
+		_, err = os.Stdout.Write(line)
+	}
+	return nil
+})
+
+func TestPipelineClosedPipe(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+	writeLoop, readLine := sh.FuncCmd(writeLoopFunc), sh.FuncCmd(readFunc)
+
+	// writeLoop only stops on a write error, so it ends with a closed pipe error
+	// once readLine has finished. The checks below expect no Err on either Cmd.
+	p := gosh.NewPipeline(writeLoop, readLine)
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+}
+
+func TestPipelineCmdFailure(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+	cat := sh.FuncCmd(catFunc)
+	exit1 := sh.FuncCmd(exitFunc, 1)
+	writeLoop := sh.FuncCmd(writeLoopFunc)
+	echoFoo := sh.FuncCmd(echoFunc)
+	echoFoo.Args = append(echoFoo.Args, "foo")
+
+	// Exit1 fails, and cat finishes with success since it sees an EOF.
+	p := gosh.NewPipeline(exit1.Clone(), cat.Clone())
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 fails, and echoFoo finishes with success since it ignores stdin.
+	p = gosh.NewPipeline(exit1.Clone(), echoFoo.Clone())
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	setsErr(t, sh, p.Run)
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 fails, causing writeLoop to finish and succeed.
+	p = gosh.NewPipeline(writeLoop.Clone(), exit1.Clone())
+	setsErr(t, sh, p.Run)
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	p = p.Clone()
+	setsErr(t, sh, p.Run)
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Same three scenarios again, now with ExitErrorIsOk set on exit1.
+	exit1.ExitErrorIsOk = true
+
+	// Exit1 still has Err set on its Cmd, but sh.Err stays nil; cat sees an EOF.
+	p = gosh.NewPipeline(exit1.Clone(), cat.Clone())
+	eq(t, p.Stdout(), "")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 still has Err set on its Cmd; echoFoo ignores stdin and prints foo.
+	p = gosh.NewPipeline(exit1.Clone(), echoFoo.Clone())
+	eq(t, p.Stdout(), "foo\n")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "foo\n")
+	nok(t, p.Cmds()[0].Err)
+	ok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+
+	// Exit1 (last in the pipeline) has Err set; writeLoop finishes and succeeds.
+	p = gosh.NewPipeline(writeLoop.Clone(), exit1.Clone())
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+	p = p.Clone()
+	eq(t, p.Stdout(), "")
+	ok(t, p.Cmds()[0].Err)
+	nok(t, p.Cmds()[1].Err)
+	ok(t, sh.Err)
+}
+
+func TestPipelineSignal(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+
+	for _, d := range []time.Duration{0, time.Hour} {
+		for _, s := range []os.Signal{os.Interrupt, os.Kill} {
+			fmt.Println(d, s) // NOTE(review): prints to stdout on every run; t.Log may be intended
+			p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, d, 0), sh.FuncCmd(sleepFunc, d, 0))
+			p.Start()
+			p.Cmds()[0].AwaitVars("ready")
+			p.Cmds()[1].AwaitVars("ready")
+			// Wait for a bit to allow the zero-sleep commands to exit.
+			time.Sleep(100 * time.Millisecond)
+			p.Signal(s)
+			switch {
+			case s == os.Interrupt:
+				// Wait should succeed as long as the exit code was 0, regardless of
+				// whether the signal arrived or the processes had already exited.
+				p.Wait()
+			case d != 0:
+				// Note: We don't call Wait in the {d: 0, s: os.Kill} case because doing
+				// so makes the test flaky on slow systems.
+				setsErr(t, sh, func() { p.Wait() })
+			}
+		}
+	}
+
+	// Signal should fail once the pipeline has been run to completion with Run.
+	z := time.Duration(0)
+	p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, z, 0), sh.FuncCmd(sleepFunc, z, 0))
+	p.Run()
+	setsErr(t, sh, func() { p.Signal(os.Interrupt) })
+}
+
+func TestPipelineTerminate(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+
+	for _, d := range []time.Duration{0, time.Hour} {
+		for _, s := range []os.Signal{os.Interrupt, os.Kill} {
+			fmt.Println(d, s) // NOTE(review): prints to stdout on every run; t.Log may be intended
+			p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, d, 0), sh.FuncCmd(sleepFunc, d, 0))
+			p.Start()
+			p.Cmds()[0].AwaitVars("ready")
+			p.Cmds()[1].AwaitVars("ready")
+			// Wait for a bit to allow the zero-sleep commands to exit.
+			time.Sleep(100 * time.Millisecond)
+			// Terminate should succeed regardless of the exit code, and regardless of
+			// whether the signal arrived or the processes had already exited.
+			p.Terminate(s)
+		}
+	}
+
+	// Terminate should fail once the pipeline has been run to completion with Run.
+	z := time.Duration(0)
+	p := gosh.NewPipeline(sh.FuncCmd(sleepFunc, z, 0), sh.FuncCmd(sleepFunc, z, 0))
+	p.Run()
+	setsErr(t, sh, func() { p.Terminate(os.Interrupt) })
+}
diff --git a/gosh/shell.go b/gosh/shell.go
index 4426990..4dbcc51 100644
--- a/gosh/shell.go
+++ b/gosh/shell.go
@@ -15,7 +15,6 @@
 import (
 	"errors"
 	"fmt"
-	"io"
 	"io/ioutil"
 	"log"
 	"math/rand"
@@ -55,15 +54,15 @@
 	// Args is the list of args to append to subsequent command invocations.
 	Args []string
 	// Internal state.
-	calledNewShell bool
-	cleanupDone    chan struct{}
-	cleanupMu      sync.Mutex // protects the fields below; held during cleanup
-	calledCleanup  bool
-	cmds           []*Cmd
-	tempFiles      []*os.File
-	tempDirs       []string
-	dirStack       []string // for pushd/popd
-	cleanupFuncs   []func()
+	calledNewShell  bool
+	cleanupDone     chan struct{}
+	cleanupMu       sync.Mutex // protects the fields below; held during cleanup
+	calledCleanup   bool
+	cmds            []*Cmd
+	tempFiles       []*os.File
+	tempDirs        []string
+	dirStack        []string // for pushd/popd
+	cleanupHandlers []func()
 }
 
 // Opts configures Shell.
@@ -176,10 +175,12 @@
 	sh.HandleError(sh.popd())
 }
 
-// AddToCleanup registers the given function to be called by Shell.Cleanup().
-func (sh *Shell) AddToCleanup(f func()) {
+// AddCleanupHandler registers the given function to be called during cleanup.
+// Cleanup handlers are called in LIFO order, possibly in a separate goroutine
+// spawned by gosh.
+func (sh *Shell) AddCleanupHandler(f func()) {
 	sh.Ok()
-	sh.HandleError(sh.addToCleanup(f))
+	sh.HandleError(sh.addCleanupHandler(f))
 }
 
 // Cleanup cleans up all resources (child processes, temporary files and
@@ -450,13 +451,13 @@
 	return nil
 }
 
-func (sh *Shell) addToCleanup(f func()) error {
+func (sh *Shell) addCleanupHandler(f func()) error {
 	sh.cleanupMu.Lock()
 	defer sh.cleanupMu.Unlock()
 	if sh.calledCleanup {
 		return errAlreadyCalledCleanup
 	}
-	sh.cleanupFuncs = append(sh.cleanupFuncs, f)
+	sh.cleanupHandlers = append(sh.cleanupHandlers, f)
 	return nil
 }
 
@@ -532,9 +533,9 @@
 			sh.logf("os.Chdir(%q) failed: %v\n", dir, err)
 		}
 	}
-	// Call any registered cleanup functions in LIFO order.
-	for i := len(sh.cleanupFuncs) - 1; i >= 0; i-- {
-		sh.cleanupFuncs[i]()
+	// Call cleanup handlers in LIFO order.
+	for i := len(sh.cleanupHandlers) - 1; i >= 0; i-- {
+		sh.cleanupHandlers[i]()
 	}
 	close(sh.cleanupDone)
 }
@@ -564,15 +565,3 @@
 	}
 	os.Exit(0)
 }
-
-// NopWriteCloser returns a WriteCloser with a no-op Close method wrapping the
-// provided Writer.
-func NopWriteCloser(w io.Writer) io.WriteCloser {
-	return nopWriteCloser{w}
-}
-
-type nopWriteCloser struct {
-	io.Writer
-}
-
-func (nopWriteCloser) Close() error { return nil }
diff --git a/gosh/shell_test.go b/gosh/shell_test.go
index d8d6930..0a7a6b6 100644
--- a/gosh/shell_test.go
+++ b/gosh/shell_test.go
@@ -28,7 +28,7 @@
 	"time"
 
 	"v.io/x/lib/gosh"
-	"v.io/x/lib/gosh/internal/gosh_example_lib"
+	lib "v.io/x/lib/gosh/internal/gosh_example_lib"
 )
 
 var fakeError = errors.New("fake error")
@@ -95,11 +95,13 @@
 
 // Simplified versions of various Unix commands.
 var (
-	catFunc = gosh.RegisterFunc("catFunc", func() {
-		io.Copy(os.Stdout, os.Stdin)
+	catFunc = gosh.RegisterFunc("catFunc", func() error {
+		_, err := io.Copy(os.Stdout, os.Stdin)
+		return err
 	})
-	echoFunc = gosh.RegisterFunc("echoFunc", func() {
-		fmt.Println(os.Args[1])
+	echoFunc = gosh.RegisterFunc("echoFunc", func() error {
+		_, err := fmt.Println(os.Args[1])
+		return err
 	})
 	readFunc = gosh.RegisterFunc("readFunc", func() {
 		bufio.NewReader(os.Stdin).ReadString('\n')
@@ -494,16 +496,16 @@
 	defer sh.Cleanup()
 
 	c := sh.FuncCmd(writeFunc, true, true)
-	c.AddStdoutWriter(gosh.NopWriteCloser(os.Stdout))
-	c.AddStderrWriter(gosh.NopWriteCloser(os.Stderr))
+	c.AddStdoutWriter(os.Stdout)
+	c.AddStderrWriter(os.Stderr)
 	c.Run()
 
 	fmt.Fprint(os.Stdout, " stdout done")
 	fmt.Fprint(os.Stderr, " stderr done")
 })
 
-// Tests that it's safe to add wrapped os.Stdout and os.Stderr as writers.
-func TestAddWritersWrappedStdoutStderr(t *testing.T) {
+// Tests that it's safe to add os.Stdout and os.Stderr as writers.
+func TestAddStdoutStderrWriter(t *testing.T) {
 	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
 	defer sh.Cleanup()
 
@@ -512,26 +514,14 @@
 	eq(t, stderr, "BB stderr done")
 }
 
-// Tests that adding non-wrapped os.Stdout or os.Stderr fails.
-func TestAddWritersNonWrappedStdoutStderr(t *testing.T) {
-	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
-	defer sh.Cleanup()
-
-	c := sh.FuncCmd(writeMoreFunc)
-	setsErr(t, sh, func() { c.AddStdoutWriter(os.Stdout) })
-	setsErr(t, sh, func() { c.AddStdoutWriter(os.Stderr) })
-	setsErr(t, sh, func() { c.AddStderrWriter(os.Stdout) })
-	setsErr(t, sh, func() { c.AddStderrWriter(os.Stderr) })
-}
-
 func TestCombinedOutput(t *testing.T) {
 	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
 	defer sh.Cleanup()
 
 	c := sh.FuncCmd(writeFunc, true, true)
 	buf := &bytes.Buffer{}
-	c.AddStdoutWriter(gosh.NopWriteCloser(buf))
-	c.AddStderrWriter(gosh.NopWriteCloser(buf))
+	c.AddStdoutWriter(buf)
+	c.AddStderrWriter(buf)
 	output := c.CombinedOutput()
 	// Note, we can't assume any particular ordering of stdout and stderr, so we
 	// simply check the length of the combined output.
@@ -565,35 +555,22 @@
 	eq(t, string(stderr), "BB")
 }
 
-type countingWriteCloser struct {
-	io.Writer
-	count int
-}
-
-func (wc *countingWriteCloser) Close() error {
-	wc.count++
-	return nil
-}
-
-// Tests that Close is called exactly once on a given WriteCloser, even if that
-// WriteCloser is passed to Add{Stdout,Stderr}Writer multiple times.
-func TestAddWritersCloseOnce(t *testing.T) {
-	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
-	defer sh.Cleanup()
-
-	c := sh.FuncCmd(writeFunc, true, true)
-	buf := &bytes.Buffer{}
-	wc := &countingWriteCloser{Writer: buf}
-	c.AddStdoutWriter(wc)
-	c.AddStdoutWriter(wc)
-	c.AddStderrWriter(wc)
-	c.AddStderrWriter(wc)
-	c.Run()
-	// Note, we can't assume any particular ordering of stdout and stderr, so we
-	// simply check the length of the combined output.
-	eq(t, len(buf.String()), 8)
-	eq(t, wc.count, 1)
-}
+var replaceFunc = gosh.RegisterFunc("replaceFunc", func(old, new byte) error {
+	// Copy stdin to stdout in chunks of up to 1024 bytes, swapping old for new.
+	from, to, chunk := []byte{old}, []byte{new}, make([]byte, 1024)
+	for {
+		n, readErr := os.Stdin.Read(chunk)
+		if readErr == io.EOF {
+			return nil
+		}
+		if readErr != nil {
+			return readErr
+		}
+		if _, writeErr := os.Stdout.Write(bytes.Replace(chunk[:n], from, to, -1)); writeErr != nil {
+			return writeErr
+		}
+	}
+})
 
 func TestSignal(t *testing.T) {
 	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
@@ -718,6 +695,34 @@
 	nok(t, c.Err)
 }
 
+func TestIgnoreClosedPipeError(t *testing.T) {
+	sh := gosh.NewShell(gosh.Opts{Fatalf: makeFatalf(t), Logf: t.Logf})
+	defer sh.Cleanup()
+
+	// Since writeLoopFunc will only finish if it receives a write error, it's
+	// depending on the closed pipe error returned by errorWriter.
+	c := sh.FuncCmd(writeLoopFunc)
+	c.AddStdoutWriter(errorWriter{io.ErrClosedPipe})
+	c.IgnoreClosedPipeError = true
+	c.Run()
+	ok(t, c.Err)
+	ok(t, sh.Err)
+
+	// Without IgnoreClosedPipeError, the command fails.
+	c = sh.FuncCmd(writeLoopFunc)
+	c.AddStdoutWriter(errorWriter{io.ErrClosedPipe})
+	setsErr(t, sh, func() { c.Run() })
+	nok(t, c.Err)
+}
+
+type errorWriter struct {
+	error // returned, together with a zero byte count, by every Write call
+}
+
+func (w errorWriter) Write(p []byte) (int, error) {
+	return 0, w.error // p is ignored: nothing is ever written
+}
+
 // Tests that sh.Ok panics under various conditions.
 func TestOkPanics(t *testing.T) {
 	func() { // errDidNotCallNewShell