vmon/lib: add vmon check for cloud syncbase instances.
Change-Id: Iaaf2c16304d415b15be9ac97c14ff06282213fe8
diff --git a/internal/monitoring/monitoring.go b/internal/monitoring/monitoring.go
index 8b10445..8081feb 100644
--- a/internal/monitoring/monitoring.go
+++ b/internal/monitoring/monitoring.go
@@ -82,8 +82,10 @@
return i, nil
case int64:
return float64(i), nil
+ case uint64:
+ return float64(i), nil
default:
- return 0, fmt.Errorf("invalid value: %v", sv.Value)
+ return 0, fmt.Errorf("invalid value: %v, %T", sv.Value, sv.Value)
}
}
@@ -107,6 +109,15 @@
// customMetricDescriptors is a map from metric's short names to their
// MetricDescriptor definitions.
var customMetricDescriptors = map[string]*cloudmonitoring.MetricDescriptor{
+ // Custom metrics for recording stats of cloud syncbase instances.
+ "cloud-syncbase": createMetric("cloud-syncbase", "Stats of cloud syncbase instances.", "double", false, []labelData{
+ labelData{
+ key: "mounted_name",
+ description: "The relative mounted name of the instance",
+ },
+ }),
+ "cloud-syncbase-agg": createMetric("cloud-syncbase-agg", "The aggregated stats of cloud syncbase instances.", "double", false, aggLabelData),
+
// Custom metrics for recording check latency and its aggregation
// of vanadium production services.
"service-latency": createMetric("service/latency", "The check latency (ms) of vanadium production services.", "double", true, nil),
diff --git a/vmon/check.go b/vmon/check.go
index a628334..1b46d09 100644
--- a/vmon/check.go
+++ b/vmon/check.go
@@ -20,6 +20,7 @@
// checkFunctions is a map from check names to the corresponding check functions.
var checkFunctions = map[string]func(*context.T, *tool.Context, *cloudmonitoring.Service) error{
+ "cloud-syncbase": checkCloudSyncbaseInstances,
"jenkins": checkJenkins,
"service-latency": checkServiceLatency,
"service-permethod-latency": checkServicePerMethodLatency,
diff --git a/vmon/cloudsyncbase.go b/vmon/cloudsyncbase.go
new file mode 100644
index 0000000..9b086a5
--- /dev/null
+++ b/vmon/cloudsyncbase.go
@@ -0,0 +1,234 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+	"time"
+
+	cloudmonitoring "google.golang.org/api/monitoring/v3"
+
+	"v.io/jiri/tool"
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/x/devtools/internal/monitoring"
+	"v.io/x/devtools/internal/test"
+)
+
+const (
+	// Glob pattern matching the mounted names of all cloud syncbase
+	// instances.
+	cloudSyncbasePattern = "home/*/syncbased"
+
+	// Suffixes of the __debug/stats objects queried from each instance.
+	// NOTE(review): "%2F" appears to be an escaped "/" so that the disk
+	// mount point (/data) forms a single component of the stats name —
+	// confirm against the stats naming convention.
+	sysStatCPUUsagePercent  = "__debug/stats/system/syscpu/Percent"
+	sysStatMemUsagePercent  = "__debug/stats/system/sysmem/UsedPercent"
+	sysStatMemUsageBytes    = "__debug/stats/system/sysmem/Used"
+	sysStatDiskUsagePercent = "__debug/stats/system/sysdisk/%2Fdata/UsedPercent"
+	sysStatDiskUsageBytes   = "__debug/stats/system/sysdisk/%2Fdata/Used"
+)
+
+var (
+	// Overall deadline for the entire cloud syncbase check (glob plus
+	// all per-instance stat queries).
+	cloudSyncbaseTimeout = 20 * time.Second
+)
+
+// cloudSyncbaseStatsTask is one unit of work for a stats worker:
+// collect one category of stats from one syncbase instance.
+type cloudSyncbaseStatsTask struct {
+	mountedName  string              // relative mounted name; used as the metric label value
+	sbMountEntry *naming.MountEntry  // resolved mount entry to query stats from
+	taskType     cloudSyncbaseStatsTaskType
+}
+
+// cloudSyncbaseStatsTaskType enumerates the categories of stats a
+// worker knows how to collect.
+type cloudSyncbaseStatsTaskType int
+
+const (
+	taskTypeCpu     cloudSyncbaseStatsTaskType = iota // system CPU usage percent
+	taskTypeMem                                       // system memory usage (percent and bytes)
+	taskTypeDisk                                      // system disk usage (percent and bytes)
+	taskTypeLatency                                   // signature RPC latency
+	taskTypeQPS                                       // total queries per second
+)
+
+// cloudSyncbaseStatsResult reports the outcome of a single task; err is
+// nil on success.
+type cloudSyncbaseStatsResult struct {
+	err error
+}
+
+// metricData is a single data point destined for GCM: a metric label
+// and its value.
+type metricData struct {
+	metricLabel string
+	value       float64
+}
+
+// checkCloudSyncbaseInstances globs for all mounted cloud syncbase
+// instances, collects per-instance stats (CPU, memory, disk, latency,
+// QPS) in parallel via worker goroutines, sends each data point to GCM
+// under the "cloud-syncbase" metric, and finally sends the stats
+// aggregated across all instances under "cloud-syncbase-agg".
+func checkCloudSyncbaseInstances(v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service) error {
+	v23ctx, cancel := context.WithTimeout(v23ctx, cloudSyncbaseTimeout)
+	defer cancel()
+
+	// Find all cloud syncbase instances.
+	glob, err := v23.GetNamespace(v23ctx).Glob(v23ctx, cloudSyncbasePattern)
+	if err != nil {
+		return err
+	}
+	sbInstances := []*naming.MountEntry{}
+	for e := range glob {
+		switch e := e.(type) {
+		case *naming.GlobReplyEntry:
+			sbInstances = append(sbInstances, &e.Value)
+		case *naming.GlobReplyError:
+			// Per-entry glob errors are reported but do not abort the
+			// whole check.
+			fmt.Fprintf(ctx.Stderr(), "%v\n", e.Value.Error)
+		}
+	}
+
+	// Query stats from each instance.
+	taskTypes := []cloudSyncbaseStatsTaskType{taskTypeCpu, taskTypeMem, taskTypeDisk, taskTypeLatency, taskTypeQPS}
+	numTasks := len(sbInstances) * len(taskTypes)
+	// Channels are buffered for all tasks/results so neither the
+	// producer below nor the workers ever block on channel capacity.
+	tasks := make(chan cloudSyncbaseStatsTask, numTasks)
+	taskResults := make(chan cloudSyncbaseStatsResult, numTasks)
+	// NOTE(review): aggs is mutated by all workers via getAggregator
+	// with no synchronization — concurrent map writes are a data race;
+	// consider guarding aggregator updates with a mutex.
+	aggs := map[string]*aggregator{}
+	md, err := monitoring.GetMetric("cloud-syncbase", projectFlag)
+	if err != nil {
+		return err
+	}
+	// One shared timestamp (RFC3339, UTC) for every data point of this run.
+	now := time.Now().UTC().Format(time.RFC3339)
+	// Start workers and distribute work to them.
+	for i := 0; i < runtime.NumCPU(); i++ {
+		go statsWorker(v23ctx, ctx, s, now, aggs, md, tasks, taskResults)
+	}
+	for _, sb := range sbInstances {
+		for _, t := range taskTypes {
+			// NOTE(review): getSysStat clears the shared entry's Name
+			// field, and workers may already be running while this loop
+			// still reads sb.Name — confirm the label value cannot be
+			// clobbered mid-enqueue.
+			tasks <- cloudSyncbaseStatsTask{
+				mountedName: sb.Name,
+				sbMountEntry: sb,
+				taskType: t,
+			}
+		}
+	}
+	close(tasks)
+
+	// Wait for all results to come back and send the aggregated data to GCM.
+	for i := 0; i < numTasks; i++ {
+		result := <-taskResults
+		if result.err != nil {
+			fmt.Fprintf(ctx.Stderr(), "%v\n", result.err)
+			continue
+		}
+	}
+	mdAgg, err := monitoring.GetMetric("cloud-syncbase-agg", projectFlag)
+	if err != nil {
+		return err
+	}
+	// All numTasks results have been received, so every worker has
+	// finished its aggregator updates; aggs is safe to read here.
+	for metricLabel, agg := range aggs {
+		if err := sendAggregatedDataToGCM(ctx, s, mdAgg, agg, now, metricLabel); err != nil {
+			// A failed aggregate upload should not block the rest.
+			fmt.Fprintf(ctx.Stderr(), "%v\n", err)
+		}
+	}
+
+	return nil
+}
+
+// statsWorker queries stats based on the task type, sends the results to GCM,
+// and updates the corresponding aggregator.
+func statsWorker(
+ v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service,
+ now string, aggs map[string]*aggregator, md *cloudmonitoring.MetricDescriptor,
+ tasks <-chan cloudSyncbaseStatsTask, results chan<- cloudSyncbaseStatsResult) {
+ for t := range tasks {
+ result := cloudSyncbaseStatsResult{}
+ metrics := []metricData{}
+ switch t.taskType {
+ case taskTypeCpu:
+ if cpuUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatCPUUsagePercent); err != nil {
+ result.err = err
+ } else {
+ metrics = append(metrics, metricData{
+ metricLabel: "syscpu-usage-pct",
+ value: cpuUsagePct,
+ })
+ }
+ case taskTypeMem:
+ if memUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatMemUsagePercent); err != nil {
+ result.err = err
+ } else {
+ metrics = append(metrics,
+ metricData{
+ metricLabel: "sysmem-usage-pct",
+ value: memUsagePct,
+ })
+ }
+ if memUsageBytes, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatMemUsageBytes); err != nil {
+ result.err = err
+ } else {
+ metrics = append(metrics,
+ metricData{
+ metricLabel: "sysmem-usage-bytes",
+ value: memUsageBytes,
+ })
+ }
+ case taskTypeDisk:
+ if diskUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatDiskUsagePercent); err != nil {
+ result.err = err
+ } else {
+ metrics = append(metrics,
+ metricData{
+ metricLabel: "sysdisk-usage-pct",
+ value: diskUsagePct,
+ })
+ }
+ if diskUsageBytes, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatDiskUsageBytes); err != nil {
+ result.err = err
+ } else {
+ metrics = append(metrics,
+ metricData{
+ metricLabel: "sysdisk-usage-bytes",
+ value: diskUsageBytes,
+ })
+ }
+ case taskTypeLatency:
+ if lat, err := getLatency(v23ctx, t.sbMountEntry); err != nil {
+ result.err = err
+ } else {
+ metrics = append(metrics, metricData{
+ metricLabel: "latency",
+ value: float64(lat.Nanoseconds()) / 1000000.0,
+ })
+ }
+ case taskTypeQPS:
+ if _, totalQPS, err := getQPS(v23ctx, ctx, t.sbMountEntry); err != nil {
+ result.err = err
+ } else {
+ metrics = append(metrics, metricData{
+ metricLabel: "qps",
+ value: totalQPS,
+ })
+ }
+ }
+ for _, metric := range metrics {
+ getAggregator(aggs, metric.metricLabel).add(metric.value)
+ if err := sendDataToGCM(s, md, metric.value, now, "", "", metric.metricLabel, t.mountedName); err != nil {
+ fmt.Fprintf(ctx.Stderr(), "%v\n", err)
+ } else {
+ test.Pass(ctx, "%s, %s: %v\n", t.mountedName, metric.metricLabel, metric.value)
+ }
+ }
+ results <- result
+ }
+}
+
+// getAggregator returns the aggregator registered under metricLabel,
+// creating and registering a new one on first use.
+//
+// Callers sharing aggs across goroutines must serialize calls to this
+// function, since it may mutate the map.
+func getAggregator(aggs map[string]*aggregator, metricLabel string) *aggregator {
+	// Single comma-ok lookup instead of the lookup/insert/lookup dance.
+	agg, ok := aggs[metricLabel]
+	if !ok {
+		agg = newAggregator()
+		aggs[metricLabel] = agg
+	}
+	return agg
+}
+
+// getSysStat queries a single system stat (identified by its stats
+// suffix) from the given mount entry and returns its value as a
+// float64. On failure it returns -1 and a non-nil error.
+func getSysStat(v23ctx *context.T, ctx *tool.Context, me *naming.MountEntry, stat string) (float64, error) {
+	// An empty Name is needed to make monitoring.GetStat work correctly.
+	// Operate on a local copy so the caller's (possibly shared) mount
+	// entry is not mutated — the original in-place write raced with
+	// other goroutines using the same entry.
+	entry := *me
+	entry.Name = ""
+	values, err := monitoring.GetStat(v23ctx, ctx, entry, stat)
+	if err != nil {
+		return -1, err
+	}
+	// Guard against an empty result; indexing values[0] unconditionally
+	// would panic.
+	if len(values) == 0 {
+		return -1, fmt.Errorf("no values returned for stat %q", stat)
+	}
+	fv, err := values[0].GetFloat64Value()
+	if err != nil {
+		return -1, err
+	}
+	return fv, nil
+}
diff --git a/vmon/doc.go b/vmon/doc.go
index 3972f6c..9d6524e 100644
--- a/vmon/doc.go
+++ b/vmon/doc.go
@@ -115,11 +115,12 @@
Usage:
vmon md create [flags] <names>
-<names> is a list of metric descriptor names to create. Available: gce-instance,
-jenkins, nginx, rpc-load-test, service-counters, service-counters-agg,
-service-latency, service-latency-agg, service-metadata, service-metadata-agg,
-service-permethod-latency, service-permethod-latency-agg, service-qps-method,
-service-qps-method-agg, service-qps-total, service-qps-total-agg
+<names> is a list of metric descriptor names to create. Available:
+cloud-syncbase, cloud-syncbase-agg, gce-instance, jenkins, nginx, rpc-load-test,
+service-counters, service-counters-agg, service-latency, service-latency-agg,
+service-metadata, service-metadata-agg, service-permethod-latency,
+service-permethod-latency-agg, service-qps-method, service-qps-method-agg,
+service-qps-total, service-qps-total-agg
The vmon md create flags are:
-color=true
@@ -138,11 +139,12 @@
Usage:
vmon md delete [flags] <names>
-<names> is a list of metric descriptor names to delete. Available: gce-instance,
-jenkins, nginx, rpc-load-test, service-counters, service-counters-agg,
-service-latency, service-latency-agg, service-metadata, service-metadata-agg,
-service-permethod-latency, service-permethod-latency-agg, service-qps-method,
-service-qps-method-agg, service-qps-total, service-qps-total-agg
+<names> is a list of metric descriptor names to delete. Available:
+cloud-syncbase, cloud-syncbase-agg, gce-instance, jenkins, nginx, rpc-load-test,
+service-counters, service-counters-agg, service-latency, service-latency-agg,
+service-metadata, service-metadata-agg, service-permethod-latency,
+service-permethod-latency-agg, service-qps-method, service-qps-method-agg,
+service-qps-total, service-qps-total-agg
The vmon md delete flags are:
-color=true
@@ -254,8 +256,8 @@
vmon check run [flags] <names>
<names> is a list of names identifying the checks to run. Available:
-gce-instance, jenkins, rpc-load-test, service-counters, service-latency,
-service-metadata, service-permethod-latency, service-qps
+cloud-syncbase, gce-instance, jenkins, rpc-load-test, service-counters,
+service-latency, service-metadata, service-permethod-latency, service-qps
The vmon check run flags are:
-bin-dir=
diff --git a/vmon/servicecommon.go b/vmon/servicecommon.go
index b27a403..f40c9db 100644
--- a/vmon/servicecommon.go
+++ b/vmon/servicecommon.go
@@ -112,7 +112,7 @@
},
},
}}).Do(); err != nil {
- return fmt.Errorf("Timeseries Write failed for metric %q with value %f: %v", md.Name, value, err)
+ return fmt.Errorf("Timeseries Write failed for metric %q with value %f and labels %v: %v", md.Name, value, labels, err)
}
return nil
}
diff --git a/vmon/servicelatency.go b/vmon/servicelatency.go
index 4719871..d999475 100644
--- a/vmon/servicelatency.go
+++ b/vmon/servicelatency.go
@@ -12,6 +12,7 @@
"v.io/jiri/tool"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/v23/options"
"v.io/v23/rpc/reserved"
"v.io/v23/verror"
@@ -113,17 +114,12 @@
latencies := []latencyData{}
errors := []error{}
for _, group := range groups {
- latency := timeout
v23ctx, cancel := context.WithTimeout(v23ctx, timeout)
defer cancel()
- start := time.Now()
- if _, err := reserved.Signature(v23ctx, "", options.Preresolved{&group}); err != nil {
- if verror.ErrorID(err) != verror.ErrTimeout.ID {
- errors = append(errors, err)
- continue
- }
- } else {
- latency = time.Now().Sub(start)
+ latency, err := getLatency(v23ctx, &group)
+ if err != nil {
+ errors = append(errors, err)
+ continue
}
location, err := monitoring.GetServiceLocation(v23ctx, ctx, group)
if err != nil {
@@ -141,3 +137,16 @@
return latencies, nil
}
+
+// getLatency measures the latency of a reserved Signature RPC to the
+// given mount entry. An RPC timeout is not treated as an error: in that
+// case the configured timeout value itself is reported as the latency.
+// Any other RPC error is returned with a -1 latency.
+func getLatency(v23ctx *context.T, me *naming.MountEntry) (time.Duration, error) {
+	latency := timeout
+	start := time.Now()
+	if _, err := reserved.Signature(v23ctx, "", options.Preresolved{me}); err != nil {
+		if verror.ErrorID(err) != verror.ErrTimeout.ID {
+			return -1, err
+		}
+	} else {
+		// time.Since is the idiomatic form of time.Now().Sub(start).
+		latency = time.Since(start)
+	}
+	return latency, nil
+}
diff --git a/vmon/serviceqps.go b/vmon/serviceqps.go
index 7044731..f7c076d 100644
--- a/vmon/serviceqps.go
+++ b/vmon/serviceqps.go
@@ -14,6 +14,7 @@
"v.io/jiri/tool"
"v.io/v23/context"
+ "v.io/v23/naming"
"v.io/x/devtools/internal/monitoring"
"v.io/x/devtools/internal/test"
"v.io/x/ref/services/stats"
@@ -141,31 +142,11 @@
qps := []qpsData{}
errors := []error{}
for _, group := range groups {
- perMethodQPS := map[string]float64{}
- totalQPS := 0.0
- qpsResults, err := monitoring.GetStat(v23ctx, ctx, group, qpsSuffix)
+ perMethodQPS, totalQPS, err := getQPS(v23ctx, ctx, &group)
if err != nil {
errors = append(errors, err)
continue
}
- curPerMethodQPS := map[string]float64{}
- curTotalQPS := 0.0
- for _, r := range qpsResults {
- data, ok := r.Value.(stats.HistogramValue)
- if !ok {
- return nil, fmt.Errorf("invalid qps data: %v", r)
- }
- matches := qpsRE.FindStringSubmatch(r.Name)
- if matches == nil {
- continue
- }
- method := matches[1]
- qps := (float64)(data.Count) / 60.0
- curPerMethodQPS[method] += qps
- curTotalQPS += qps
- }
- perMethodQPS = curPerMethodQPS
- totalQPS = curTotalQPS
if len(perMethodQPS) == 0 {
errors = append(errors, fmt.Errorf("failed to check qps for service %q", serviceName))
continue
@@ -188,3 +169,27 @@
return qps, nil
}
+
+// getQPS queries the RPC counters of the given mount entry and returns
+// the per-method QPS map along with the total QPS summed over all
+// methods. On failure it returns (nil, -1, err).
+func getQPS(v23ctx *context.T, ctx *tool.Context, me *naming.MountEntry) (map[string]float64, float64, error) {
+	qpsResults, err := monitoring.GetStat(v23ctx, ctx, *me, qpsSuffix)
+	if err != nil {
+		return nil, -1, err
+	}
+	perMethodQPS := map[string]float64{}
+	totalQPS := 0.0
+	for _, r := range qpsResults {
+		data, ok := r.Value.(stats.HistogramValue)
+		if !ok {
+			return nil, -1, fmt.Errorf("invalid qps data: %v", r)
+		}
+		matches := qpsRE.FindStringSubmatch(r.Name)
+		if matches == nil {
+			// Stat name doesn't look like a per-method counter; skip it.
+			continue
+		}
+		method := matches[1]
+		// Convert the counter to queries per second; the /60 suggests a
+		// one-minute counting window — TODO confirm against the stats
+		// producer. (float64(x) is the idiomatic conversion spelling.)
+		qps := float64(data.Count) / 60.0
+		perMethodQPS[method] += qps
+		totalQPS += qps
+	}
+	return perMethodQPS, totalQPS, nil
+}