| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "io/ioutil" |
| "net/http" |
| "os" |
| "os/exec" |
| "path/filepath" |
| "regexp" |
| "strconv" |
| "strings" |
| "time" |
| |
| cloudmonitoring "google.golang.org/api/monitoring/v3" |
| |
| "v.io/jiri/collect" |
| "v.io/jiri/tool" |
| "v.io/v23/context" |
| "v.io/x/devtools/internal/test" |
| "v.io/x/lib/gcm" |
| ) |
| |
// localCheckScript is the bash script that checkInstanceStats copies to, and
// runs on, every monitored instance. Each check writes one number to a file
// named output_<checktype>_<hostname> in the script's working directory:
//
//	cpu, mem, disk  usage percentages
//	tcpconn         number of TCP connections in the listed states
//	nginx-*         request rate and connection counts (only on hosts whose
//	                hostname starts with "nginx")
//
// The script is plain text handed to bash, so it must stay free of backquotes.
const localCheckScript = `#!/bin/bash

# Check cpu.
CPU_IDLE="$(top -bn2 | grep Cpu | tail -1 | sed -n 's/.*,\s*\(\S*\) id,.*/\1/p')"
CPU_USAGE="$(echo "scale=4; 100.0-${CPU_IDLE}" | bc)"
echo ${CPU_USAGE} > output_cpu_$(hostname)

# Check memory.
MEM="$(free -m)"
MEM_TOTAL="$(echo "${MEM}" | grep Mem: | awk '{print $2}')"
MEM_FREE="$(echo "${MEM}" | grep /cache: | awk '{print $4}')"
if [[ -z "${MEM_FREE}" ]]; then
  # Newer versions of free(1) no longer print the "-/+ buffers/cache:" row.
  # Fall back to the "available" column of the "Mem:" row.
  MEM_FREE="$(echo "${MEM}" | grep Mem: | awk '{print $7}')"
fi
MEM_USAGE="$(echo "scale=4; (${MEM_TOTAL}-${MEM_FREE})/${MEM_TOTAL}*100.0" | bc)"
echo ${MEM_USAGE} > output_mem_$(hostname)

# Check disk.
DISK_PCT="$(df | grep /dev/sda1 | awk '{print $5}')"
DISK_USAGE="$(echo "scale=4; ${DISK_PCT::-1}" | bc)"
echo ${DISK_USAGE} > output_disk_$(hostname)

# Check open TCP connections.
sudo netstat -anp --tcp | egrep "ESTABLISHED|CLOSE_WAIT|FIN_WAIT1|FIN_WAIT2" | wc -l > output_tcpconn_$(hostname)

# Check nginx
if [[ "$(hostname)" == nginx* ]]; then
  # Output is in the form of:
  #
  # Active connections: 12
  # server accepts handled requests
  #  64028 64028 65047
  # Reading: 3 Writing: 4 Waiting: 5
  NGINX_STATS="$(curl -s local-stackdriver-agent.stackdriver.com/nginx_status)"

  # Calculate qps.
  CUR_TIME_SECONDS="$(date +%s)"
  CUR_TOTAL_REQS="$(echo "${NGINX_STATS}" | grep '^ ' | awk '{print $3}')"
  LAST_REQS_INFO_FILE="/tmp/v-last-requests-info"
  QPS=0
  if [ -f "${LAST_REQS_INFO_FILE}" ]; then
    LAST_TIME_SECONDS="$(cat "${LAST_REQS_INFO_FILE}" | awk '{print $1}')"
    LAST_TOTAL_REQS="$(cat "${LAST_REQS_INFO_FILE}" | awk '{print $2}')"
    QPS="$(echo "scale=2; (${CUR_TOTAL_REQS}-${LAST_TOTAL_REQS})/(${CUR_TIME_SECONDS}-${LAST_TIME_SECONDS})" | bc)"
  fi
  echo "${CUR_TIME_SECONDS} ${CUR_TOTAL_REQS}" > "${LAST_REQS_INFO_FILE}"
  echo ${QPS} > output_nginx-qps_$(hostname)

  # Other stats.
  echo "${NGINX_STATS}" | sed -n 's/^Active connections:\s*\([0-9]*\)/\1/p' > output_nginx-activeconn_$(hostname)
  echo "${NGINX_STATS}" | grep Reading | awk '{print $2}' > output_nginx-readingconn_$(hostname)
  echo "${NGINX_STATS}" | grep Reading | awk '{print $4}' > output_nginx-writingconn_$(hostname)
  echo "${NGINX_STATS}" | grep Reading | awk '{print $6}' > output_nginx-waitingconn_$(hostname)
fi
`
| |
// pingResultRE matches one per-target summary line that contains a
// "min/avg/max = a/b/c" section. Submatch 1 is the text before the first
// colon (the target as printed by fping) and submatch 2 is the middle ("avg")
// value.
var pingResultRE = regexp.MustCompile(`(\S*)\s*:.*min/avg/max = [^/]*/([^/]*)/[^/]*`)

