// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"

	cloudmonitoring "google.golang.org/api/monitoring/v3"

	"v.io/jiri/tool"
	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/x/devtools/internal/monitoring"
	"v.io/x/devtools/internal/test"
	"v.io/x/lib/gcm"
)

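// cloudSyncbasePattern matches the mounted names of cloud syncbase instances;
// the sysStat* constants name the __debug/stats objects read from each
// instance.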
const (
	cloudSyncbasePattern    = "sb/syncbased-*"
	sysStatCPUUsagePercent  = "__debug/stats/system/syscpu/Percent"
	sysStatMemUsagePercent  = "__debug/stats/system/sysmem/UsedPercent"
	sysStatMemUsageBytes    = "__debug/stats/system/sysmem/Used"
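	// Disk stats are keyed by mount point; the "/" in "/data" is escaped as %2F.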
	sysStatDiskUsagePercent = "__debug/stats/system/sysdisk/%2Fdata/UsedPercent"
	sysStatDiskUsageBytes   = "__debug/stats/system/sysdisk/%2Fdata/Used"
)

var (
	// cloudSyncbaseTimeout bounds one full round of instance checks.
	cloudSyncbaseTimeout = 20 * time.Second
)

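// cloudSyncbaseStatsTask describes a single stats query against one cloud
// syncbase instance.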
type cloudSyncbaseStatsTask struct {
	mountedName  string
	sbMountEntry *naming.MountEntry
	taskType     cloudSyncbaseStatsTaskType
}

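// cloudSyncbaseStatsTaskType enumerates the kinds of stats collected from
// each instance.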
type cloudSyncbaseStatsTaskType int

const (
	taskTypeCPU cloudSyncbaseStatsTaskType = iota
	taskTypeMem
	taskTypeDisk
	taskTypeLatency
	taskTypeQPS
)

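// cloudSyncbaseStatsResult reports the outcome of a single task back to the
// collecting goroutine.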
type cloudSyncbaseStatsResult struct {
	err error
}

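// metricData is a single labeled sample to be sent to GCM.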
type metricData struct {
	metricLabel string
	value       float64
}

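// checkCloudSyncbaseInstances globs the namespace for cloud syncbase
// instances, reports the instance count to GCM, fans stats-collection tasks
// out to a pool of workers, and finally sends the aggregated per-metric data
// to GCM.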
func checkCloudSyncbaseInstances(v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service) error {
	v23ctx, cancel := context.WithTimeout(v23ctx, cloudSyncbaseTimeout)
	defer cancel()

	// Find all cloud syncbase instances.
	glob, err := v23.GetNamespace(v23ctx).Glob(v23ctx, cloudSyncbasePattern)
	if err != nil {
		return err
	}
	sbInstances := []*naming.MountEntry{}
	for e := range glob {
		switch e := e.(type) {
		case *naming.GlobReplyEntry:
			sbInstances = append(sbInstances, &e.Value)
		case *naming.GlobReplyError:
			fmt.Fprintf(ctx.Stderr(), "%v\n", e.Value.Error)
		}
	}

	// Send number of instances to GCM.
	md, err := gcm.GetMetric("cloud-syncbase", projectFlag)
	if err != nil {
		return err
	}
	now := time.Now().UTC().Format(time.RFC3339)
	numInstances := len(sbInstances)
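	// The instance-name label is set to the "_" placeholder since this count
	// covers the whole fleet rather than a single instance.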
	if err := sendDataToGCM(s, md, float64(numInstances), now, "", "", "count", "_"); err != nil {
		fmt.Fprintf(ctx.Stderr(), "%v\n", err)
	} else {
		test.Pass(ctx, "number of instances: %v\n", numInstances)
	}

	// Query stats from each instance.
	taskTypes := []cloudSyncbaseStatsTaskType{taskTypeCPU, taskTypeMem, taskTypeDisk, taskTypeLatency, taskTypeQPS}
	numTasks := numInstances * len(taskTypes)
	tasks := make(chan cloudSyncbaseStatsTask, numTasks)
	taskResults := make(chan cloudSyncbaseStatsResult, numTasks)
	aggs := map[string]*aggregator{}
	var aggsMu sync.Mutex // guards aggs, which the workers below update concurrently
	// Start workers and distribute work to them.
	for i := 0; i < runtime.NumCPU(); i++ {
		go statsWorker(v23ctx, ctx, s, now, aggs, &aggsMu, md, tasks, taskResults)
	}
	for _, sb := range sbInstances {
		for _, t := range taskTypes {
			tasks <- cloudSyncbaseStatsTask{
				mountedName:  sb.Name,
				sbMountEntry: sb,
				taskType:     t,
			}
		}
	}
	close(tasks)

	// Wait for all results to come back and send the aggregated data to GCM.
	for i := 0; i < numTasks; i++ {
		result := <-taskResults
		if result.err != nil {
			fmt.Fprintf(ctx.Stderr(), "%v\n", result.err)
		}
	}
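	// All workers have finished their tasks at this point, so aggs can be
	// read without holding aggsMu.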
	mdAgg, err := gcm.GetMetric("cloud-syncbase-agg", projectFlag)
	if err != nil {
		return err
	}
	for metricLabel, agg := range aggs {
		if err := sendAggregatedDataToGCM(ctx, s, mdAgg, agg, now, metricLabel); err != nil {
			fmt.Fprintf(ctx.Stderr(), "%v\n", err)
		}
	}

	return nil
}

// statsWorker runs until tasks is closed. For each task, it queries stats
// based on the task type, sends the results to GCM, folds them into the
// shared aggregators, and reports exactly one result on the results channel.
func statsWorker(
	v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service,
	now string, aggs map[string]*aggregator, aggsMu *sync.Mutex,
	md *cloudmonitoring.MetricDescriptor,
	tasks <-chan cloudSyncbaseStatsTask, results chan<- cloudSyncbaseStatsResult) {
	for t := range tasks {
		result := cloudSyncbaseStatsResult{}
		metrics := []metricData{}
		switch t.taskType {
		case taskTypeCPU:
			if cpuUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatCPUUsagePercent); err != nil {
				result.err = err
			} else {
				metrics = append(metrics, metricData{
					metricLabel: "syscpu-usage-pct",
					value:       cpuUsagePct,
				})
			}
		case taskTypeMem:
			if memUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatMemUsagePercent); err != nil {
				result.err = err
			} else {
				metrics = append(metrics,
					metricData{
						metricLabel: "sysmem-usage-pct",
						value:       memUsagePct,
					})
			}
			if memUsageBytes, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatMemUsageBytes); err != nil {
				result.err = err
			} else {
				metrics = append(metrics,
					metricData{
						metricLabel: "sysmem-usage-bytes",
						value:       memUsageBytes,
					})
			}
		case taskTypeDisk:
			if diskUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatDiskUsagePercent); err != nil {
				result.err = err
			} else {
				metrics = append(metrics,
					metricData{
						metricLabel: "sysdisk-usage-pct",
						value:       diskUsagePct,
					})
			}
			if diskUsageBytes, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatDiskUsageBytes); err != nil {
				result.err = err
			} else {
				metrics = append(metrics,
					metricData{
						metricLabel: "sysdisk-usage-bytes",
						value:       diskUsageBytes,
					})
			}
		case taskTypeLatency:
			if lat, err := getLatency(v23ctx, t.sbMountEntry); err != nil {
				result.err = err
			} else {
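				// lat is a time.Duration; report it to GCM in milliseconds.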
				metrics = append(metrics, metricData{
					metricLabel: "latency",
					value:       float64(lat.Nanoseconds()) / 1000000.0,
				})
			}
		case taskTypeQPS:
			if _, totalQPS, err := getQPS(v23ctx, ctx, t.sbMountEntry); err != nil {
				result.err = err
			} else {
				metrics = append(metrics, metricData{
					metricLabel: "qps",
					value:       totalQPS,
				})
			}
		}
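		// Send each collected metric to GCM and fold it into the shared
		// aggregates.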
		for _, metric := range metrics {
			aggsMu.Lock()
			getAggregator(aggs, metric.metricLabel).add(metric.value)
			aggsMu.Unlock()
			if err := sendDataToGCM(s, md, metric.value, now, "", "", metric.metricLabel, t.mountedName); err != nil {
				fmt.Fprintf(ctx.Stderr(), "%v\n", err)
			} else {
				test.Pass(ctx, "%s, %s: %v\n", t.mountedName, metric.metricLabel, metric.value)
			}
		}
		results <- result
	}
}

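// getAggregator returns the aggregator for metricLabel, creating it on first
// use. Callers running concurrently must hold the mutex guarding aggs.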
func getAggregator(aggs map[string]*aggregator, metricLabel string) *aggregator {
	agg, ok := aggs[metricLabel]
	if !ok {
		agg = newAggregator()
		aggs[metricLabel] = agg
	}
	return agg
}

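// getSysStat reads the given system stat from a single mount entry and
// returns its value as a float64.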
func getSysStat(v23ctx *context.T, ctx *tool.Context, me *naming.MountEntry, stat string) (float64, error) {
	// Work on a copy with a cleared name, which monitoring.GetStat needs to
	// work correctly; copying also avoids mutating the caller's entry, which
	// is shared across concurrent workers.
	entry := *me
	entry.Name = ""
	values, err := monitoring.GetStat(v23ctx, ctx, entry, stat)
	if err != nil {
		return -1, err
	}
	if len(values) == 0 {
		return -1, fmt.Errorf("no values returned for stat %q", stat)
	}
	fv, err := values[0].GetFloat64Value()
	if err != nil {
		return -1, err
	}
	return fv, nil
}