blob: 08e4912fdc242ae1cce09bf5c09b3f3c95e94165 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"fmt"
"regexp"
"sort"
"time"
cloudmonitoring "google.golang.org/api/monitoring/v3"
"v.io/jiri/tool"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/devtools/internal/monitoring"
"v.io/x/devtools/internal/test"
"v.io/x/lib/gcm"
"v.io/x/ref/services/stats"
)
// qpsSuffix is the stats name pattern that getQPS passes to
// monitoring.GetStat. It globs over all routing ids and all methods and
// selects each method's "latency-ms/delta1m" stat.
const qpsSuffix = "__debug/stats/rpc/server/routing-id/*/methods/*/latency-ms/delta1m"
// qpsRE captures, as its first submatch, the path component that follows
// "/methods/" in a stat name; getQPS uses it as the method name.
var qpsRE = regexp.MustCompile(`.*/methods/([^/]*)/.*`)
// qpsData holds the QPS figures collected for one resolved group of a
// service, together with where that group runs.
type qpsData struct {
// location identifies where the group runs; its Instance and Zone fields
// are used to label the data points sent to GCM.
location *monitoring.ServiceLocation
// perMethodQPS maps a method name to the QPS computed for that method.
perMethodQPS map[string]float64
// totalQPS is the sum of all the values accumulated into perMethodQPS.
totalQPS float64
}
// checkServiceQPS collects the RPC QPS (total and per method) of each
// monitored service, reports every instance's numbers via test.Pass, and
// sends both the per-instance values and the per-service aggregates to GCM.
//
// A service whose QPS cannot be collected is reported via test.Fail, its error
// is written to ctx.Stderr(), and the remaining services are still processed;
// in that case a generic error is returned at the end. A failure to look up a
// metric or to send data aborts immediately with that error.
func checkServiceQPS(v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service) error {
	perMethodMetric, err := gcm.GetMetric("service-qps-method", projectFlag)
	if err != nil {
		return err
	}
	totalMetric, err := gcm.GetMetric("service-qps-total", projectFlag)
	if err != nil {
		return err
	}

	// A single timestamp is shared by every data point written in this run.
	timestamp := time.Now().UTC().Format(time.RFC3339)

	failed := false
	for _, service := range []string{
		monitoring.SNMounttable,
		monitoring.SNIdentity,
		monitoring.SNRole,
		monitoring.SNProxy,
		monitoring.SNBenchmark,
		monitoring.SNAllocator,
	} {
		instances, err := checkSingleServiceQPS(v23ctx, ctx, service)
		if err != nil {
			test.Fail(ctx, "%s\n", service)
			fmt.Fprintf(ctx.Stderr(), "%v\n", err)
			failed = true
			continue
		}

		totalAgg := newAggregator()
		methodAggs := map[string]*aggregator{}
		for _, data := range instances {
			instance, zone := data.location.Instance, data.location.Zone
			totalAgg.add(data.totalQPS)
			label := fmt.Sprintf("%s (%s, %s)", service, instance, zone)

			// Visit methods in sorted order so the report and the order of
			// the GCM writes are deterministic.
			names := make([]string, 0, len(data.perMethodQPS))
			for name := range data.perMethodQPS {
				names = append(names, name)
			}
			sort.Strings(names)

			// The instance total goes out first, then one point per method.
			if err := sendDataToGCM(s, totalMetric, data.totalQPS, timestamp, instance, zone, service); err != nil {
				return err
			}
			report := ""
			for _, name := range names {
				value := data.perMethodQPS[name]
				report += fmt.Sprintf(" - %s: %f\n", name, value)

				methodAgg, found := methodAggs[name]
				if !found {
					methodAgg = newAggregator()
					methodAggs[name] = methodAgg
				}
				methodAgg.add(value)

				if err := sendDataToGCM(s, perMethodMetric, value, timestamp, instance, zone, service, name); err != nil {
					return err
				}
			}
			report += fmt.Sprintf(" Total: %f", data.totalQPS)
			test.Pass(ctx, "%s:\n%s\n", label, report)
		}

		// Aggregates across all instances of this service.
		totalAggMetric, err := gcm.GetMetric("service-qps-total-agg", projectFlag)
		if err != nil {
			return err
		}
		if err := sendAggregatedDataToGCM(ctx, s, totalAggMetric, totalAgg, timestamp, service); err != nil {
			return err
		}
		for name, methodAgg := range methodAggs {
			methodAggMetric, err := gcm.GetMetric("service-qps-method-agg", projectFlag)
			if err != nil {
				return err
			}
			if err := sendAggregatedDataToGCM(ctx, s, methodAggMetric, methodAgg, timestamp, service, name); err != nil {
				return err
			}
		}
	}

	if failed {
		return fmt.Errorf("failed to check RPC QPS for some services.")
	}
	return nil
}
// checkSingleServiceQPS gathers QPS data for every resolved group of the
// given service.
//
// A group is skipped when its stats cannot be fetched, when it reports no
// per-method data, or when its location cannot be determined. An error is
// returned when the mounted name cannot be obtained, when resolution fails or
// yields no groups, or when every group was skipped. Otherwise the data of the
// groups that succeeded is returned and the individual failures are dropped.
func checkSingleServiceQPS(v23ctx *context.T, ctx *tool.Context, serviceName string) ([]qpsData, error) {
	mountedName, err := monitoring.GetServiceMountedName(namespaceRootFlag, serviceName)
	if err != nil {
		return nil, err
	}

	// Resolve name and group results by routing ids.
	groups, err := monitoring.ResolveAndProcessServiceName(v23ctx, ctx, serviceName, mountedName)
	if err != nil {
		return nil, err
	}
	// Report an empty resolution explicitly. Without this guard the
	// "every group failed" check below compares 0 == 0 and returns an error
	// whose whole message is the formatted empty error list, "[]".
	if len(groups) == 0 {
		return nil, fmt.Errorf("no entries resolved for service %q", serviceName)
	}

	// Get qps for each group. Every iteration adds exactly one element to
	// either qps or errs.
	qps := []qpsData{}
	errs := []error{}
	for _, group := range groups {
		perMethodQPS, totalQPS, err := getQPS(v23ctx, ctx, &group)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		if len(perMethodQPS) == 0 {
			errs = append(errs, fmt.Errorf("failed to check qps for service %q", serviceName))
			continue
		}
		location, err := monitoring.GetServiceLocation(v23ctx, ctx, group)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		qps = append(qps, qpsData{
			location:     location,
			perMethodQPS: perMethodQPS,
			totalQPS:     totalQPS,
		})
	}

	// Only fail when no group produced data.
	if len(errs) == len(groups) {
		return nil, fmt.Errorf("%v", errs)
	}
	return qps, nil
}
// getQPS fetches the stats matching qpsSuffix for the given mount entry and
// converts them into QPS numbers.
//
// Each result must hold a stats.HistogramValue; its Count divided by 60 is
// added to the entry of the method named in the stat (as captured by qpsRE)
// and to the running total. The division presumably turns the one-minute
// "delta1m" count into a per-second rate. Results whose name does not match
// qpsRE are ignored.
//
// It returns the per-method map and the total. On failure it returns
// (nil, -1, err), where err is either the monitoring.GetStat error or an
// "invalid qps data" error for a result that is not a histogram.
func getQPS(v23ctx *context.T, ctx *tool.Context, me *naming.MountEntry) (map[string]float64, float64, error) {
	results, err := monitoring.GetStat(v23ctx, ctx, *me, qpsSuffix)
	if err != nil {
		return nil, -1, err
	}

	var total float64
	byMethod := make(map[string]float64)
	for _, stat := range results {
		// The value type is validated before the name is looked at, so a
		// non-histogram result is an error even if its name would not match.
		hist, isHist := stat.Value.(stats.HistogramValue)
		if !isHist {
			return nil, -1, fmt.Errorf("invalid qps data: %v", stat)
		}
		if m := qpsRE.FindStringSubmatch(stat.Name); m != nil {
			rate := float64(hist.Count) / 60.0
			byMethod[m[1]] += rate
			total += rate
		}
	}
	return byMethod, total, nil
}