// gceMetricNames lists, in sending order, the per-instance machine metrics
// reported by sendToGCM.
var gceMetricNames = []string{
	"cpu-usage",
	"memory-usage",
	"disk-usage",
	"ping",
	"tcpconn",
}

// nginxMetricNames lists, in sending order, the nginx metrics reported by
// sendToGCM.
var nginxMetricNames = []string{
	"healthCheckLatency",
	"qps",
	"active-connections",
	"reading-connections",
	"writing-connections",
	"waiting-connections",
}
| |
| type gceInstanceData struct { |
| name string |
| zone string |
| ip string |
| stat *gceInstanceStat |
| nginxStat *nginxStat |
| } |
| |
// gceInstanceStat holds the machine-level measurements of one instance.
type gceInstanceStat struct {
	// Usage percentages computed by the remote check script.
	cpuUsage, memUsage, diskUsage float64
	// Average round-trip value parsed from the fping summary line.
	pingLatency float64
	// Number of TCP connections counted by the remote check script.
	tcpconn float64
}
| |
// nginxStat holds the nginx-specific measurements of one instance.
type nginxStat struct {
	// Latency of the worker's /health endpoint, in milliseconds.
	healthCheckLatency float64
	// Request rate computed by the remote check script from two consecutive
	// samples of nginx's total request counter.
	qps float64
	// Connection counts taken from nginx's status page.
	activeConnections, readingConnections, writingConnections, waitingConnections float64
}
| |
| // checkGCEInstances checks all GCE instances in a GCE project. |
| func checkGCEInstances(_ *context.T, ctx *tool.Context, s *cloudmonitoring.Service) error { |
| msg := "Getting instance list\n" |
| instances, err := getInstances(ctx) |
| if err != nil { |
| test.Fail(ctx, msg) |
| return err |
| } |
| test.Pass(ctx, msg) |
| |
| if err := invoker(ctx, "Check ping latencies\n", instances, checkPing); err != nil { |
| return err |
| } |
| |
| if err := invoker(ctx, "Check machine stats\n", instances, checkInstanceStats); err != nil { |
| return err |
| } |
| |
| if err := invoker(ctx, "Check nginx health\n", instances, checkNginxHealth); err != nil { |
| return err |
| } |
| |
| if err := invoker(ctx, "Send data to GCM\n", instances, sendToGCM); err != nil { |
| return err |
| } |
| |
| return nil |
| } |
| |
| func invoker(ctx *tool.Context, msg string, instances []*gceInstanceData, fn func(*tool.Context, []*gceInstanceData) error) error { |
| if err := fn(ctx, instances); err != nil { |
| test.Fail(ctx, msg) |
| return err |
| } |
| test.Pass(ctx, msg) |
| return nil |
| } |
| |
| // getInstances uses "gcloud compute instances list" to get a list of instances |
| // we care about. |
| func getInstances(ctx *tool.Context) ([]*gceInstanceData, error) { |
| var out bytes.Buffer |
| if err := ctx.NewSeq().Capture(&out, &out). |
| Last("gcloud", "-q", "--project="+projectFlag, "compute", "instances", "list", "--format=json"); err != nil { |
| return nil, err |
| } |
| var instances []struct { |
| Name string |
| Zone string |
| Status string |
| NetworkInterfaces []struct { |
| AccessConfigs []struct { |
| NatIP string |
| } |
| } |
| } |
| if err := json.Unmarshal(out.Bytes(), &instances); err != nil { |
| return nil, fmt.Errorf("Unmarshal() failed: %v", err) |
| } |
| filteredInstances := []*gceInstanceData{} |
| for _, instance := range instances { |
| if instance.Status != "RUNNING" { |
| continue |
| } |
| name := instance.Name |
| // We only collect data for the machines that are running vanadium services |
| // and nginx servers. |
| // New patterns should be added if new machine name patterns are introduced. |
| if strings.HasPrefix(name, "vanadium") || strings.HasPrefix(name, "nginx") { |
| filteredInstances = append(filteredInstances, &gceInstanceData{ |
| name: name, |
| zone: instance.Zone, |
| ip: instance.NetworkInterfaces[0].AccessConfigs[0].NatIP, |
| stat: &gceInstanceStat{ |
| cpuUsage: -1, |
| memUsage: -1, |
| diskUsage: -1, |
| pingLatency: -1, |
| tcpconn: -1, |
| }, |
| nginxStat: &nginxStat{ |
| healthCheckLatency: -1, |
| qps: -1, |
| activeConnections: -1, |
| readingConnections: -1, |
| writingConnections: -1, |
| waitingConnections: -1, |
| }, |
| }) |
| } |
| } |
| return filteredInstances, nil |
| } |
| |
| // checkPing checks the ping response from all instances using fping. |
| // |
| // By default, fping is installed on debian/ubuntu linux machines in GCE. For |
| // Macs, fping needs to be installed through homebrew. |
| func checkPing(ctx *tool.Context, instances []*gceInstanceData) error { |
| // Check the fping program. |
| if _, err := exec.LookPath("fping"); err != nil { |
| return fmt.Errorf("fping not installed.") |
| } |
| |
| // Run fping. |
| args := []string{ |
| "-q", |
| "-c3", // ping 3 times for each host. |
| } |
| ipToInstance := map[string]*gceInstanceData{} |
| for _, instance := range instances { |
| args = append(args, instance.ip) |
| ipToInstance[instance.ip] = instance |
| } |
| var out bytes.Buffer |
| if err := ctx.NewSeq().Capture(&out, &out).Last("fping", args...); err != nil { |
| // When some hosts are not reachable, the command's exit code will be non-zero. |
| fmt.Fprintf(ctx.Stdout(), "Output:\n%s\n", out.String()) |
| } |
| |
| // Parse output. |
| for _, line := range strings.Split(out.String(), "\n") { |
| matches := pingResultRE.FindStringSubmatch(line) |
| if matches != nil { |
| ip := matches[1] |
| strLatency := matches[2] |
| latency, err := strconv.ParseFloat(strLatency, 64) |
| if err != nil { |
| return fmt.Errorf("ParseFloat(%q) failed: %v", strLatency, err) |
| } |
| ipToInstance[ip].stat.pingLatency = latency |
| } |
| } |
| return nil |
| } |
| |
| // checkInstanceStats uses "vcloud run" command to run a script in remote |
| // instances and collects and processes results. |
| func checkInstanceStats(ctx *tool.Context, instances []*gceInstanceData) (e error) { |
| // Create the check script in a tmp dir. |
| s := ctx.NewSeq() |
| tmpdir, err := s.TempDir("", "") |
| if err != nil { |
| return err |
| } |
| defer collect.Error(func() error { return ctx.NewSeq().RemoveAll(tmpdir).Done() }, &e) |
| scriptPath := filepath.Join(tmpdir, "localtest.sh") |
| if err := s.WriteFile(scriptPath, []byte(localCheckScript), os.FileMode(0755)).Done(); err != nil { |
| return err |
| } |
| |
| // Run "vcloud run" on all nodes. |
| // This will run the local check script remotely and copy the "output" files back to |
| // <tmpdir>/<node_name>/<tmpdir2>/output_<checktype>_<nodename>. |
| nodes := []string{} |
| instanceByNode := map[string]*gceInstanceData{} |
| for _, instance := range instances { |
| nodes = append(nodes, instance.name) |
| instanceByNode[instance.name] = instance |
| } |
| vcloud := filepath.Join(binDirFlag, "vcloud") |
| args := []string{ |
| "--project=" + projectFlag, |
| "run", |
| "--outdir=" + tmpdir, |
| strings.Join(nodes, "|"), |
| scriptPath, |
| } |
| if err := s.Capture(ioutil.Discard, ctx.Stderr()). |
| Last(vcloud, args...); err != nil { |
| return err |
| } |
| |
| // Find and read output files. |
| if err := filepath.Walk(tmpdir, func(path string, info os.FileInfo, err error) error { |
| fileName := info.Name() |
| if strings.HasPrefix(fileName, "output_") { |
| // The filename is in the form of: output_<checktype>_<nodename>. |
| parts := strings.SplitN(fileName, "_", 3) |
| checkType := parts[1] |
| node := parts[2] |
| value, err := readFloatFromFile(ctx, path) |
| if err != nil { |
| return err |
| } |
| switch checkType { |
| case "cpu": |
| instanceByNode[node].stat.cpuUsage = value |
| case "mem": |
| instanceByNode[node].stat.memUsage = value |
| case "disk": |
| instanceByNode[node].stat.diskUsage = value |
| case "tcpconn": |
| instanceByNode[node].stat.tcpconn = value |
| case "nginx-qps": |
| instanceByNode[node].nginxStat.qps = value |
| case "nginx-activeconn": |
| instanceByNode[node].nginxStat.activeConnections = value |
| case "nginx-readingconn": |
| instanceByNode[node].nginxStat.readingConnections = value |
| case "nginx-writingconn": |
| instanceByNode[node].nginxStat.writingConnections = value |
| case "nginx-waitingconn": |
| instanceByNode[node].nginxStat.waitingConnections = value |
| } |
| } |
| return nil |
| }); err != nil { |
| return fmt.Errorf("Walk() failed: %v", err) |
| } |
| return nil |
| } |
| |
| // readFloatFromFile reads the given file's content as a number and converts it |
| // to float64. |
| func readFloatFromFile(ctx *tool.Context, path string) (float64, error) { |
| bytes, err := ctx.NewSeq().ReadFile(path) |
| if err != nil { |
| return -1, err |
| } |
| strValue := strings.TrimSpace(string(bytes)) |
| value, err := strconv.ParseFloat(strValue, 64) |
| if err != nil { |
| if strValue == "" { |
| value = 0 |
| } else { |
| return -1, fmt.Errorf("ParseFloat(%s) failed:\nfile: %s\nerr: %v", strValue, path, err) |
| } |
| } |
| return value, nil |
| } |
| |
| func checkNginxHealth(ctx *tool.Context, instances []*gceInstanceData) error { |
| timeout := time.Duration(5 * time.Second) |
| client := http.Client{ |
| Timeout: timeout, |
| } |
| hasError := false |
| for _, instance := range instances { |
| if !strings.HasPrefix(instance.name, "nginx-worker") { |
| continue |
| } |
| // Check the latency of worker's /health endpoint. |
| lat := 5000.0 // default to 5s |
| url := fmt.Sprintf("http://%s/health", instance.ip) |
| start := time.Now() |
| if resp, err := client.Get(url); err != nil { |
| hasError = true |
| fmt.Fprintf(ctx.Stderr(), "client.Get(%s) failed: %v\n", url, err) |
| } else if resp.StatusCode != http.StatusOK { |
| hasError = true |
| resp.Body.Close() |
| fmt.Fprintf(ctx.Stderr(), "got status code %v while checking %s, expected 200", resp.StatusCode, url) |
| } else { |
| resp.Body.Close() |
| // Convert to ms. |
| lat = float64(time.Now().Sub(start).Nanoseconds()) / 1000000.0 |
| if ctx.Verbose() { |
| fmt.Fprintf(ctx.Stdout(), "/health latency for %s: %f ms\n", url, lat) |
| } |
| } |
| instance.nginxStat.healthCheckLatency = lat |
| } |
| if hasError { |
| return fmt.Errorf("some checks failed") |
| } |
| return nil |
| } |
| |
| // sendToGCM sends instance stats data to GCM. |
| func sendToGCM(ctx *tool.Context, instances []*gceInstanceData) error { |
| s, err := gcm.Authenticate(keyFileFlag) |
| if err != nil { |
| return err |
| } |
| timeStr := time.Now().UTC().Format(time.RFC3339) |
| for _, instance := range instances { |
| msg := fmt.Sprintf("Send gce instance data for %s (%s)\n", instance.name, instance.zone) |
| for _, metricName := range gceMetricNames { |
| value := -1.0 |
| switch metricName { |
| case "cpu-usage": |
| value = instance.stat.cpuUsage |
| case "memory-usage": |
| value = instance.stat.memUsage |
| case "disk-usage": |
| value = instance.stat.diskUsage |
| case "ping": |
| value = instance.stat.pingLatency |
| case "tcpconn": |
| value = instance.stat.tcpconn |
| default: |
| test.Fail(ctx, msg) |
| return fmt.Errorf("Invalid metric name: %q", metricName) |
| } |
| // GCM treats 0 and missing value the same. |
| if value == 0 { |
| value = 0.0001 |
| } |
| if err := sendInstanceDataToGCM(s, "gce-instance", metricName, timeStr, instance, value); err != nil { |
| test.Fail(ctx, msg) |
| return fmt.Errorf("failed to add %q to GCM: %v\n", metricName, err) |
| } |
| } |
| |
| msg = fmt.Sprintf("Send nginx data for %s (%s)\n", instance.name, instance.zone) |
| for _, metricName := range nginxMetricNames { |
| value := -1.0 |
| switch metricName { |
| case "healthCheckLatency": |
| value = instance.nginxStat.healthCheckLatency |
| case "qps": |
| value = instance.nginxStat.qps |
| case "active-connections": |
| value = instance.nginxStat.activeConnections |
| case "reading-connections": |
| value = instance.nginxStat.readingConnections |
| case "writing-connections": |
| value = instance.nginxStat.writingConnections |
| case "waiting-connections": |
| value = instance.nginxStat.waitingConnections |
| default: |
| test.Fail(ctx, msg) |
| return fmt.Errorf("Invalid metric name: %q", metricName) |
| } |
| // GCM treats 0 and missing value the same. |
| if value == 0 { |
| value = 0.0001 |
| } |
| if err := sendInstanceDataToGCM(s, "nginx", metricName, timeStr, instance, value); err != nil { |
| test.Fail(ctx, msg) |
| return fmt.Errorf("failed to add %q to GCM: %v\n", metricName, err) |
| } |
| } |
| |
| test.Pass(ctx, msg) |
| } |
| return nil |
| } |
| |
| // sendInstanceDataToGCM sends a single instance's stat to GCM. |
| func sendInstanceDataToGCM(s *cloudmonitoring.Service, metricType, metricName, timeStr string, instance *gceInstanceData, value float64) error { |
| md, err := gcm.GetMetric(metricType, projectFlag) |
| if err != nil { |
| return err |
| } |
| if err := sendDataToGCM(s, md, value, timeStr, "", "", instance.name, instance.zone, metricName); err != nil { |
| return fmt.Errorf("failed to write timeseries: %v", err) |
| } |
| return nil |
| } |