| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "bytes" |
| "flag" |
| "fmt" |
| "io" |
| "regexp" |
| "sort" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "v.io/x/lib/cmdline" |
| deviceimpl "v.io/x/ref/services/device/internal/impl" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/v23/services/device" |
| "v.io/v23/verror" |
| |
| "v.io/x/ref/lib/v23cmd" |
| ) |
| |
| // globHandler is implemented by each command that wants to execute against name |
| // patterns. The handler is expected to be invoked against each glob result, |
| // and can be run concurrently. The handler should direct its output to the |
| // given stdout and stderr writers. |
| // |
| // Typical usage: |
| // |
| // func myCmdHandler(entry globResult, ctx *context.T, stdout, stderr io.Writer) error { |
| // output := myCmdProcessing(entry) |
| // fmt.Fprintf(stdout, output) |
| // ... |
| // } |
| // |
| // func runMyCmd(ctx *context.T, env *cmdline.Env, args []string) error { |
| // ... |
| // err := run(ctx, env, args, myCmdHandler) |
| // ... |
| // } |
| // |
| // var cmdMyCmd = &cmdline.Command { |
| // Runner: v23cmd.RunnerFunc(runMyCmd) |
| // ... |
| // } |
| // |
| // Alternatively, if all runMyCmd does is to call run, you can use globRunner to |
| // avoid having to define runMyCmd: |
| // |
| // var cmdMyCmd = &cmdline.Command { |
| // Runner: globRunner(myCmdHandler) |
| // ... |
| // } |
| type globHandler func(entry globResult, ctx *context.T, stdout, stderr io.Writer) error |
| |
| func globRunner(handler globHandler) cmdline.Runner { |
| return v23cmd.RunnerFunc(func(ctx *context.T, env *cmdline.Env, args []string) error { |
| return run(ctx, env, args, handler) |
| }) |
| } |
| |
| type objectKind int |
| |
| const ( |
| applicationInstallationObject objectKind = iota |
| applicationInstanceObject |
| deviceServiceObject |
| sentinelObjectKind |
| ) |
| |
| var objectKinds = []objectKind{ |
| applicationInstallationObject, |
| applicationInstanceObject, |
| deviceServiceObject, |
| } |
| |
| func init() { |
| // TODO(caprita): Move to glob_test.go once that exists. |
| if len(objectKinds) != int(sentinelObjectKind) { |
| panic(fmt.Sprintf("broken invariant: mismatching number of object kinds")) |
| } |
| } |
| |
| type globResult struct { |
| name string |
| status device.Status |
| kind objectKind |
| } |
| |
| type byTypeAndName []globResult |
| |
| func (a byTypeAndName) Len() int { return len(a) } |
| func (a byTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } |
| |
| func (a byTypeAndName) Less(i, j int) bool { |
| r1, r2 := a[i], a[j] |
| if r1.kind != r2.kind { |
| return r1.kind < r2.kind |
| } |
| return r1.name < r2.name |
| } |
| |
| // run runs the given handler in parallel against each of the results obtained |
| // by globbing args, after performing filtering based on type |
| // (instance/installation) and state. No de-duping of results is performed. |
| // The outputs from each of the handlers are sorted: installations first, then |
| // instances (and alphabetically by object name for each group). |
| func run(ctx *context.T, env *cmdline.Env, args []string, handler globHandler) error { |
| results := glob(ctx, env, args) |
| sort.Sort(byTypeAndName(results)) |
| results = filterResults(results) |
| stdouts, stderrs := make([]bytes.Buffer, len(results)), make([]bytes.Buffer, len(results)) |
| var errorCounter uint32 = 0 |
| perResult := func(r globResult, index int) { |
| if err := handler(r, ctx, &stdouts[index], &stderrs[index]); err != nil { |
| fmt.Fprintf(&stderrs[index], "ERROR for \"%s\": %v.\n", r.name, err) |
| atomic.AddUint32(&errorCounter, 1) |
| } |
| } |
| // TODO(caprita): Add unit test logic to cover all parallelism options. |
| switch handlerParallelism { |
| case fullParallelism: |
| var wg sync.WaitGroup |
| for i, r := range results { |
| wg.Add(1) |
| go func(r globResult, i int) { |
| perResult(r, i) |
| wg.Done() |
| }(r, i) |
| } |
| wg.Wait() |
| case noParallelism: |
| for i, r := range results { |
| perResult(r, i) |
| } |
| case kindParallelism: |
| processed := 0 |
| for _, k := range objectKinds { |
| var wg sync.WaitGroup |
| for i, r := range results { |
| if r.kind != k { |
| continue |
| } |
| wg.Add(1) |
| processed++ |
| go func(r globResult, i int) { |
| perResult(r, i) |
| wg.Done() |
| }(r, i) |
| } |
| wg.Wait() |
| } |
| if processed != len(results) { |
| return fmt.Errorf("broken invariant: unhandled object kind") |
| } |
| } |
| for i := range results { |
| io.Copy(env.Stdout, &stdouts[i]) |
| io.Copy(env.Stderr, &stderrs[i]) |
| } |
| if errorCounter > 0 { |
| return fmt.Errorf("encountered a total of %d error(s)", errorCounter) |
| } |
| return nil |
| } |
| |
| func filterResults(results []globResult) []globResult { |
| var ret []globResult |
| for _, r := range results { |
| switch s := r.status.(type) { |
| case device.StatusInstance: |
| if onlyInstallations || !instanceStateFilter.apply(s.Value.State) { |
| continue |
| } |
| case device.StatusInstallation: |
| if onlyInstances || !installationStateFilter.apply(s.Value.State) { |
| continue |
| } |
| } |
| ret = append(ret, r) |
| } |
| return ret |
| } |
| |
| // TODO(caprita): We need to filter out debug objects under the app instances' |
| // namespaces, so that the tool works with ... patterns. We should change glob |
| // on device manager to not return debug objects by default for apps and instead |
| // put them under a __debug suffix (like it works for services). |
| var debugNameRE = regexp.MustCompile("/apps/[^/]+/[^/]+/[^/]+/(logs|stats|pprof)(/|$)") |
| |
| func getStatus(ctx *context.T, env *cmdline.Env, name string, resultsCh chan<- globResult) { |
| status, err := device.DeviceClient(name).Status(ctx) |
| // Skip non-instances/installations. |
| if verror.ErrorID(err) == deviceimpl.ErrInvalidSuffix.ID { |
| return |
| } |
| if err != nil { |
| fmt.Fprintf(env.Stderr, "Status(%v) failed: %v\n", name, err) |
| return |
| } |
| var kind objectKind |
| switch status.(type) { |
| case device.StatusInstallation: |
| kind = applicationInstallationObject |
| case device.StatusInstance: |
| kind = applicationInstanceObject |
| case device.StatusDevice: |
| kind = deviceServiceObject |
| default: |
| fmt.Fprintf(env.Stderr, "Status(%v) returned unrecognized status type %T\n", name, status) |
| return |
| } |
| resultsCh <- globResult{name, status, kind} |
| } |
| |
| func globOne(ctx *context.T, env *cmdline.Env, pattern string, resultsCh chan<- globResult) { |
| globCh, err := v23.GetNamespace(ctx).Glob(ctx, pattern) |
| if err != nil { |
| fmt.Fprintf(env.Stderr, "Glob(%v) failed: %v\n", pattern, err) |
| return |
| } |
| var wg sync.WaitGroup |
| // For each glob result. |
| for entry := range globCh { |
| switch v := entry.(type) { |
| case *naming.GlobReplyEntry: |
| name := v.Value.Name |
| // Skip debug objects. |
| if debugNameRE.MatchString(name) { |
| continue |
| } |
| wg.Add(1) |
| go func() { |
| getStatus(ctx, env, name, resultsCh) |
| wg.Done() |
| }() |
| case *naming.GlobReplyError: |
| fmt.Fprintf(env.Stderr, "Glob(%v) returned an error for %v: %v\n", pattern, v.Value.Name, v.Value.Error) |
| } |
| } |
| wg.Wait() |
| } |
| |
| // glob globs the given arguments and returns the results in arbitrary order. |
| func glob(ctx *context.T, env *cmdline.Env, args []string) []globResult { |
| ctx, cancel := context.WithTimeout(ctx, time.Minute) |
| defer cancel() |
| var wg sync.WaitGroup |
| resultsCh := make(chan globResult, 100) |
| // For each pattern. |
| for _, a := range args { |
| wg.Add(1) |
| go func(pattern string) { |
| globOne(ctx, env, pattern, resultsCh) |
| wg.Done() |
| }(a) |
| } |
| // Collect the glob results into a slice. |
| var results []globResult |
| done := make(chan struct{}) |
| go func() { |
| for r := range resultsCh { |
| results = append(results, r) |
| } |
| close(done) |
| }() |
| wg.Wait() |
| close(resultsCh) |
| <-done |
| return results |
| } |
| |
| type instanceStateFlag map[device.InstanceState]struct{} |
| |
| func (f *instanceStateFlag) apply(state device.InstanceState) bool { |
| if len(*f) == 0 { |
| return true |
| } |
| _, ok := (*f)[state] |
| return ok |
| } |
| |
| func (f *instanceStateFlag) String() string { |
| states := make([]string, 0, len(*f)) |
| for s := range *f { |
| states = append(states, s.String()) |
| } |
| sort.Strings(states) |
| return strings.Join(states, ",") |
| } |
| |
| func (f *instanceStateFlag) Set(s string) error { |
| if *f == nil { |
| *f = make(instanceStateFlag) |
| } |
| states := strings.Split(s, ",") |
| for _, s := range states { |
| state, err := device.InstanceStateFromString(s) |
| if err != nil { |
| return err |
| } |
| (*f)[state] = struct{}{} |
| } |
| return nil |
| } |
| |
| type installationStateFlag map[device.InstallationState]struct{} |
| |
| func (f *installationStateFlag) apply(state device.InstallationState) bool { |
| if len(*f) == 0 { |
| return true |
| } |
| _, ok := (*f)[state] |
| return ok |
| } |
| |
| func (f *installationStateFlag) String() string { |
| states := make([]string, 0, len(*f)) |
| for s := range *f { |
| states = append(states, s.String()) |
| } |
| sort.Strings(states) |
| return strings.Join(states, ",") |
| } |
| |
| func (f *installationStateFlag) Set(s string) error { |
| if *f == nil { |
| *f = make(installationStateFlag) |
| } |
| states := strings.Split(s, ",") |
| for _, s := range states { |
| state, err := device.InstallationStateFromString(s) |
| if err != nil { |
| return err |
| } |
| (*f)[state] = struct{}{} |
| } |
| return nil |
| } |
| |
| type parallelismFlag int |
| |
| const ( |
| fullParallelism parallelismFlag = iota |
| noParallelism |
| kindParallelism |
| sentinelParallelismFlag |
| ) |
| |
| var parallelismStrings = map[parallelismFlag]string{ |
| fullParallelism: "FULL", |
| noParallelism: "NONE", |
| kindParallelism: "BYKIND", |
| } |
| |
| const defaultParallelism = kindParallelism |
| |
| func init() { |
| if len(parallelismStrings) != int(sentinelParallelismFlag) { |
| panic(fmt.Sprintf("broken invariant: mismatching number of parallelism types")) |
| } |
| } |
| |
| func (p *parallelismFlag) String() string { |
| s, ok := parallelismStrings[*p] |
| if !ok { |
| return "UNKNOWN" |
| } |
| return s |
| } |
| |
| func (p *parallelismFlag) Set(s string) error { |
| for k, v := range parallelismStrings { |
| if s == v { |
| *p = k |
| return nil |
| } |
| } |
| return fmt.Errorf("unrecognized parallelism type: %v", s) |
| } |
| |
| var ( |
| instanceStateFilter instanceStateFlag |
| installationStateFilter installationStateFlag |
| onlyInstances bool |
| onlyInstallations bool |
| handlerParallelism parallelismFlag = defaultParallelism |
| ) |
| |
| func init() { |
| // NOTE: When addind new flags or changing default values, remember to |
| // also update ResetGlobFlags below. |
| flag.Var(&instanceStateFilter, "instance-state", fmt.Sprintf("If non-empty, specifies allowed instance states (all other instances get filtered out). The value of the flag is a comma-separated list of values from among: %v.", device.InstanceStateAll)) |
| flag.Var(&installationStateFilter, "installation-state", fmt.Sprintf("If non-empty, specifies allowed installation states (all others installations get filtered out). The value of the flag is a comma-separated list of values from among: %v.", device.InstallationStateAll)) |
| flag.BoolVar(&onlyInstances, "only-instances", false, "If set, only consider instances.") |
| flag.BoolVar(&onlyInstallations, "only-installations", false, "If set, only consider installations.") |
| var parallelismValues []string |
| for _, v := range parallelismStrings { |
| parallelismValues = append(parallelismValues, v) |
| } |
| sort.Strings(parallelismValues) |
| flag.Var(&handlerParallelism, "parallelism", "Specifies the level of parallelism for the handler execution. One of: "+strings.Join(parallelismValues, ",")) |
| } |
| |
| // ResetGlobFlags is meant for tests to restore the values of flag-configured |
| // variables when running multiple commands in the same process. |
| func ResetGlobFlags() { |
| instanceStateFilter = make(instanceStateFlag) |
| installationStateFilter = make(installationStateFlag) |
| onlyInstances = false |
| onlyInstallations = false |
| handlerParallelism = defaultParallelism |
| } |