blob: d47af45ab0e0c5f32c22304c101325d7f9911909 [file] [log] [blame]
package util
import (
"bytes"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"strings"
"time"
)
// debug is the -debug command line flag. When it is set, the functions
// in this file print extra "DEBUG:" diagnostics to stdout.
//
// TODO: switch to a logging package that supports levels,
// e.g. https://github.com/golang/glog
var debug = flag.Bool(
	"debug",
	false,
	"If true, dump more information during run.",
)
// checkWrite writes output to writer in a single Write call and ends the
// program via log.Fatalf if the write fails or comes up short.
func checkWrite(output string, writer io.Writer) {
	want := len(output)
	got, err := writer.Write([]byte(output))
	switch {
	case err != nil:
		log.Fatalf("Could not write %d bytes: %v", want, err)
	case got != want:
		log.Fatalf("Expected to write %d bytes, wrote %d", want, got)
	}
}
// check does nothing when err is nil. Otherwise it prints msg together
// with the error to stdout and then ends the program via log.Fatal.
func check(msg string, err error) {
	if err == nil {
		return
	}
	fmt.Printf("Problem with %s: %v\n", msg, err)
	log.Fatal(err)
}
// blockOutput pairs the output collected from a stream (i.e. stderr
// or stdout) as a result of executing all or part of a command block
// with a bool indicating if the output is associated with shell
// success or shell failure. Output can appear on stderr without
// necessarily being associated with shell failure.
//
// Values are built positionally elsewhere in this file, so the field
// order (success, output) must not change.
type blockOutput struct {
// success is true when accumulateOutput closed the output off on a
// MsgHappy line; it is false for a timeout, an error line, or text
// left over when the stream closed.
success bool
// output is the text accumulated from the stream.
output string
}
// accumulateOutput turns a stream of output lines into a stream of
// per-block results. It starts a goroutine that reads lines from in
// and returns the channel that goroutine sends on; the channel is closed
// when the goroutine finishes. The prefix only labels debug output.
//
// Lines are buffered, each followed by a newline, until one of these
// arrives:
//
//   - a line starting with MsgHappy: the buffer is sent with
//     success == true and then cleared, and reading continues;
//   - a line starting with MsgTimeout or MsgError: the line is added to
//     the buffer, the buffer is sent with success == false, and the
//     goroutine stops early, before its input channel closes.
//
// When in closes, any non-blank text still buffered was never followed
// by a MsgHappy line, so it is sent with success == false.
func accumulateOutput(prefix string, in <-chan string) <-chan *blockOutput {
	results := make(chan *blockOutput)
	go func() {
		defer close(results)
		var pending bytes.Buffer
		// emit sends everything buffered so far, tagged with ok.
		emit := func(ok bool) {
			results <- &blockOutput{success: ok, output: pending.String()}
		}
		for line := range in {
			switch {
			case strings.HasPrefix(line, MsgTimeout):
				pending.WriteString("\n" + line + "\n")
				pending.WriteString("A subprocess might still be running.\n")
				if *debug {
					fmt.Printf("DEBUG: accumulateOutput %s: Timeout return.\n", prefix)
				}
				emit(false)
				return
			case strings.HasPrefix(line, MsgError):
				pending.WriteString(line + "\n")
				if *debug {
					fmt.Printf("DEBUG: accumulateOutput %s: Error return.\n", prefix)
				}
				emit(false)
				return
			case strings.HasPrefix(line, MsgHappy):
				if *debug {
					fmt.Printf("DEBUG: accumulateOutput %s: %s\n", prefix, line)
				}
				emit(true)
				pending.Reset()
			default:
				if *debug {
					fmt.Printf("DEBUG: accumulateOutput %s: Accumulating [%s]\n", prefix, line)
				}
				pending.WriteString(line + "\n")
			}
		}
		if *debug {
			fmt.Printf("DEBUG: accumulateOutput %s: <--- This channel has closed.\n", prefix)
		}
		if strings.TrimSpace(pending.String()) == "" {
			if *debug {
				fmt.Printf("DEBUG: accumulateOutput %s: Nothing trailing.\n", prefix)
			}
			return
		}
		if *debug {
			fmt.Printf(
				"DEBUG: accumulateOutput %s: Erroneous (missing-happy) output [%s]\n", prefix, pending.String())
		}
		emit(false)
	}()
	return results
}
// ScriptResult pairs blockOutput with meta data about shell execution.
//
// userBehavior creates one per run. When a block fails, the embedded
// blockOutput's output field is set to what was captured from stdout.
// The value is built positionally in this file, so the field order must
// not change.
type ScriptResult struct {
blockOutput
fileName string // File in which the error occurred.
// index is the position of the failing block within its file's script,
// counted from zero; it starts out as -1.
index int // Command block index.
block *CommandBlock // Content of actual command block.
problem error // Error, if any.
message string // Detailed error message, if any.
}
// GetFileName returns the name of the file recorded in the result.
func (r ScriptResult) GetFileName() string {
	return r.fileName
}
// GetProblem returns the error recorded in the result, or nil if there
// is none.
func (r ScriptResult) GetProblem() error {
	return r.problem
}
// ScriptBucket associates a list of commandBlocks with the name of the
// file they came from.
//
// NewScriptBucket builds the value positionally, so the field order
// (fileName, script) must not change.
type ScriptBucket struct {
// fileName is the name of the file the blocks were taken from.
fileName string
// script holds the command blocks; they are written out and run in
// slice order.
script []*CommandBlock
}
// GetFileName returns the name of the file the bucket's blocks came from.
func (b ScriptBucket) GetFileName() string {
	return b.fileName
}
// GetScript returns the bucket's command blocks. The slice is returned
// as stored, not copied.
func (b ScriptBucket) GetScript() []*CommandBlock {
	return b.script
}
// NewScriptBucket returns a new ScriptBucket holding the given file name
// and command blocks. The script slice is stored as given, not copied.
func NewScriptBucket(fileName string, script []*CommandBlock) *ScriptBucket {
	bucket := ScriptBucket{fileName: fileName, script: script}
	return &bucket
}
// userBehavior acts like a command line user watching a shell.
//
// It walks the command blocks in order and, for each one, waits for the
// next accumulated stdout result. A successful result moves it on to the
// next block; anything else makes it stop and report.
//
// stdOut is scanned with blockTimeout and stdErr with a fixed timeout of
// one minute.
//
// The returned ScriptResult is never nil. If every block succeeded it
// still holds its initial values (index -1, empty block, nil problem).
// On failure it records the file name, block index and block, the
// captured stdout (if any), and whatever fillErrResult takes from
// stderr.
func userBehavior(scriptBuckets []*ScriptBucket, blockTimeout time.Duration,
	stdOut, stdErr io.ReadCloser) (errResult *ScriptResult) {
	outLines := BuffScanner(blockTimeout, "stdout", stdOut, *debug)
	errLines := BuffScanner(1*time.Minute, "stderr", stdErr, *debug)
	chAccOut := accumulateOutput("stdOut", outLines)
	chAccErr := accumulateOutput("stdErr", errLines)

	errResult = &ScriptResult{
		blockOutput: blockOutput{success: false, output: ""},
		index:       -1,
		block:       &CommandBlock{[]string{}, ""},
	}

	for _, bucket := range scriptBuckets {
		total := len(bucket.script)
		for i, block := range bucket.script {
			fmt.Printf("Running %s (%d/%d) from %s\n",
				block.labels[0], i+1, total, bucket.fileName)
			if *debug {
				fmt.Printf("DEBUG: userBehavior: sending \"%s\"\n", block.codeText)
			}

			result := <-chAccOut
			if result != nil && result.success {
				continue
			}

			// The block failed. A nil result means stdout has closed
			// early because a sub-subprocess failed, so there is no
			// stdout text to record.
			if result != nil {
				if *debug {
					fmt.Printf("DEBUG: userBehavior: stdout Result: %s\n", result.output)
				}
				errResult.output = result.output
				errResult.message = result.output
			} else if *debug {
				fmt.Printf("DEBUG: userBehavior: stdout Result == nil.\n")
			}
			errResult.fileName = bucket.fileName
			errResult.index = i
			errResult.block = block
			fillErrResult(chAccErr, errResult)
			return errResult
		}
	}
	fmt.Printf("All done, no errors triggered.\n")
	return errResult
}
// fillErrResult receives one accumulated stderr result from chAccErr and
// records it in errResult.
//
// If a result arrives, its output becomes both errResult.problem (as an
// error) and errResult.message. If the receive yields nil, problem is set
// to an "unknown" error and message is left untouched.
func fillErrResult(chAccErr <-chan *blockOutput, errResult *ScriptResult) {
	captured := <-chAccErr
	if captured != nil {
		errResult.problem = errors.New(captured.output)
		errResult.message = captured.output
		if *debug {
			fmt.Printf("DEBUG: userBehavior: stderr Result: %s\n", captured.output)
		}
		return
	}
	if *debug {
		fmt.Printf("DEBUG: userBehavior: stderr Result == nil.\n")
	}
	errResult.problem = errors.New("unknown")
}
// dumpCapturedOutput writes a captured stream to stderr: a heading built
// from name, then delim, the output, a newline, and delim again.
//
// delim and output are passed as arguments, never as the format string,
// so any '%' in captured shell output is printed literally.
func dumpCapturedOutput(name, delim, output string) {
	fmt.Fprintf(os.Stderr, "\n%s capture:\n%s%s\n%s", name, delim, output, delim)
}
// Complain spits the contents of a ScriptResult to stderr.
//
// It prints a heading naming the failing block (first label, one-based
// index, the script label and the file name), then the block's code
// between delimiter lines, then the captured stdout. If result.message
// is non-empty it is printed as well, under a "Stderr" heading.
//
// The code text and delimiters are printed verbatim rather than used as
// format strings, because shell code often contains '%'. A block with no
// labels is reported with an empty name instead of panicking; the
// placeholder result returned when no block failed has such a block.
//
// result and result.block must be non-nil.
func Complain(result *ScriptResult, label string) {
	delim := strings.Repeat("-", 70) + "\n"
	blockName := ""
	if labels := result.block.labels; len(labels) > 0 {
		blockName = labels[0]
	}
	fmt.Fprintf(os.Stderr, "Error in block '%s' (#%d of script '%s') in %s:\n",
		blockName, result.index+1, label, result.fileName)
	fmt.Fprint(os.Stderr, delim)
	fmt.Fprint(os.Stderr, string(result.block.codeText))
	fmt.Fprint(os.Stderr, delim)
	dumpCapturedOutput("Stdout", delim, result.output)
	if len(result.message) > 0 {
		dumpCapturedOutput("Stderr", delim, result.message)
	}
}
// RunInSubShell runs command blocks in a subprocess, stopping and
// reporting on any error. The subprocess runs with the -e flag, so
// it will abort if any sub-subprocess (any command) fails.
//
// Command blocks are strings presumably holding code from some shell
// language. The strings may be more complex than single commands
// delimited by linefeeds - e.g. blocks that operate on HERE
// documents, or multi-line commands using line continuation via '\',
// quotes or curly brackets.
//
// This function itself is not a shell interpreter, so it has no idea
// if one line of text from a command block is an individual command
// or part of something else.
//
// Error reporting works by discarding output from command blocks that
// succeeded, and only reporting the contents of stdout and stderr
// when the subprocess exits on error.
//
// All blocks are written to one temporary script file, each followed by
// an "echo MsgHappy <label>" line so that the output readers can tell
// where a block's output ends. The file is closed before bash is started
// and removed when this function returns.
//
// The returned ScriptResult is never nil. Its problem field is the error
// found while watching the output or, if there was none, the result of
// waiting for the shell (nil on a clean exit). Failures in the setup
// steps (temp file, pipes, starting bash) end the program via check.
func RunInSubShell(scriptBuckets []*ScriptBucket, blockTimeout time.Duration) (
	result *ScriptResult) {
	// Write script buckets to a file to be executed.
	scriptFile, err := ioutil.TempFile("", "mdrip-script-")
	check("create temp file", err)
	// Register cleanup straight away so that a panic further down does
	// not leave the script behind.
	defer func() {
		check("delete temp file", os.Remove(scriptFile.Name()))
	}()
	check("chmod temp file", os.Chmod(scriptFile.Name(), 0744))
	for _, bucket := range scriptBuckets {
		for _, block := range bucket.script {
			checkWrite(block.codeText, scriptFile)
			checkWrite("\n", scriptFile)
			checkWrite("echo "+MsgHappy+" "+block.labels[0]+"\n", scriptFile)
		}
	}
	// Nothing more is written, so release the descriptor before handing
	// the file to bash.
	check("close temp file", scriptFile.Close())
	if *debug {
		fmt.Printf("DEBUG: RunInSubShell: running commands from %s\n", scriptFile.Name())
	}

	// Adding "-e" to force the subshell to die on any error.
	shell := exec.Command("bash", "-e", scriptFile.Name())

	// The script takes no input; close stdin so that reads see EOF.
	stdIn, err := shell.StdinPipe()
	check("in pipe", err)
	check("close shell's stdin", stdIn.Close())

	stdOut, err := shell.StdoutPipe()
	check("out pipe", err)
	stdErr, err := shell.StderrPipe()
	check("err pipe", err)

	err = shell.Start()
	check("shell start", err)

	pid := shell.Process.Pid
	if *debug {
		fmt.Printf("DEBUG: RunInSubShell: pid = %d\n", pid)
	}
	// The process group id is only reported for now (see the disabled
	// kill below), so a lookup failure is logged, not fatal.
	if pgid, err := getProcesssGroupId(pid); err != nil {
		if *debug {
			fmt.Printf("DEBUG: RunInSubShell: could not get pgid: %v\n", err)
		}
	} else if *debug {
		fmt.Printf("DEBUG: RunInSubShell: pgid = %d\n", pgid)
	}

	result = userBehavior(scriptBuckets, blockTimeout, stdOut, stdErr)

	if *debug {
		fmt.Printf("DEBUG: RunInSubShell: Waiting for shell to end.\n")
	}
	waitError := shell.Wait()
	// An error found in the output takes precedence over the exit status.
	if result.problem == nil {
		result.problem = waitError
	}
	if *debug {
		fmt.Printf("DEBUG: RunInSubShell: Shell done.\n")
	}
	// killProcesssGroup(pgid)
	return result
}