blob: 3f17f19a2348673e9cee890e471553de52f7a6b2 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors, All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"bufio"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"os/signal"
"regexp"
"sort"
"strings"
"sync"
"v.io/jiri"
"v.io/jiri/gitutil"
"v.io/jiri/profiles/profilescmdline"
"v.io/jiri/profiles/profilesreader"
"v.io/jiri/project"
"v.io/jiri/tool"
"v.io/x/lib/cmdline"
"v.io/x/lib/envvar"
"v.io/x/lib/simplemr"
)
var (
cmdRunP *cmdline.Command
runpFlags runpFlagValues
)
func newRunP() *cmdline.Command {
return &cmdline.Command{
Runner: jiri.RunnerFunc(runRunp),
Name: "runp",
Short: "Run a command in parallel across jiri projects",
Long: `
Run a command in parallel across one or more jiri projects using the specified
profile target's environment. Commands are run using the shell specified by the
users $SHELL environment variable, or "sh" if that's not set. Thus commands
are run as $SHELL -c "args..."
`,
ArgsName: "<command line>",
ArgsLong: `
A command line to be run in each project specified by the supplied command
line flags. Any environment variables intended to be evaluated when the
command line is run must be quoted to avoid expansion before being passed to
runp by the shell.
`,
}
}
type runpFlagValues struct {
profilescmdline.ReaderFlagValues
projectKeys string
verbose bool
interactive bool
hasUncommitted bool
hasUntracked bool
hasGerritMessage bool
showNamePrefix bool
showKeyPrefix bool
exitOnError bool
collateOutput bool
editMessage bool
hasBranch string
}
func registerCommonFlags(flags *flag.FlagSet, values *runpFlagValues) {
profilescmdline.RegisterReaderFlags(flags, &values.ReaderFlagValues, "", jiri.ProfilesDBDir)
flags.BoolVar(&values.verbose, "v", false, "Print verbose logging information")
flags.StringVar(&values.projectKeys, "projects", "", "A Regular expression specifying project keys to run commands in. By default, runp will use projects that have the same branch checked as the current project unless it is run from outside of a project in which case it will default to using all projects.")
flags.BoolVar(&values.hasUncommitted, "has-uncommitted", false, "If specified, match projects that have, or have no, uncommitted changes")
flags.BoolVar(&values.hasUntracked, "has-untracked", false, "If specified, match projects that have, or have no, untracked files")
flags.BoolVar(&values.hasGerritMessage, "has-gerrit-message", false, "If specified, match branches that have, or have no, gerrit message")
flags.BoolVar(&values.interactive, "interactive", true, "If set, the command to be run is interactive and should not have its stdout/stderr manipulated. This flag cannot be used with -show-name-prefix, -show-key-prefix or -collate-stdout.")
flags.BoolVar(&values.showNamePrefix, "show-name-prefix", false, "If set, each line of output from each project will begin with the name of the project followed by a colon. This is intended for use with long running commands where the output needs to be streamed. Stdout and stderr are spliced apart. This flag cannot be used with -interactive, -show-key-prefix or -collate-stdout.")
flags.BoolVar(&values.showKeyPrefix, "show-key-prefix", false, "If set, each line of output from each project will begin with the key of the project followed by a colon. This is intended for use with long running commands where the output needs to be streamed. Stdout and stderr are spliced apart. This flag cannot be used with -interactive, -show-name-prefix or -collate-stdout")
flags.BoolVar(&values.collateOutput, "collate-stdout", true, "Collate all stdout output from each parallel invocation and display it as if had been generated sequentially. This flag cannot be used with -show-name-prefix, -show-key-prefix or -interactive.")
flags.BoolVar(&values.exitOnError, "exit-on-error", false, "If set, all commands will killed as soon as one reports an error, otherwise, each will run to completion.")
flags.StringVar(&values.hasBranch, "has-branch", "", "A regular expression specifying branch names to use in matching projects. A project will match if the specified branch exists, even if it is not checked out.")
}
func init() {
// Avoid an intialization loop between cmdline.Command.Runner which
// refers to cmdRunP and runRunp referring back to cmdRunP.ParsedFlags.
cmdRunP = newRunP()
cmdRoot.Children = append(cmdRoot.Children, cmdRunP)
registerCommonFlags(&cmdRunP.Flags, &runpFlags)
}
type mapInput struct {
*project.ProjectState
key project.ProjectKey
jirix *jiri.X
index, total int
result error
}
func newmapInput(jirix *jiri.X, state *project.ProjectState, key project.ProjectKey, index, total int) *mapInput {
return &mapInput{
ProjectState: state,
key: key,
jirix: jirix.Clone(tool.ContextOpts{}),
index: index,
total: total,
}
}
func stateNames(states map[project.ProjectKey]*mapInput) []string {
n := []string{}
for _, state := range states {
n = append(n, state.Project.Name)
}
sort.Strings(n)
return n
}
func stateKeys(states map[project.ProjectKey]*mapInput) []string {
n := []string{}
for key := range states {
n = append(n, string(key))
}
sort.Strings(n)
return n
}
type runner struct {
args []string
reader *profilesreader.Reader
serializedWriterLock sync.Mutex
collatedOutputLock sync.Mutex
}
func (r *runner) serializedWriter(w io.Writer) io.Writer {
return &sharedLockWriter{&r.serializedWriterLock, w}
}
type sharedLockWriter struct {
mu *sync.Mutex
f io.Writer
}
func (lw *sharedLockWriter) Write(d []byte) (int, error) {
lw.mu.Lock()
defer lw.mu.Unlock()
return lw.f.Write(d)
}
func copyWithPrefix(prefix string, w io.Writer, r io.Reader) {
reader := bufio.NewReader(r)
for {
line, err := reader.ReadString('\n')
if err != nil {
if line != "" {
fmt.Fprintf(w, "%v: %v\n", prefix, line)
}
break
}
fmt.Fprintf(w, "%v: %v", prefix, line)
}
}
type mapOutput struct {
mi *mapInput
outputFilename string
key string
err error
}
func (r *runner) Map(mr *simplemr.MR, key string, val interface{}) error {
mi := val.(*mapInput)
output := &mapOutput{
key: key,
mi: mi}
jirix := mi.jirix
path := os.Getenv("SHELL")
if path == "" {
path = "sh"
}
var wg sync.WaitGroup
cmd := exec.Command(path, "-c", strings.Join(r.args, " "))
cmd.Env = envvar.MapToSlice(jirix.Env())
cmd.Dir = mi.ProjectState.Project.Path
cmd.Stdin = mi.jirix.Stdin()
var stdoutCloser, stderrCloser io.Closer
if runpFlags.interactive {
cmd.Stdout = jirix.Stdout()
cmd.Stderr = jirix.Stderr()
} else {
var stdout io.Writer
stderr := r.serializedWriter(jirix.Stderr())
var cleanup func()
if runpFlags.collateOutput {
// Write standard output to a file, stderr
// is not collated.
f, err := ioutil.TempFile("", mi.ProjectState.Project.Name+"-")
if err != nil {
return err
}
stdout = f
output.outputFilename = f.Name()
cleanup = func() {
os.Remove(output.outputFilename)
}
// The child process will have exited by the
// time this method returns so it's safe to close the file
// here.
defer f.Close()
} else {
stdout = r.serializedWriter(jirix.Stdout())
cleanup = func() {}
}
if !runpFlags.showNamePrefix && !runpFlags.showKeyPrefix {
// write directly to stdout, stderr if there's no prefix
cmd.Stdout = stdout
cmd.Stderr = stderr
} else {
stdoutReader, stdoutWriter, err := os.Pipe()
if err != nil {
cleanup()
return err
}
stderrReader, stderrWriter, err := os.Pipe()
if err != nil {
cleanup()
stdoutReader.Close()
stdoutWriter.Close()
return err
}
cmd.Stdout = stdoutWriter
cmd.Stderr = stderrWriter
// Record the write end of the pipe so that it can be closed
// after the child has exited, this ensures that all goroutines
// will finish.
stdoutCloser = stdoutWriter
stderrCloser = stderrWriter
prefix := key
if runpFlags.showNamePrefix {
prefix = mi.ProjectState.Project.Name
}
wg.Add(2)
go func() { copyWithPrefix(prefix, stdout, stdoutReader); wg.Done() }()
go func() { copyWithPrefix(prefix, stderr, stderrReader); wg.Done() }()
}
}
if err := cmd.Start(); err != nil {
mi.result = err
}
done := make(chan error)
go func() {
done <- cmd.Wait()
}()
select {
case output.err = <-done:
if output.err != nil && runpFlags.exitOnError {
mr.Cancel()
}
case <-mr.CancelCh():
output.err = cmd.Process.Kill()
}
for _, closer := range []io.Closer{stdoutCloser, stderrCloser} {
if closer != nil {
closer.Close()
}
}
wg.Wait()
mr.MapOut(key, output)
return nil
}
func (r *runner) Reduce(mr *simplemr.MR, key string, values []interface{}) error {
for _, v := range values {
mo := v.(*mapOutput)
jirix := mo.mi.jirix
if mo.err != nil {
fmt.Fprintf(jirix.Stdout(), "FAILED: %v: %s %v\n", mo.key, strings.Join(r.args, " "), mo.err)
return mo.err
} else {
if runpFlags.collateOutput {
r.collatedOutputLock.Lock()
defer r.collatedOutputLock.Unlock()
defer os.Remove(mo.outputFilename)
if fi, err := os.Open(mo.outputFilename); err == nil {
io.Copy(jirix.Stdout(), fi)
fi.Close()
} else {
return err
}
}
}
}
return nil
}
func runp(jirix *jiri.X, cmd *cmdline.Command, args []string) error {
hasUntrackedSet := profilescmdline.IsFlagSet(cmd.ParsedFlags, "has-untracked")
hasUncommitedSet := profilescmdline.IsFlagSet(cmd.ParsedFlags, "has-uncommitted")
hasGerritSet := profilescmdline.IsFlagSet(cmd.ParsedFlags, "has-gerrit-message")
if runpFlags.interactive {
runpFlags.collateOutput = false
}
var keysRE, branchRE *regexp.Regexp
var err error
if profilescmdline.IsFlagSet(cmd.ParsedFlags, "projects") {
re := ""
for _, pre := range strings.Split(runpFlags.projectKeys, ",") {
re += pre + "|"
}
re = strings.TrimRight(re, "|")
keysRE, err = regexp.Compile(re)
if err != nil {
return fmt.Errorf("failed to compile projects regexp: %q: %v", runpFlags.projectKeys, err)
}
}
if profilescmdline.IsFlagSet(cmd.ParsedFlags, "has-branch") {
branchRE, err = regexp.Compile(runpFlags.hasBranch)
if err != nil {
return fmt.Errorf("failed to compile has-branch regexp: %q: %v", runpFlags.hasBranch, err)
}
}
for _, f := range []string{"show-key-prefix", "show-name-prefix"} {
if profilescmdline.IsFlagSet(cmd.ParsedFlags, f) {
if runpFlags.interactive && profilescmdline.IsFlagSet(cmd.ParsedFlags, "interactive") {
fmt.Fprintf(jirix.Stderr(), "WARNING: interactive mode being disabled because %s was set\n", f)
}
runpFlags.interactive = false
runpFlags.collateOutput = true
break
}
}
git := gitutil.New(jirix.NewSeq())
homeBranch, err := git.CurrentBranchName()
if err != nil {
// jiri was run from outside of a project. Let's assume we'll
// use all projects if none have been specified via the projects flag.
if keysRE == nil {
keysRE = regexp.MustCompile(".*")
}
}
dirty := false
if hasUntrackedSet || hasUncommitedSet {
dirty = true
}
states, err := project.GetProjectStates(jirix, dirty)
if err != nil {
return err
}
mapInputs := map[project.ProjectKey]*mapInput{}
var keys project.ProjectKeys
for key, state := range states {
if keysRE != nil {
if !keysRE.MatchString(string(key)) {
continue
}
} else {
if state.CurrentBranch != homeBranch {
continue
}
}
if branchRE != nil {
found := false
for _, br := range state.Branches {
if branchRE.MatchString(br.Name) {
found = true
break
}
}
if !found {
continue
}
}
if hasUntrackedSet && (state.HasUntracked != runpFlags.hasUntracked) {
continue
}
if hasUncommitedSet && (state.HasUncommitted != runpFlags.hasUncommitted) {
continue
}
if hasGerritSet {
hasMsg := false
for _, br := range state.Branches {
if (state.CurrentBranch == br.Name) && br.HasGerritMessage {
hasMsg = true
break
}
}
if hasMsg != runpFlags.hasGerritMessage {
continue
}
}
mapInputs[key] = &mapInput{
ProjectState: state,
jirix: jirix,
key: key,
}
keys = append(keys, key)
}
total := len(mapInputs)
index := 1
for _, mi := range mapInputs {
mi.index = index
mi.total = total
index++
}
if runpFlags.verbose {
fmt.Fprintf(jirix.Stdout(), "Project Names: %s\n", strings.Join(stateNames(mapInputs), " "))
fmt.Fprintf(jirix.Stdout(), "Project Keys: %s\n", strings.Join(stateKeys(mapInputs), " "))
}
reader, err := profilesreader.NewReader(jirix, runpFlags.ProfilesMode, runpFlags.DBFilename)
runner := &runner{
reader: reader,
args: args,
}
mr := simplemr.MR{}
if runpFlags.interactive {
// Run one mapper at a time.
mr.NumMappers = 1
sort.Sort(keys)
}
in, out := make(chan *simplemr.Record, len(mapInputs)), make(chan *simplemr.Record, len(mapInputs))
sigch := make(chan os.Signal)
signal.Notify(sigch, os.Interrupt)
go func() { <-sigch; mr.Cancel() }()
go mr.Run(in, out, runner, runner)
for _, key := range keys {
in <- &simplemr.Record{string(key), []interface{}{mapInputs[key]}}
}
close(in)
<-out
return mr.Error()
}
func runRunp(jirix *jiri.X, args []string) error {
return runp(jirix, cmdRunP, args)
}